#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Merge child ROS1 bags into joined parent bags and publish them to COS.

For each parent id found in ``joined_bags`` the runner:

1. downloads every child bag from COS with ``coscmd``;
2. optionally runs ``rosbag fix`` on each child (``AUTO_FIX=1``);
3. merges the children with an external merge script;
4. uploads the merged bag and records its URL in ``joined_pangu``.

Parents are processed in parallel worker processes. All configuration comes
from environment variables.
"""
import os
import re
import time
import shutil
import logging
import subprocess
from contextlib import closing
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed

import psycopg2

# =========================================================
# Logging
# =========================================================
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.StreamHandler()],
)
log = logging.getLogger("pangu_joined_runner")

# =========================================================
# Config (env only)
# =========================================================
PG_DSN = os.getenv(
    "PG_DSN",
    "host=127.0.0.1 port=5432 dbname=test user=test password=test",
)
TEMP_ROOT = Path(os.getenv("TEMP_ROOT", "/tmp/pangu_join"))
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "2"))

COSCMD_BIN = os.getenv("COSCMD_BIN", "coscmd")
COSCMD_TIMEOUT = int(os.getenv("COSCMD_TIMEOUT", "3600"))  # seconds

# Used only to build the public URL stored in the DB (see make_cos_url).
COS_ENDPOINT = os.getenv("COS_ENDPOINT", "")
COS_BUCKET = os.getenv("COS_BUCKET", "")
COS_REGION = os.getenv("COS_REGION", "")

MERGED_PREFIX = os.getenv("MERGED_PREFIX", "joined")
AUTO_FIX = os.getenv("AUTO_FIX", "0") == "1"

# Fixed table names (schema confirmed by the author).
BAG_TABLE = "bag_list"
JOINED_BAGS_TABLE = "joined_bags"
JOINED_PANGU_TABLE = "joined_pangu"


# =========================================================
# Helpers
# =========================================================
def safe_name(s: str) -> str:
    """Flatten a key or name into a single, traversal-free path component.

    Backslashes become slashes, repeated slashes collapse, ``..`` becomes
    ``__`` and every remaining ``/`` becomes ``__``. Empty or ``None`` input
    yields ``"empty"``.
    """
    s = (s or "").strip().replace("\\", "/")
    s = re.sub(r"/+", "/", s)
    s = s.replace("..", "__")
    return s.replace("/", "__") or "empty"


def run_cmd(cmd: list[str], *, timeout: int | None = None):
    """Log and run ``cmd`` (no shell).

    Raises:
        subprocess.CalledProcessError: on a nonzero exit status.
        subprocess.TimeoutExpired: if ``timeout`` seconds elapse.
    """
    log.info("CMD: %s", " ".join(cmd))
    subprocess.run(cmd, check=True, timeout=timeout)


def make_cos_url(key: str) -> str:
    """Return the public URL for COS object ``key``.

    Resolution order:

    * ``COS_ENDPOINT`` — a bare host or a full URL;
    * the default ``<bucket>.cos.<region>.myqcloud.com`` host;
    * otherwise the key is returned unchanged.
    """
    k = key.lstrip("/")
    if COS_ENDPOINT:
        base = COS_ENDPOINT.rstrip("/")
        # Accept an endpoint given with a scheme; prefixing unconditionally
        # produced "https://https://..." in that case.
        if "://" not in base:
            base = f"https://{base}"
        return f"{base}/{k}"
    if COS_BUCKET and COS_REGION:
        return f"https://{COS_BUCKET}.cos.{COS_REGION}.myqcloud.com/{k}"
    return key


def cos_download(key: str, local: Path):
    """Download COS object ``key`` to ``local``, creating parent dirs."""
    local.parent.mkdir(parents=True, exist_ok=True)
    cos_path = "/" + key.lstrip("/")
    run_cmd([COSCMD_BIN, "download", cos_path, str(local)], timeout=COSCMD_TIMEOUT)


def cos_upload(local: Path, key: str) -> str:
    """Upload ``local`` to COS as ``key`` and return its URL (make_cos_url)."""
    cos_path = "/" + key.lstrip("/")
    run_cmd([COSCMD_BIN, "upload", str(local), cos_path], timeout=COSCMD_TIMEOUT)
    return make_cos_url(key)


# =========================================================
# DB helpers
# =========================================================
def db_fetchall(sql: str, args=()):
    """Run a query on a fresh connection and return all rows.

    psycopg2's ``with conn`` only ends the transaction; it does not close the
    connection. ``closing`` guarantees the connection is released here rather
    than whenever the garbage collector gets to it.
    """
    with closing(psycopg2.connect(PG_DSN)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(sql, args)
            return cur.fetchall()


def db_execute(sql: str, args=()):
    """Run a write statement on a fresh connection and commit it.

    ``with conn`` commits on success and rolls back if ``execute`` raises;
    ``closing`` then closes the connection.
    """
    with closing(psycopg2.connect(PG_DSN)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(sql, args)


def fetch_parent_ids() -> list[int]:
    """Return every distinct parent id in the join table, ascending."""
    rows = db_fetchall(
        f"SELECT DISTINCT parent_id FROM {JOINED_BAGS_TABLE} ORDER BY parent_id"
    )
    return [int(r[0]) for r in rows]


def fetch_children_ids(parent_id: int) -> list[int]:
    """Return the child bag ids of ``parent_id`` in ascending id order.

    The explicit ORDER BY makes the merge input order deterministic across
    runs; without it PostgreSQL may return rows in any order.
    """
    rows = db_fetchall(
        f"SELECT child_id FROM {JOINED_BAGS_TABLE} WHERE parent_id=%s ORDER BY child_id",
        (parent_id,),
    )
    return [int(r[0]) for r in rows]


def fetch_bag_meta(bag_id: int) -> tuple[str, str]:
    """Return ``(name, data_path)`` for ``bag_id`` from ``bag_list``.

    Raises:
        RuntimeError: if the row is missing or ``data_path`` is empty.
    """
    rows = db_fetchall(
        f"SELECT name, data_path FROM {BAG_TABLE} WHERE id=%s",
        (bag_id,),
    )
    if not rows:
        raise RuntimeError(f"bag_list not found: id={bag_id}")
    name, path = rows[0]
    if not path:
        raise RuntimeError(f"bag data_path empty: id={bag_id}")
    return str(name), str(path)


def is_parent_done(parent_name: str) -> bool:
    """True if ``joined_pangu`` already has a non-empty data_path for this name."""
    rows = db_fetchall(
        f"""
        SELECT 1 FROM {JOINED_PANGU_TABLE}
        WHERE name=%s AND data_path IS NOT NULL AND data_path<>''
        LIMIT 1
        """,
        (parent_name,),
    )
    return bool(rows)


def upsert_joined_pangu(name: str, data_path: str):
    """Insert or update the merged-bag URL for ``name``.

    Relies on a unique constraint on ``joined_pangu.name`` for ON CONFLICT.
    """
    db_execute(
        f"""
        INSERT INTO {JOINED_PANGU_TABLE} (name, data_path)
        VALUES (%s, %s)
        ON CONFLICT (name)
        DO UPDATE SET data_path=EXCLUDED.data_path
        """,
        (name, data_path),
    )
# =========================================================
# ROS helpers
# =========================================================
# Timeout (seconds) for local ROS tooling. Defaults to COSCMD_TIMEOUT, which
# is what `rosbag fix` and the merge script were previously bound by.
ROS_CMD_TIMEOUT = int(os.getenv("ROS_CMD_TIMEOUT", str(COSCMD_TIMEOUT)))
# External merge script; invoked as `<script> <out_bag> <input_bag>...`.
MERGE_SCRIPT = os.getenv("MERGE_SCRIPT", "/app/merge_ros1.sh")


def ros_fix(src: Path) -> Path:
    """Return a bag path suitable for merging, running `rosbag fix` if enabled.

    With AUTO_FIX off, ``src`` is returned untouched. Otherwise the repaired
    copy ``<src>.fixed.bag`` is written and the unfixed source is deleted,
    since only the fixed copy is used afterwards (this halves peak disk usage
    for large bags).
    """
    if not AUTO_FIX:
        return src
    fixed = src.with_suffix(src.suffix + ".fixed.bag")
    run_cmd(["rosbag", "fix", str(src), str(fixed)], timeout=ROS_CMD_TIMEOUT)
    src.unlink(missing_ok=True)
    return fixed


def ros_merge(out_bag: Path, inputs: list[Path]):
    """Merge ``inputs`` into ``out_bag`` with the external merge script."""
    run_cmd(
        [MERGE_SCRIPT, str(out_bag), *(str(p) for p in inputs)],
        timeout=ROS_CMD_TIMEOUT,
    )


# =========================================================
# Worker
# =========================================================
def work_one(parent_id: int):
    """Build, upload and register the merged bag for one parent.

    Skips the parent if ``joined_pangu`` already records a result for it, or
    if it has no children. The per-parent work directory is always removed
    afterwards.

    Raises:
        Exception: any download/fix/merge/upload/DB failure, re-raised after
            being logged with a traceback.
    """
    start = time.monotonic()
    parent_name, _parent_key = fetch_bag_meta(parent_id)

    if is_parent_done(parent_name):
        log.info(
            "[SKIP] parent already done | parent_id=%s name=%s", parent_id, parent_name
        )
        return

    log.info("[START] parent | parent_id=%s name=%s", parent_id, parent_name)
    children = fetch_children_ids(parent_id)
    log.info("[CHILDREN] parent_id=%s count=%d", parent_id, len(children))
    if not children:
        log.warning("[EMPTY] no children | parent_id=%s", parent_id)
        return

    wd = TEMP_ROOT / f"parent_{parent_id}_{safe_name(parent_name)}"
    # A previous run killed before its `finally` may have left partial
    # downloads here; never reuse them.
    shutil.rmtree(wd, ignore_errors=True)
    # Inputs live in their own subdirectory so no downloaded child can share a
    # path with the merged output written to `wd`.
    input_dir = wd / "inputs"
    input_dir.mkdir(parents=True, exist_ok=True)
    try:
        local_inputs: list[Path] = []
        for cid in children:
            _cname, ckey = fetch_bag_meta(cid)
            # Prefix with the child id: distinct keys can sanitize to the same
            # name (e.g. "a/b" and "a__b") and would otherwise overwrite each other.
            lp = input_dir / f"{cid}_{safe_name(ckey)}"
            log.info("[DOWNLOAD] parent_id=%s child_id=%s key=%s", parent_id, cid, ckey)
            cos_download(ckey, lp)
            local_inputs.append(ros_fix(lp))

        merged_local = wd / f"{safe_name(parent_name)}.bag"
        log.info("[MERGE] start | parent_id=%s", parent_id)
        ros_merge(merged_local, local_inputs)
        log.info("[MERGE] done | parent_id=%s", parent_id)

        merged_key = f"{MERGED_PREFIX}/{parent_name}.bag"
        log.info("[UPLOAD] parent_id=%s key=%s", parent_id, merged_key)
        url = cos_upload(merged_local, merged_key)

        upsert_joined_pangu(parent_name, url)

        cost = time.monotonic() - start
        log.info(
            "[DONE] parent | parent_id=%s name=%s cost=%.2fs",
            parent_id,
            parent_name,
            cost,
        )
    except Exception as e:
        log.exception(
            "[FAILED] parent | parent_id=%s name=%s error=%s", parent_id, parent_name, e
        )
        raise
    finally:
        shutil.rmtree(wd, ignore_errors=True)


# =========================================================
# Main
# =========================================================
def main() -> int:
    """Run preflight checks, then process every parent in a process pool.

    Returns:
        Exit status: 0 if every parent succeeded or was skipped, 1 if any
        parent failed.
    """
    TEMP_ROOT.mkdir(parents=True, exist_ok=True)
    log.info(
        "runner start | workers=%s TEMP_ROOT=%s AUTO_FIX=%s",
        MAX_WORKERS,
        TEMP_ROOT,
        AUTO_FIX,
    )

    # Fail fast on missing tooling instead of failing once per parent.
    run_cmd([COSCMD_BIN, "--version"], timeout=30)
    run_cmd(["rosbag", "info", "--help"], timeout=30)
    if not os.access(MERGE_SCRIPT, os.X_OK):
        raise RuntimeError(f"merge script not executable: {MERGE_SCRIPT}")

    parents = fetch_parent_ids()
    log.info("parents found: %d", len(parents))
    if not parents:
        return 0

    failed: list[int] = []
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(work_one, pid): pid for pid in parents}
        for fu in as_completed(futures):
            pid = futures[fu]
            try:
                fu.result()
            except Exception as e:
                # Workers log their own tracebacks, but a worker killed outright
                # (e.g. OOM during merge -> BrokenProcessPool) logs nothing, so
                # always record the error here.
                failed.append(pid)
                log.error("parent failed | parent_id=%s error=%r", pid, e)

    if failed:
        log.error(
            "runner finished with failures | failed=%d/%d parent_ids=%s",
            len(failed),
            len(parents),
            sorted(failed),
        )
        return 1
    log.info("runner finished | parents=%d", len(parents))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())