Files
fst_data_pipeline-feature-e…/fst_data_pipeline/pipelines/tencent/decoder.py
2026-04-16 16:31:48 +08:00

440 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import argparse
import threading
import subprocess
import shutil
import logging
from datetime import datetime
import queue
import re
from prometheus_client import start_http_server, Counter, Gauge, Summary, Histogram
from flask import Flask, request, jsonify
# —— Constants & paths —— #
# Every working directory is resolved relative to the directory the process
# was started from.
BASE = os.getcwd()
INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR = (
    os.path.join(BASE, _sub) for _sub in ("input", "output", "empty", "logs")
)

# Destination argument handed to `coscmd upload`.
COS_BUCKET = "mb_raw_rosbag_decode_dirs"

# Decoder image plus the argv template used to run it. The "{in_dir}" and
# "{out_dir}" placeholders are filled in with str.format() for each batch.
DOCKER_IMAGE = "artifact.swfcn.i.mercedes-benz.com/swfcn_docker/perception-3d/mmtbag_decoder:v6.6"
DOCKER_CMD_TEMPLATE = [
    "docker", "run", "--rm",
    "-v", "{in_dir}:/input",
    "-v", "{out_dir}:/output",
    DOCKER_IMAGE,
    "bash", "-c",
    (
        "source /opt/ros/noetic/setup.bash"
        " && "
        "/opt/perception-3d/scripts/tools/mmt_bag_decoder_scripts/decoded-bag.sh"
        " /input /output 3 1"
    ),
]

BATCH_SIZE = 50        # paths per batch
MAX_LOCAL = 100        # downloads pause while this many files sit under INPUT_ROOT
MAX_RETRIES = 3        # attempts per command in with_retry()
RETRY_DELAY_S = 2      # seconds slept between attempts
METRICS_PORT = 8000    # port of the Prometheus metrics HTTP server
SENTINEL = (None, None)  # queue item that tells a worker to stop
# —— Logging setup —— #
# The "pipeline" logger writes everything at INFO and above to pipeline.log,
# and additionally copies ERROR and above to error_tasks.log.
os.makedirs(LOG_DIR, exist_ok=True)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
h_info = logging.FileHandler(os.path.join(LOG_DIR, "pipeline.log"), encoding="utf-8")
h_err = logging.FileHandler(os.path.join(LOG_DIR, "error_tasks.log"), encoding="utf-8")
h_err.setLevel(logging.ERROR)
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
for _handler in (h_info, h_err):
    _handler.setFormatter(fmt)
    logger.addHandler(_handler)
# —— Prometheus metrics —— #
# Per-stage counters (DL = download, PR = process, UP = upload):
# *_TOTAL is incremented once per batch entering the stage, *_FAIL on a
# failure in that stage, *_RETRY by with_retry() according to the tag prefix.
DL_TOTAL = Counter("pipeline_download_total", "下载尝试总数")
DL_FAIL = Counter("pipeline_download_failures", "下载失败总数")
DL_RETRY = Counter("pipeline_download_retries", "下载重试总数")
PR_TOTAL = Counter("pipeline_process_total", "处理尝试总数")
PR_FAIL = Counter("pipeline_process_failures", "处理失败总数")
PR_RETRY = Counter("pipeline_process_retries", "处理重试总数")
UP_TOTAL = Counter("pipeline_upload_total", "上传尝试总数")
UP_FAIL = Counter("pipeline_upload_failures", "上传失败总数")
UP_RETRY = Counter("pipeline_upload_retries", "上传重试总数")
# Wall-clock duration of each stage function, recorded through the
# @<metric>.time() decorators on do_download / do_process / do_upload.
DL_DUR = Summary("pipeline_download_duration_seconds", "单批下载耗时秒")
PR_DUR = Summary("pipeline_process_duration_seconds", "单批处理耗时秒")
UP_DUR = Summary("pipeline_upload_duration_seconds", "单批上传耗时秒")
# Number of paths in each batch at the moment it is queued.
BATCH_SIZE_HIST = Histogram(
"pipeline_batch_size",
"单批任务中文件数量分布",
buckets=[1, 10, 20, 50, 100, 200, 500],
)
# Time spent downloading a single file, retries included.
FILE_DL_DUR = Histogram("pipeline_file_download_duration_seconds", "单文件下载耗时分布")
# Number of files found under the output directory of the most recently processed batch.
BATCH_OUT_FILES = Gauge("pipeline_batch_output_file_count", "单批处理后输出文件数")
# Queue depths and local input-file count, refreshed about once per second.
Q_BATCH = Gauge("pipeline_queue_batches", "待下载批次数")
Q_PROC = Gauge("pipeline_queue_processing", "待处理批次数")
Q_UP = Gauge("pipeline_queue_uploading", "待上传批次数")
LOCAL_FILES = Gauge("pipeline_local_file_count", "本地 input 文件总数")
# —— Global queues & in-flight counter —— #
# Hand-off queues between the stages: batch_q feeds the download worker,
# proc_q the processing worker and up_q the upload worker.
batch_q, proc_q, up_q = queue.Queue(), queue.Queue(), queue.Queue()
# Outstanding-work counter read by /ready; always accessed under inflight_lock.
inflight_lock = threading.Lock()
inflight = 0
# —— Helpers —— #
def count_local_files():
    """Return the number of files found anywhere under INPUT_ROOT."""
    total = 0
    for _root, _dirs, names in os.walk(INPUT_ROOT):
        total += len(names)
    return total
