Files

440 lines
13 KiB
Python
Raw Permalink Normal View History

2026-04-16 15:44:32 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import argparse
import threading
import subprocess
import shutil
import logging
from datetime import datetime
import queue
import re
from prometheus_client import start_http_server, Counter, Gauge, Summary, Histogram
from flask import Flask, request, jsonify
# -- Constants & paths -- #
# Every working directory lives directly under the current working directory.
BASE = os.getcwd()
INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR = (
    os.path.join(BASE, sub) for sub in ("input", "output", "empty", "logs")
)

# Destination handed to `coscmd upload` by do_upload.
COS_BUCKET = "mb_raw_rosbag_decode_dirs"

DOCKER_IMAGE = "artifact.swfcn.i.mercedes-benz.com/swfcn_docker/perception-3d/mmtbag_decoder:v6.6"

# Shell line executed inside the container (passed to `bash -c`).
_DECODE_SCRIPT = (
    "source /opt/ros/noetic/setup.bash && "
    "/opt/perception-3d/scripts/tools/"
    "mmt_bag_decoder_scripts/decoded-bag.sh /input /output 3 1"
)

# argv template for the decode container. The {in_dir}/{out_dir} placeholders
# are filled in by do_process via str.format; the host directories are mounted
# at /input and /output inside the container.
DOCKER_CMD_TEMPLATE = [
    "docker", "run", "--rm",
    "-v", "{in_dir}:/input",
    "-v", "{out_dir}:/output",
    DOCKER_IMAGE,
    "bash", "-c", _DECODE_SCRIPT,
]

BATCH_SIZE = 50  # paths per batch (default for --batch-size and for /notify)
MAX_LOCAL = 100  # do_download pauses while this many files sit under INPUT_ROOT
MAX_RETRIES = 3  # attempts made by with_retry
RETRY_DELAY_S = 2  # seconds slept between attempts
METRICS_PORT = 8000  # port passed to start_http_server
SENTINEL = (None, None)  # queue item telling a worker to stop
# -- Logging configuration -- #
# The log directory must exist before the FileHandlers below open their files.
os.makedirs(LOG_DIR, exist_ok=True)
# Single shared logger for the whole pipeline; records below INFO are dropped.
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
# pipeline.log receives every record the logger lets through.
h_info = logging.FileHandler(os.path.join(LOG_DIR, "pipeline.log"), encoding="utf-8")
# error_tasks.log only receives ERROR and above (handler level set below).
h_err = logging.FileHandler(os.path.join(LOG_DIR, "error_tasks.log"), encoding="utf-8")
# Both files share the same "timestamp level message" line format.
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
h_info.setFormatter(fmt)
h_err.setFormatter(fmt)
h_err.setLevel(logging.ERROR)
logger.addHandler(h_info)
logger.addHandler(h_err)
# -- Prometheus metrics -- #
# Per-stage counters: total attempts, failures and retries for the
# download (DL), process (PR) and upload (UP) stages.
DL_TOTAL = Counter("pipeline_download_total", "下载尝试总数")
DL_FAIL = Counter("pipeline_download_failures", "下载失败总数")
DL_RETRY = Counter("pipeline_download_retries", "下载重试总数")
PR_TOTAL = Counter("pipeline_process_total", "处理尝试总数")
PR_FAIL = Counter("pipeline_process_failures", "处理失败总数")
PR_RETRY = Counter("pipeline_process_retries", "处理重试总数")
UP_TOTAL = Counter("pipeline_upload_total", "上传尝试总数")
UP_FAIL = Counter("pipeline_upload_failures", "上传失败总数")
UP_RETRY = Counter("pipeline_upload_retries", "上传重试总数")
# Per-batch wall-clock duration of each stage; applied as decorators on
# do_download, do_process and do_upload.
DL_DUR = Summary("pipeline_download_duration_seconds", "单批下载耗时秒")
PR_DUR = Summary("pipeline_process_duration_seconds", "单批处理耗时秒")
UP_DUR = Summary("pipeline_upload_duration_seconds", "单批上传耗时秒")
# Distribution of the number of files per batch, observed when batches are queued.
BATCH_SIZE_HIST = Histogram(
"pipeline_batch_size",
"单批任务中文件数量分布",
buckets=[1, 10, 20, 50, 100, 200, 500],
)
# Distribution of single-file download time (includes retries), observed in do_download.
FILE_DL_DUR = Histogram("pipeline_file_download_duration_seconds", "单文件下载耗时分布")
# Number of output files produced by the most recently processed batch.
BATCH_OUT_FILES = Gauge("pipeline_batch_output_file_count", "单批处理后输出文件数")
# Queue depths (batches waiting for download / processing / upload) and the
# number of files currently under the local input directory.
Q_BATCH = Gauge("pipeline_queue_batches", "待下载批次数")
Q_PROC = Gauge("pipeline_queue_processing", "待处理批次数")
Q_UP = Gauge("pipeline_queue_uploading", "待上传批次数")
LOCAL_FILES = Gauge("pipeline_local_file_count", "本地 input 文件总数")
# -- Global queues & in-flight counter -- #
# Stage hand-off queues: batch_q feeds do_download, proc_q feeds do_process,
# up_q feeds do_upload.
batch_q = queue.Queue()
proc_q = queue.Queue()
up_q = queue.Queue()
# Outstanding-work counter: raised by api_notify, lowered by do_upload,
# read by api_ready.
inflight = 0
# Guards every read and write of `inflight`.
inflight_lock = threading.Lock()
# -- Helper functions -- #
def count_local_files():
    """Return how many files currently exist anywhere under INPUT_ROOT."""
    total = 0
    for _dirpath, _dirnames, filenames in os.walk(INPUT_ROOT):
        total += len(filenames)
    return total