def run(cmd, timeout=None):
    """Run *cmd* and capture its combined stdout/stderr.

    Args:
        cmd: Argument vector (list of strings) passed to ``subprocess.Popen``.
        timeout: Maximum run time in seconds. A falsy value (``None`` or 0)
            disables the limit, as before. A negative value means the budget
            is already spent, so the child is killed straight away.

    Returns:
        A ``(returncode, timed_out, output)`` tuple. If the command cannot be
        started or read, the error is logged and ``(-1, False, "")`` is
        returned instead of raising.
    """
    logger.info("CMD: %s", " ".join(cmd))
    limit = timeout if timeout else None
    try:
        p = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Undecodable bytes in tool output must not turn a successful
            # command into a failure.
            errors="replace",
        )
        try:
            # communicate() enforces the deadline even when the child prints
            # nothing. The previous line-by-line loop only looked at the clock
            # after each output line, so a silent hang never timed out.
            out, _ = p.communicate(timeout=limit)
            return p.returncode, False, out
        except subprocess.TimeoutExpired:
            p.kill()
            # Second call reaps the child, closes the pipe and collects
            # whatever was written before the kill.
            out, _ = p.communicate()
            return p.returncode, True, out
    except Exception:
        logger.exception("CMD 执行异常")
        return -1, False, ""
def with_retry(tag, fn, *args):
    """Call ``fn(*args)`` up to MAX_RETRIES times until it reports success.

    Args:
        tag: Log prefix. A tag starting with ``"DL["``, ``"PR["`` or ``"UP["``
            also selects which retry counter is incremented.
        fn: Callable returning ``(returncode, timed_out, output)``.
        *args: Positional arguments forwarded to *fn*.

    Returns:
        True as soon as an attempt returns code 0. False when an attempt
        timed out (timeouts are not retried) or every attempt failed.
    """
    retry_counters = (("DL[", DL_RETRY), ("PR[", PR_RETRY), ("UP[", UP_RETRY))
    for attempt in range(1, MAX_RETRIES + 1):
        code, timed_out, _ = fn(*args)
        if code == 0:
            return True
        if timed_out:
            logger.error("%s 阶段超时,不再重试", tag)
            break
        if attempt == MAX_RETRIES:
            # No further attempt follows, so do not count a retry or sleep
            # before reporting the final failure.
            break
        # Count the retry on the counter matching the stage.
        for prefix, counter in retry_counters:
            if tag.startswith(prefix):
                counter.inc()
        logger.warning("%s 重试 %d/%d", tag, attempt, MAX_RETRIES)
        time.sleep(RETRY_DELAY_S)
    logger.error("%s 最终失败", tag)
    return False
# —— Download —— #
@DL_DUR.time()
def do_download(batch_id, paths, batch_timeout):
    """Download one batch with `coscmd` and queue it for processing.

    Args:
        batch_id: Batch identifier, used as the sub-directory name under
            INPUT_ROOT. ``None`` marks the sentinel, which is forwarded to
            the processing queue.
        paths: Remote paths to download. Each lands in the batch directory
            under its basename.
        batch_timeout: Time budget in seconds for the whole batch, measured
            from the start of this call.

    The batch directory is queued on ``proc_q`` even when some downloads
    failed or the budget ran out.
    """
    if batch_id is None:
        proc_q.put(SENTINEL)
        return

    DL_TOTAL.inc()
    started = time.time()

    def elapsed():
        return time.time() - started

    def fetch(src, dest):
        # Each attempt only gets what is left of the batch budget.
        return run(
            ["coscmd", "-s", "download", src, dest],
            timeout=batch_timeout - elapsed(),
        )

    in_dir = os.path.join(INPUT_ROOT, batch_id)
    os.makedirs(in_dir, exist_ok=True)

    # Throttle: wait while too many files are sitting under INPUT_ROOT.
    while not count_local_files() < MAX_LOCAL:
        logger.warning("本地文件过多暂停下载5分钟")
        time.sleep(300)

    tag = f"DL[{batch_id}]"
    for remote in paths:
        if elapsed() > batch_timeout:
            logger.error("DL[%s] 下载阶段超时,跳过剩余", batch_id)
            DL_FAIL.inc()
            break
        target = os.path.join(in_dir, os.path.basename(remote))
        file_started = time.time()
        succeeded = with_retry(tag, fetch, remote, target)
        FILE_DL_DUR.observe(time.time() - file_started)
        if not succeeded:
            DL_FAIL.inc()

    proc_q.put((batch_id, in_dir))