def run(cmd, timeout=None):
    """Run *cmd* and capture its combined stdout/stderr.

    The time limit is enforced by the clock, so a child that hangs without
    printing anything is still killed once the limit is reached.

    Args:
        cmd: Argument list passed to ``subprocess.Popen`` (no shell involved).
        timeout: Wall-clock limit in seconds; ``None`` or ``0`` means no limit.

    Returns:
        A ``(exit_code, timed_out, output)`` tuple. If the command cannot be
        started, or any other error occurs, ``(-1, False, "")`` is returned
        and the exception is logged.
    """
    logger.info("CMD: %s", " ".join(cmd))
    try:
        # The context manager closes the stdout pipe and reaps the child.
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
        ) as proc:
            timed_out = False
            try:
                output, _ = proc.communicate(timeout=timeout or None)
            except subprocess.TimeoutExpired:
                timed_out = True
                proc.kill()
                try:
                    # Collect what was printed before the kill. Bounded, because
                    # an orphaned descendant may still hold the pipe open.
                    output, _ = proc.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    output = ""
            code = proc.wait()
        return code, timed_out, output or ""
    except Exception:
        logger.exception("CMD 执行异常")
        return -1, False, ""
def with_retry(tag, fn, *args):
    """Call ``fn(*args)`` up to MAX_RETRIES times until it reports success.

    Args:
        tag: Label used in log lines. Its prefix (``DL[``, ``PR[`` or ``UP[``)
            selects which retry counter is incremented.
        fn: Callable returning ``(exit_code, timed_out, output)``.
        *args: Positional arguments forwarded to ``fn``.

    Returns:
        ``True`` as soon as an attempt returns exit code 0. ``False`` when an
        attempt timed out (no further attempts are made) or when every
        attempt failed.
    """
    retry_counters = (("DL[", DL_RETRY), ("PR[", PR_RETRY), ("UP[", UP_RETRY))
    for attempt in range(1, MAX_RETRIES + 1):
        code, timed_out, _ = fn(*args)
        if code == 0:
            return True
        if timed_out:
            logger.error("%s 阶段超时,不再重试", tag)
            break
        if attempt == MAX_RETRIES:
            # No attempt follows, so neither count a retry nor sleep.
            break
        for prefix, counter in retry_counters:
            if tag.startswith(prefix):
                counter.inc()
        logger.warning("%s 重试 %d/%d", tag, attempt, MAX_RETRIES)
        time.sleep(RETRY_DELAY_S)
    logger.error("%s 最终失败", tag)
    return False
# -- Download -- #
@DL_DUR.time()
def do_download(batch_id, paths, batch_timeout):
    """Download one batch into its own input directory and queue it for processing.

    Args:
        batch_id: Batch identifier; ``None`` marks the sentinel, which is only
            forwarded to the processing queue.
        paths: Remote paths handed to ``coscmd download``.
        batch_timeout: Seconds allowed for downloading the whole batch.

    The batch is always forwarded to ``proc_q`` afterwards, even when some
    files failed or the time budget ran out.
    """
    if batch_id is None:
        proc_q.put(SENTINEL)
        return
    DL_TOTAL.inc()
    in_dir = os.path.join(INPUT_ROOT, batch_id)
    os.makedirs(in_dir, exist_ok=True)
    # Back-pressure: wait while too many files are already stored locally.
    while count_local_files() >= MAX_LOCAL:
        logger.warning("本地文件过多暂停下载5分钟")
        time.sleep(300)
    # Start the clock only after the back-pressure wait, so that waiting for
    # local space does not consume the batch's download budget.
    start = time.time()

    def fetch(src, dst):
        remaining = batch_timeout - (time.time() - start)
        if remaining <= 0:
            # Budget exhausted between attempts: report a timeout, do not start.
            return -1, True, ""
        return run(["coscmd", "-s", "download", src, dst], timeout=remaining)

    for src in paths:
        if time.time() - start > batch_timeout:
            logger.error("DL[%s] 下载阶段超时,跳过剩余", batch_id)
            DL_FAIL.inc()
            break
        dst = os.path.join(in_dir, os.path.basename(src))
        f_start = time.time()
        ok = with_retry(f"DL[{batch_id}]", fetch, src, dst)
        FILE_DL_DUR.observe(time.time() - f_start)
        if not ok:
            DL_FAIL.inc()
    proc_q.put((batch_id, in_dir))
# -- Processing -- #
@PR_DUR.time()
def do_process(batch_id, in_dir, batch_timeout):
    """Decode one downloaded batch in the Docker container and queue it for upload.

    Args:
        batch_id: Batch identifier; ``None`` marks the sentinel, which is only
            forwarded to the upload queue.
        in_dir: Directory holding the batch's downloaded files; removed once
            processing is over.
        batch_timeout: Seconds allowed for the whole stage, shared by all
            attempts.

    The batch is always forwarded to ``up_q``, whether or not decoding
    succeeded.
    """
    if batch_id is None:
        up_q.put(SENTINEL)
        return
    PR_TOTAL.inc()
    start = time.time()
    out_dir = os.path.join(OUTPUT_ROOT, batch_id)
    os.makedirs(out_dir, exist_ok=True)
    cmd = [c.format(in_dir=in_dir, out_dir=out_dir) for c in DOCKER_CMD_TEMPLATE]

    def run_pr(command):
        """Run *command*, streaming its output to the log; returns (code, timed_out, "")."""
        remaining = batch_timeout - (time.time() - start)
        if remaining <= 0:
            return -1, True, ""
        expired = threading.Event()
        try:
            with subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
            ) as proc:

                def expire():
                    expired.set()
                    proc.kill()

                # The watchdog enforces the limit even when the child prints
                # nothing; checking the clock only between output lines would
                # let a silent process run forever.
                watchdog = threading.Timer(remaining, expire)
                watchdog.daemon = True
                watchdog.start()
                try:
                    for line in proc.stdout:
                        logger.info("[PR %s] %s", batch_id, line.rstrip())
                finally:
                    watchdog.cancel()
                code = proc.wait()
        except Exception:
            # E.g. docker is not installed: report a failed attempt instead of
            # letting the exception kill the worker thread.
            logger.exception("[PR %s] 执行异常", batch_id)
            return -1, False, ""
        return code, expired.is_set(), ""

    ok = with_retry(f"PR[{batch_id}]", run_pr, cmd)
    if not ok:
        PR_FAIL.inc()
    # Count the output files.
    out_count = sum(len(names) for _, _, names in os.walk(out_dir))
    BATCH_OUT_FILES.set(out_count)
    shutil.rmtree(in_dir, ignore_errors=True)
    up_q.put((batch_id, out_dir))
# -- Upload -- #
@UP_DUR.time()
def do_upload(batch_id, out_dir, batch_timeout):
    """Upload a batch's output directory to COS and delete it locally on success.

    A ``None`` batch_id (sentinel) is ignored. For real batches the in-flight
    counter is lowered by one when the function exits, whether or not the
    upload succeeded; after a failed upload the local directory is kept.
    """
    global inflight
    if batch_id is None:
        return

    def push(directory):
        return run(
            ["coscmd", "-s", "upload", "-r", directory, COS_BUCKET],
            timeout=batch_timeout,
        )

    try:
        UP_TOTAL.inc()
        if not with_retry(f"UP[{batch_id}]", push, out_dir):
            UP_FAIL.inc()
            return
        # Remove the directory tree: first empty it by syncing an empty
        # directory over it, then delete what is left.
        run(
            ["sudo", "rsync", "-av", "--delete", f"{EMPTY_DIR}/", f"{out_dir}/"],
            timeout=60,
        )
        run(["sudo", "rm", "-rf", out_dir], timeout=60)
        logger.info("UP[%s] 完成", batch_id)
    finally:
        # Success or failure, the task counts as finished.
        with inflight_lock:
            inflight -= 1