# —— Processing —— #
@PR_DUR.time()
def do_process(batch_id, in_dir, batch_timeout):
    """Decode one downloaded batch in the docker image and queue it for upload.

    Args:
        batch_id: Batch identifier, used as the sub-directory name under
            OUTPUT_ROOT. ``None`` marks the sentinel, which is forwarded to
            the upload queue.
        in_dir: Directory holding the downloaded inputs. It is removed once
            processing has finished, whether or not it succeeded.
        batch_timeout: Time budget in seconds shared by all attempts.

    The output directory is queued on ``up_q`` even when decoding failed.
    """
    if batch_id is None:
        up_q.put(SENTINEL)
        return
    PR_TOTAL.inc()
    start = time.time()
    out_dir = os.path.join(OUTPUT_ROOT, batch_id)
    os.makedirs(out_dir, exist_ok=True)
    cmd = [c.format(in_dir=in_dir, out_dir=out_dir) for c in DOCKER_CMD_TEMPLATE]

    def run_pr(command):
        """Run one decode attempt, streaming its output into the log."""
        remaining = batch_timeout - (time.time() - start)
        if remaining <= 0:
            # Budget already spent: report a timeout without starting a container.
            return -1, True, ""
        try:
            p = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
            )
        except OSError:
            # e.g. the docker binary is missing. Report a failed attempt
            # instead of letting the exception kill the worker thread.
            logger.exception("PR[%s] CMD 执行异常", batch_id)
            return -1, False, ""

        expired = threading.Event()

        def _expire():
            expired.set()
            p.kill()

        # A watchdog timer enforces the deadline even while the child is
        # silent. Checking the clock only after each output line would never
        # fire for a hung process.
        watchdog = threading.Timer(remaining, _expire)
        watchdog.daemon = True
        watchdog.start()
        try:
            with p.stdout:
                for line in p.stdout:
                    logger.info("[PR %s] %s", batch_id, line.rstrip())
            code = p.wait()
        finally:
            watchdog.cancel()
        return code, expired.is_set(), ""

    ok = with_retry(f"PR[{batch_id}]", run_pr, cmd)
    if not ok:
        PR_FAIL.inc()
    # Count the output files.
    BATCH_OUT_FILES.set(sum(len(fs) for _, _, fs in os.walk(out_dir)))
    shutil.rmtree(in_dir, ignore_errors=True)
    up_q.put((batch_id, out_dir))
# —— Upload —— #
@UP_DUR.time()
def do_upload(batch_id, out_dir, batch_timeout):
    """Upload one processed batch with `coscmd`, then delete it locally.

    Args:
        batch_id: Batch identifier. ``None`` marks the sentinel and is ignored.
        out_dir: Local directory to upload recursively to COS_BUCKET.
        batch_timeout: Timeout in seconds applied to each upload attempt.

    For every real batch ``inflight`` is decremented on exit, whether the
    upload succeeded or not. After a failed upload the local directory is
    left in place.
    """
    global inflight
    if batch_id is None:
        return

    def push(local_dir):
        return run(
            ["coscmd", "-s", "upload", "-r", local_dir, COS_BUCKET], timeout=batch_timeout
        )

    try:
        UP_TOTAL.inc()
        if with_retry(f"UP[{batch_id}]", push, out_dir):
            # Remove the directory tree: first sync it against EMPTY_DIR with
            # --delete (presumably an empty directory, which empties out_dir),
            # then remove out_dir itself.
            run(
                ["sudo", "rsync", "-av", "--delete", f"{EMPTY_DIR}/", f"{out_dir}/"],
                timeout=60,
            )
            run(["sudo", "rm", "-rf", out_dir], timeout=60)
            logger.info("UP[%s] 完成", batch_id)
        else:
            UP_FAIL.inc()
    finally:
        # Success or failure, the batch counts as finished: inflight - 1.
        with inflight_lock:
            inflight -= 1