# -- Worker template -- #
def worker(q, fn, timeout):
    """Consume ``(batch_id, payload)`` items from *q* until a sentinel arrives.

    Args:
        q: Queue to read from.
        fn: Stage function, called as ``fn(batch_id, payload, timeout)``.
        timeout: Per-batch time limit forwarded to ``fn``.

    An exception raised by ``fn`` is logged and the loop continues, so one bad
    batch cannot kill the thread. ``task_done()`` is always called, which keeps
    the queue's unfinished-task count accurate. The loop ends after an item
    whose batch_id is ``None`` has been handed to ``fn``.
    """
    while True:
        bid, data = q.get()
        try:
            fn(bid, data, timeout)
        except Exception:
            logger.exception(
                "worker %s 处理批次 %s 异常", getattr(fn, "__name__", fn), bid
            )
        finally:
            q.task_done()
        if bid is None:
            break
# -- Service HTTP -- #
app = Flask(__name__)


@app.route("/ready", methods=["GET"])
def api_ready():
    """Return JSON ``{"ready": bool}``; ready is true when nothing is in flight."""
    with inflight_lock:
        idle = inflight <= 0
    return jsonify(ready=idle)
# Timestamp embedded in bag file names, e.g. "_20230803-160828_".
_TIME_RE = re.compile(r"_(\d{8})-(\d{6})_")


def _extract_ts(path):
    """Return the timestamp in *path*'s file name, or ``datetime.min`` if none parses."""
    match = _TIME_RE.search(os.path.basename(path))
    if not match:
        return datetime.min
    try:
        return datetime.strptime("".join(match.groups()), "%Y%m%d%H%M%S")
    except ValueError:
        # Digits that do not form a real date/time (e.g. month 13).
        return datetime.min


@app.route("/notify", methods=["POST"])
def api_notify():
    """Accept a JSON list of bag paths, split it into batches and queue them.

    Entries that are not strings are ignored. Names sent without the
    ``mb_cuct_data_collection/`` prefix get it prepended (compatibility with
    bag-checker, which only sends the name). Paths are ordered newest first by
    the timestamp in the file name; names without a parseable timestamp go
    last.

    Returns:
        HTTP 400 when the body is not a JSON list, otherwise HTTP 202 with
        ``status`` and the configured ``batch_size``.
    """
    global inflight
    data = request.get_json(force=True)
    if not isinstance(data, list):
        return jsonify(error="Expect JSON list"), 400
    prefix = "mb_cuct_data_collection/"
    paths = [
        item if item.startswith(prefix) else prefix + item
        for item in data
        if isinstance(item, str)
    ]
    paths.sort(key=_extract_ts, reverse=True)
    batches = [
        paths[idx : idx + BATCH_SIZE] for idx in range(0, len(paths), BATCH_SIZE)
    ]
    # do_upload lowers `inflight` once per batch, so raise it once per batch
    # here. Raising it only once per request would drive it negative, and an
    # empty request would leave it stuck above zero.
    with inflight_lock:
        inflight += len(batches)
    stamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    for number, blk in enumerate(batches, start=1):
        BATCH_SIZE_HIST.observe(len(blk))
        batch_q.put((f"{stamp}_{number}", blk))
    # No sentinel is queued here: a sentinel makes every worker thread exit,
    # after which later notifications would never be processed.
    return jsonify(status="accepted", batch_size=BATCH_SIZE), 202
def start_metric_updater():
    """Start a daemon thread that refreshes the queue and local-file gauges every second."""
    sources = (
        (Q_BATCH, batch_q.qsize),
        (Q_PROC, proc_q.qsize),
        (Q_UP, up_q.qsize),
        (LOCAL_FILES, count_local_files),
    )

    def refresh_forever():
        while True:
            for gauge, read_value in sources:
                gauge.set(read_value())
            time.sleep(1)

    threading.Thread(target=refresh_forever, daemon=True).start()