# —— Worker template —— #
def worker(q, fn, timeout):
    """Consume ``(batch_id, data)`` items from *q* until the sentinel arrives.

    Args:
        q: Queue yielding ``(batch_id, data)`` tuples.
        fn: Stage function called as ``fn(batch_id, data, timeout)``.
        timeout: Batch timeout in seconds forwarded to *fn*.

    The loop ends after an item whose ``batch_id`` is ``None`` has been
    handed to *fn*.
    """
    while True:
        bid, data = q.get()
        try:
            fn(bid, data, timeout)
        except Exception:
            # One bad batch must not kill the stage thread, otherwise every
            # later batch would sit in the queue forever.
            # NOTE(review): a batch that fails here is not forwarded to the
            # next stage, so any in-flight accounting for it is not released.
            logger.exception(
                "worker[%s] 批次 %s 处理异常", getattr(fn, "__name__", fn), bid
            )
        finally:
            # Always acknowledge the item so code waiting on
            # q.unfinished_tasks or q.join() can make progress.
            q.task_done()
        if bid is None:
            break
# —— Service HTTP —— #
# Flask application that exposes the /ready and /notify endpoints in service mode.
app = Flask(__name__)
@app.route("/ready", methods=["GET"])
def api_ready():
    """Return JSON ``{"ready": bool}``, true only while ``inflight`` is not positive."""
    with inflight_lock:
        idle = inflight <= 0
    return jsonify(ready=idle)
# Matches the "_YYYYMMDD-HHMMSS_" stamp in a file name, e.g. 20230803-160828.
_TIME_RE = re.compile(r"_(\d{8})-(\d{6})_")
# Prefix added to entries that arrive as a bare name.
_COS_PREFIX = "mb_cuct_data_collection/"


def _extract_ts(p: str) -> datetime:
    """Return the timestamp embedded in *p*'s basename, or ``datetime.min``.

    Names without a stamp, or with digits that do not form a valid date and
    time, map to ``datetime.min`` so they sort last in the newest-first order.
    """
    m = _TIME_RE.search(os.path.basename(p))
    if not m:
        return datetime.min
    try:
        return datetime.strptime("".join(m.groups()), "%Y%m%d%H%M%S")
    except ValueError:
        # e.g. "_20231399-999999_": treat it like an unparseable name instead
        # of failing the whole request.
        return datetime.min


@app.route("/notify", methods=["POST"])
def api_notify():
    """Accept a JSON list of paths and queue them for download in batches.

    Non-string entries are ignored. Entries lacking the
    ``mb_cuct_data_collection/`` prefix get it prepended (compatibility with
    bag-checker, which sends only the name). Paths are sorted newest first
    and split into batches of BATCH_SIZE.

    Returns:
        ``202`` with ``status="accepted"`` on success, or ``400`` when the
        body is not a JSON list.
    """
    global inflight
    data = request.get_json(force=True)
    if not isinstance(data, list):
        return jsonify(error="Expect JSON list"), 400
    paths = [
        item if item.startswith(_COS_PREFIX) else _COS_PREFIX + item
        for item in data
        if isinstance(item, str)
    ]
    paths.sort(key=_extract_ts, reverse=True)
    blocks = [paths[i : i + BATCH_SIZE] for i in range(0, len(paths), BATCH_SIZE)]
    # do_upload() decrements inflight once per batch, so count one unit per
    # batch, and none for an empty request. Counting one unit per request
    # made /ready report idle while later batches were still running.
    # Increment before queueing so the decrement can never come first.
    with inflight_lock:
        inflight += len(blocks)
    stamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    for n, blk in enumerate(blocks, start=1):
        BATCH_SIZE_HIST.observe(len(blk))
        batch_q.put((f"{stamp}_{n}", blk))
    # No SENTINEL is queued here. The service workers are started once and
    # exit when they see a sentinel, so sending one per request would stop
    # the whole pipeline after the first notification.
    return jsonify(status="accepted", batch_size=BATCH_SIZE), 202
def start_metric_updater():
    """Start a daemon thread that refreshes the queue and local-file gauges.

    Once per second the thread publishes the sizes of the three stage queues
    and the current count of files under the input directory.
    """
    sources = (
        (Q_BATCH, batch_q.qsize),
        (Q_PROC, proc_q.qsize),
        (Q_UP, up_q.qsize),
        (LOCAL_FILES, count_local_files),
    )

    def refresh_forever():
        while True:
            for gauge, read in sources:
                gauge.set(read())
            time.sleep(1)

    threading.Thread(target=refresh_forever, daemon=True).start()
# —— Entry points for the two modes —— #
def file_mode(args):
    """Process every path listed in a tasks file, then exit the process.

    Args:
        args: Parsed CLI namespace providing ``tasks_file`` (one path per
            line, blank lines ignored), ``batch_size`` and ``batch_timeout``.

    Raises:
        ValueError: If ``batch_size`` is not positive.

    The paths are queued in batches followed by a sentinel. The three stage
    workers run until the sentinel has passed through all of them, after
    which the function calls ``sys.exit(0)``.
    """
    if args.batch_size <= 0:
        # range() would raise an opaque error for 0 and silently queue
        # nothing for a negative step.
        raise ValueError("--batch-size must be a positive integer")
    # Make sure the working directories exist, as service_mode() does.
    for d in (INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR):
        os.makedirs(d, exist_ok=True)
    # Read the tasks file; the context manager closes the handle.
    with open(args.tasks_file, encoding="utf-8") as fh:
        lines = [line.strip() for line in fh if line.strip()]
    for idx in range(0, len(lines), args.batch_size):
        blk = lines[idx : idx + args.batch_size]
        BATCH_SIZE_HIST.observe(len(blk))
        bid = (
            datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
            + f"_{idx // args.batch_size + 1}"
        )
        batch_q.put((bid, blk))
    batch_q.put(SENTINEL)
    start_http_server(METRICS_PORT)
    logger.info("Metrics HTTP 启动,端口 %d", METRICS_PORT)
    threads = [
        threading.Thread(
            target=worker, args=(batch_q, do_download, args.batch_timeout)
        ),
        threading.Thread(target=worker, args=(proc_q, do_process, args.batch_timeout)),
        threading.Thread(target=worker, args=(up_q, do_upload, args.batch_timeout)),
    ]
    for t in threads:
        t.start()
    # Refresh the gauges while waiting for all three queues to drain.
    while batch_q.unfinished_tasks or proc_q.unfinished_tasks or up_q.unfinished_tasks:
        Q_BATCH.set(batch_q.qsize())
        Q_PROC.set(proc_q.qsize())
        Q_UP.set(up_q.qsize())
        LOCAL_FILES.set(count_local_files())
        time.sleep(1)
    for t in threads:
        t.join()
    logger.info("文件模式处理完成,退出。")
    sys.exit(0)
def service_mode(args):
    """Run the pipeline as an HTTP service.

    Args:
        args: Parsed CLI namespace providing ``batch_timeout``, ``host`` and
            ``port``.

    Creates the working directories, starts the metrics server and the gauge
    updater, launches the three stage workers as daemon threads, and then
    blocks in the Flask server.
    """
    # Make sure the working directories exist.
    for path in (INPUT_ROOT, OUTPUT_ROOT, EMPTY_DIR, LOG_DIR):
        os.makedirs(path, exist_ok=True)

    # Start Prometheus and the gauge updater.
    start_http_server(METRICS_PORT)
    logger.info("Metrics HTTP 启动,端口 %d", METRICS_PORT)
    start_metric_updater()

    # Start one background worker per stage.
    stages = ((batch_q, do_download), (proc_q, do_process), (up_q, do_upload))
    threads = [
        threading.Thread(
            target=worker, args=(stage_q, stage_fn, args.batch_timeout), daemon=True
        )
        for stage_q, stage_fn in stages
    ]
    for thread in threads:
        thread.start()

    # Start Flask; this call blocks.
    logger.info("Decode Service 启动 HTTP on %s:%d", args.host, args.port)
    app.run(host=args.host, port=args.port, threaded=True)
def main():
    """Parse the command line and dispatch to file mode or service mode.

    A sub-command is mandatory: ``file`` requires ``--tasks-file``, while
    ``service`` accepts ``--host`` and ``--port``. Both accept
    ``--batch-timeout``.
    """
    parser = argparse.ArgumentParser()
    modes = parser.add_subparsers(dest="mode", required=True)

    file_parser = modes.add_parser("file", help="文件模式:--tasks-file")
    file_parser.add_argument("--tasks-file", required=True)
    file_parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
    file_parser.add_argument("--batch-timeout", type=int, default=3600)

    service_parser = modes.add_parser("service", help="服务模式:启动 HTTP ready/notify")
    service_parser.add_argument("--batch-timeout", type=int, default=3600)
    service_parser.add_argument("--host", default="0.0.0.0")
    service_parser.add_argument("--port", type=int, default=5000)

    args = parser.parse_args()
    # Anything other than "file" runs the service.
    entry_point = file_mode if args.mode == "file" else service_mode
    entry_point(args)


if __name__ == "__main__":
    main()