# -- Entry points for the two modes -- #
def file_mode(args):
    """Process every path listed in a tasks file, then exit the process.

    The non-empty lines of ``args.tasks_file`` are split into batches of
    ``args.batch_size`` and queued, followed by a sentinel. One worker thread
    per stage is started, and the function waits until all three queues are
    drained, refreshing the gauges once per second meanwhile.

    Args:
        args: Parsed CLI namespace with ``tasks_file``, ``batch_size`` and
            ``batch_timeout``.

    Raises:
        ValueError: If ``args.batch_size`` is smaller than 1.
        SystemExit: Always, with status 0, once processing is complete.
    """
    if args.batch_size < 1:
        # A negative size would otherwise silently queue nothing at all.
        raise ValueError("--batch-size must be a positive integer")
    # Make sure the working directories exist, as service_mode does
    # (EMPTY_DIR is needed by the upload stage's cleanup).
    for d in (INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR):
        os.makedirs(d, exist_ok=True)
    # Read the tasks file, closing it deterministically.
    with open(args.tasks_file, encoding="utf-8") as fh:
        lines = [line.strip() for line in fh if line.strip()]
    for idx in range(0, len(lines), args.batch_size):
        blk = lines[idx : idx + args.batch_size]
        BATCH_SIZE_HIST.observe(len(blk))
        bid = (
            datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
            + f"_{idx // args.batch_size + 1}"
        )
        batch_q.put((bid, blk))
    batch_q.put(SENTINEL)
    start_http_server(METRICS_PORT)
    logger.info("Metrics HTTP 启动,端口 %d", METRICS_PORT)
    threads = [
        threading.Thread(
            target=worker, args=(batch_q, do_download, args.batch_timeout)
        ),
        threading.Thread(target=worker, args=(proc_q, do_process, args.batch_timeout)),
        threading.Thread(target=worker, args=(up_q, do_upload, args.batch_timeout)),
    ]
    for t in threads:
        t.start()
    # Refresh the gauges while waiting for the queues to drain.
    while batch_q.unfinished_tasks or proc_q.unfinished_tasks or up_q.unfinished_tasks:
        Q_BATCH.set(batch_q.qsize())
        Q_PROC.set(proc_q.qsize())
        Q_UP.set(up_q.qsize())
        LOCAL_FILES.set(count_local_files())
        time.sleep(1)
    for t in threads:
        t.join()
    logger.info("文件模式处理完成,退出。")
    sys.exit(0)
def service_mode(args):
    """Run as a long-lived service: metrics endpoint, stage workers and the Flask API.

    Args:
        args: Parsed CLI namespace with ``batch_timeout``, ``host`` and ``port``.
    """
    # Make sure the working directories exist.
    for directory in (INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR):
        os.makedirs(directory, exist_ok=True)

    # Prometheus endpoint plus the background gauge refresher.
    start_http_server(METRICS_PORT)
    logger.info("Metrics HTTP 启动,端口 %d", METRICS_PORT)
    start_metric_updater()

    # One daemon worker per pipeline stage.
    stages = ((batch_q, do_download), (proc_q, do_process), (up_q, do_upload))
    workers = [
        threading.Thread(
            target=worker, args=(source, stage, args.batch_timeout), daemon=True
        )
        for source, stage in stages
    ]
    for thread in workers:
        thread.start()

    # Serve the HTTP API.
    logger.info("Decode Service 启动 HTTP on %s:%d", args.host, args.port)
    app.run(host=args.host, port=args.port, threaded=True)
def main():
    """Parse the command line and dispatch to file mode or service mode."""
    parser = argparse.ArgumentParser()
    modes = parser.add_subparsers(dest="mode", required=True)

    file_cli = modes.add_parser("file", help="文件模式:--tasks-file")
    file_cli.add_argument("--tasks-file", required=True)
    file_cli.add_argument("--batch-size", type=int, default=BATCH_SIZE)
    file_cli.add_argument("--batch-timeout", type=int, default=3600)

    service_cli = modes.add_parser("service", help="服务模式:启动 HTTP ready/notify")
    service_cli.add_argument("--batch-timeout", type=int, default=3600)
    service_cli.add_argument("--host", default="0.0.0.0")
    service_cli.add_argument("--port", type=int, default=5000)

    args = parser.parse_args()
    entry_point = file_mode if args.mode == "file" else service_mode
    entry_point(args)


if __name__ == "__main__":
    main()