271 lines
7.7 KiB
Python
271 lines
7.7 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import time
|
||
|
|
import shutil
|
||
|
|
import logging
|
||
|
|
import subprocess
|
||
|
|
from pathlib import Path
|
||
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||
|
|
|
||
|
|
import psycopg2
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# Logging
|
||
|
|
# =========================================================
|
||
|
|
# Configure the root logger once at import time: a single StreamHandler
# (stderr by default) with a pipe-separated format. The level comes from the
# LOG_LEVEL env var (default INFO); an unknown level name makes basicConfig
# raise ValueError at import.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
)

# Named logger used by every helper and worker in this script.
log = logging.getLogger("pangu_joined_runner")
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# Config (env only)
|
||
|
|
# =========================================================
|
||
|
|
# libpq key=value connection string. The default targets a local test
# database with test credentials; override PG_DSN for any real deployment.
PG_DSN = os.getenv(
    "PG_DSN",
    "host=127.0.0.1 port=5432 dbname=test user=test password=test",
)

# Scratch root; each parent bag gets its own subdirectory beneath it.
TEMP_ROOT = Path(os.getenv("TEMP_ROOT", "/tmp/pangu_join"))
# Number of worker processes used by the ProcessPoolExecutor in main().
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "2"))

# coscmd executable, and the timeout in seconds passed to run_cmd for coscmd
# transfers, `rosbag fix` and the merge script.
COSCMD_BIN = os.getenv("COSCMD_BIN", "coscmd")
COSCMD_TIMEOUT = int(os.getenv("COSCMD_TIMEOUT", "3600"))

# Only used to build the URL stored in joined_pangu: COS_ENDPOINT takes
# precedence, then COS_BUCKET + COS_REGION, otherwise the raw key is stored.
COS_ENDPOINT = os.getenv("COS_ENDPOINT", "")
COS_BUCKET = os.getenv("COS_BUCKET", "")
COS_REGION = os.getenv("COS_REGION", "")

# Upload key prefix for merged bags: "<MERGED_PREFIX>/<parent name>.bag".
MERGED_PREFIX = os.getenv("MERGED_PREFIX", "joined")
# When enabled, every downloaded child bag is run through `rosbag fix` before
# merging. Accepts 1/true/yes/on (case-insensitive); anything else is off.
AUTO_FIX = os.getenv("AUTO_FIX", "0").strip().lower() in {"1", "true", "yes", "on"}

# Fixed table structure (already confirmed).
BAG_TABLE = "bag_list"
JOINED_BAGS_TABLE = "joined_bags"
JOINED_PANGU_TABLE = "joined_pangu"
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# Helpers
|
||
|
|
# =========================================================
|
||
|
|
def safe_name(s: str) -> str:
    """Flatten a key/path string into a single filesystem-safe name component.

    Backslashes become slashes, runs of slashes collapse to one, every ".."
    becomes "__", and finally every "/" becomes "__". ``None`` or a
    blank/empty result yields the literal "empty".

    Note: the mapping is not injective ("a/b" and "a__b" both give "a__b").
    """
    text = (s or "").strip()
    text = re.sub(r"/+", "/", text.replace("\\", "/"))
    # ".." must be neutralised before "/" is rewritten, mirroring the two-step order.
    flattened = text.replace("..", "__").replace("/", "__")
    return flattened if flattened else "empty"
|
||
|
|
|
||
|
|
|
||
|
|
def run_cmd(cmd: list[str], *, timeout: int | None = None):
    """Log *cmd* and execute it without a shell.

    Args:
        cmd: argv list passed straight to subprocess.run.
        timeout: optional limit in seconds; None waits indefinitely.

    Raises:
        subprocess.CalledProcessError: on a non-zero exit status.
        subprocess.TimeoutExpired: if *timeout* elapses (the child is killed).
    """
    printable = " ".join(cmd)
    log.info("CMD: %s", printable)
    subprocess.run(cmd, timeout=timeout, check=True)
|
||
|
|
|
||
|
|
|
||
|
|
def make_cos_url(key: str) -> str:
    """Build the https URL recorded for an uploaded object *key*.

    Preference order: COS_ENDPOINT host, then the
    "<bucket>.cos.<region>.myqcloud.com" host from COS_BUCKET/COS_REGION.
    If neither is configured, *key* is returned unchanged.
    """
    path = key.lstrip("/")
    if COS_ENDPOINT:
        host = COS_ENDPOINT.rstrip("/")
    elif COS_BUCKET and COS_REGION:
        host = f"{COS_BUCKET}.cos.{COS_REGION}.myqcloud.com"
    else:
        # Nothing configured: fall back to the original (unstripped) key.
        return key
    return f"https://{host}/{path}"
|
||
|
|
|
||
|
|
|
||
|
|
def cos_download(key: str, local: Path):
    """Download object *key* to the local file *local* via ``coscmd download``.

    The parent directory of *local* is created first. The remote path is
    always passed with exactly one leading "/". Errors propagate from run_cmd.
    """
    remote = f"/{key.lstrip('/')}"
    local.parent.mkdir(exist_ok=True, parents=True)
    argv = [COSCMD_BIN, "download", remote, str(local)]
    run_cmd(argv, timeout=COSCMD_TIMEOUT)
|
||
|
|
|
||
|
|
|
||
|
|
def cos_upload(local: Path, key: str) -> str:
    """Upload *local* to object *key* via ``coscmd upload`` and return its URL.

    The returned string comes from make_cos_url(key). Errors propagate from
    run_cmd.
    """
    remote = f"/{key.lstrip('/')}"
    argv = [COSCMD_BIN, "upload", str(local), remote]
    run_cmd(argv, timeout=COSCMD_TIMEOUT)
    url = make_cos_url(key)
    return url
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# DB helpers
|
||
|
|
# =========================================================
|
||
|
|
def db_fetchall(sql: str, args=()):
    """Run a query on a fresh connection and return every row.

    Args:
        sql: SQL text using psycopg2 ``%s`` placeholders.
        args: parameters bound to the placeholders.

    Returns:
        The list of row tuples from ``cursor.fetchall()``.
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        # psycopg2's `with conn` only ends the transaction (commit on success,
        # rollback on error); it does NOT close the connection, so the
        # explicit close in `finally` is what prevents a leak per call.
        with conn, conn.cursor() as cur:
            cur.execute(sql, args)
            return cur.fetchall()
    finally:
        conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
def db_execute(sql: str, args=()):
    """Execute one write statement on a fresh connection and commit it.

    Args:
        sql: SQL text using psycopg2 ``%s`` placeholders.
        args: parameters bound to the placeholders.
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        # Leaving `with conn` commits on success and rolls back on error, so no
        # manual commit() is needed. The context manager does not close the
        # connection, hence the explicit close below.
        with conn, conn.cursor() as cur:
            cur.execute(sql, args)
    finally:
        conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
def fetch_parent_ids() -> list[int]:
    """Return every distinct parent_id in joined_bags, ascending, as ints."""
    query = f"SELECT DISTINCT parent_id FROM {JOINED_BAGS_TABLE} ORDER BY parent_id"
    parent_ids: list[int] = []
    for row in db_fetchall(query):
        parent_ids.append(int(row[0]))
    return parent_ids
|
||
|
|
|
||
|
|
|
||
|
|
def fetch_children_ids(parent_id: int) -> list[int]:
    """Return the child bag ids joined under *parent_id*, ordered by child_id.

    The ORDER BY makes the merge-input order deterministic. Without it,
    PostgreSQL may return rows in any order, so the same parent could be
    merged from differently ordered inputs on different runs.
    """
    rows = db_fetchall(
        f"SELECT child_id FROM {JOINED_BAGS_TABLE} WHERE parent_id=%s ORDER BY child_id",
        (parent_id,),
    )
    return [int(r[0]) for r in rows]
|
||
|
|
|
||
|
|
|
||
|
|
def fetch_bag_meta(bag_id: int) -> tuple[str, str]:
    """Look up bag *bag_id* in bag_list and return ``(name, data_path)`` as str.

    Raises:
        RuntimeError: if no row matches, or the row's data_path is NULL/empty.

    NOTE(review): a NULL name is returned as the string "None"; confirm the
    name column is NOT NULL.
    """
    query = f"SELECT name, data_path FROM {BAG_TABLE} WHERE id=%s"
    row = next(iter(db_fetchall(query, (bag_id,))), None)
    if row is None:
        raise RuntimeError(f"bag_list not found: id={bag_id}")
    bag_name, data_path = row
    if not data_path:
        raise RuntimeError(f"bag data_path empty: id={bag_id}")
    return str(bag_name), str(data_path)
|
||
|
|
|
||
|
|
|
||
|
|
def is_parent_done(parent_name: str) -> bool:
    """Return True if joined_pangu already holds a non-empty data_path for *parent_name*."""
    hits = db_fetchall(
        f"""
        SELECT 1 FROM {JOINED_PANGU_TABLE}
        WHERE name=%s AND data_path IS NOT NULL AND data_path<>''
        LIMIT 1
        """,
        (parent_name,),
    )
    return len(hits) > 0
|
||
|
|
|
||
|
|
|
||
|
|
def upsert_joined_pangu(name: str, data_path: str):
    """Insert the joined_pangu row for *name*, or overwrite its data_path if present.

    ``ON CONFLICT (name)`` requires a unique index/constraint on
    joined_pangu.name in the database.
    """
    statement = f"""
        INSERT INTO {JOINED_PANGU_TABLE} (name, data_path)
        VALUES (%s, %s)
        ON CONFLICT (name)
        DO UPDATE SET data_path=EXCLUDED.data_path
        """
    db_execute(statement, (name, data_path))
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# ROS helpers
|
||
|
|
# =========================================================
|
||
|
|
def ros_fix(src: Path) -> Path:
    """Optionally repair *src* with ``rosbag fix`` and return the bag to merge.

    When AUTO_FIX is off, *src* is returned untouched. Otherwise the fixed bag
    is written next to it with ".fixed.bag" appended to the existing suffix
    (e.g. "x.bag" -> "x.bag.fixed.bag"), and that path is returned.
    """
    if AUTO_FIX:
        repaired = src.with_suffix(f"{src.suffix}.fixed.bag")
        run_cmd(["rosbag", "fix", str(src), str(repaired)], timeout=COSCMD_TIMEOUT)
        return repaired
    return src
|
||
|
|
|
||
|
|
|
||
|
|
def ros_merge(out_bag: Path, inputs: list[Path]):
    """Merge *inputs* into *out_bag* using the bundled /app/merge_ros1.sh script.

    The script receives the output path first, followed by the input paths
    in the given order. The COSCMD_TIMEOUT limit is reused for it.
    """
    argv = ["/app/merge_ros1.sh", str(out_bag), *map(str, inputs)]
    run_cmd(argv, timeout=COSCMD_TIMEOUT)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# Worker
|
||
|
|
# =========================================================
|
||
|
|
def work_one(parent_id: int):
    """Merge all child bags of one parent and publish the result.

    Steps:
      1. Resolve the parent's name from bag_list.
      2. Skip the parent if joined_pangu already has a non-empty data_path for it.
      3. Download every child bag, optionally run `rosbag fix` (AUTO_FIX),
         and merge them.
      4. Upload the merged bag to "<MERGED_PREFIX>/<name>.bag" and upsert
         joined_pangu(name, url).
    The per-parent scratch directory is removed afterwards, even on failure.

    Raises:
        RuntimeError: from fetch_bag_meta for a missing row or empty data_path.
        subprocess.CalledProcessError / subprocess.TimeoutExpired: from any
            external command (coscmd, rosbag, merge script).
        Database errors from psycopg2.
    """
    start = time.monotonic()

    # Only the name is used; the call still validates that the parent row
    # exists and has a data_path.
    parent_name, _ = fetch_bag_meta(parent_id)

    if is_parent_done(parent_name):
        log.info(
            "[SKIP] parent already done | parent_id=%s name=%s", parent_id, parent_name
        )
        return

    log.info("[START] parent | parent_id=%s name=%s", parent_id, parent_name)

    children = fetch_children_ids(parent_id)
    log.info("[CHILDREN] parent_id=%s count=%d", parent_id, len(children))

    if not children:
        log.warning("[EMPTY] no children | parent_id=%s", parent_id)
        return

    wd = TEMP_ROOT / f"parent_{parent_id}_{safe_name(parent_name)}"
    # A previous run killed before its `finally` (e.g. SIGKILL/OOM) can leave
    # stale or partial downloads here. Start clean so they are never merged.
    shutil.rmtree(wd, ignore_errors=True)
    wd.mkdir(parents=True, exist_ok=True)

    try:
        local_inputs: list[Path] = []

        for cid in children:
            _, ckey = fetch_bag_meta(cid)
            lp = wd / safe_name(ckey)
            log.info("[DOWNLOAD] parent_id=%s child_id=%s key=%s", parent_id, cid, ckey)
            cos_download(ckey, lp)
            local_inputs.append(ros_fix(lp))

        merged_local = wd / f"{safe_name(parent_name)}.bag"
        log.info("[MERGE] start | parent_id=%s", parent_id)
        ros_merge(merged_local, local_inputs)
        log.info("[MERGE] done | parent_id=%s", parent_id)

        merged_key = f"{MERGED_PREFIX}/{parent_name}.bag"
        log.info("[UPLOAD] parent_id=%s key=%s", parent_id, merged_key)
        url = cos_upload(merged_local, merged_key)

        upsert_joined_pangu(parent_name, url)
        cost = time.monotonic() - start
        log.info(
            "[DONE] parent | parent_id=%s name=%s cost=%.2fs",
            parent_id,
            parent_name,
            cost,
        )

    except Exception as e:
        log.exception(
            "[FAILED] parent | parent_id=%s name=%s error=%s", parent_id, parent_name, e
        )
        raise
    finally:
        shutil.rmtree(wd, ignore_errors=True)
|
||
|
|
|
||
|
|
|
||
|
|
# =========================================================
|
||
|
|
# Main
|
||
|
|
# =========================================================
|
||
|
|
def main():
    """Check the external tools, then merge every parent in a process pool.

    Returns:
        int: the number of parents whose worker raised (0 when everything
        succeeded or there was nothing to do).
    """
    TEMP_ROOT.mkdir(parents=True, exist_ok=True)

    log.info(
        "runner start | workers=%s TEMP_ROOT=%s AUTO_FIX=%s",
        MAX_WORKERS,
        TEMP_ROOT,
        AUTO_FIX,
    )

    # Fail fast, before fanning out, if coscmd or rosbag is missing or broken.
    run_cmd([COSCMD_BIN, "--version"], timeout=30)
    run_cmd(["rosbag", "info", "--help"], timeout=30)

    parents = fetch_parent_ids()
    log.info("parents found: %d", len(parents))
    if not parents:
        return 0

    failed: list[int] = []
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(work_one, pid): pid for pid in parents}
        for fu in as_completed(futures):
            pid = futures[fu]
            try:
                fu.result()
            except Exception as exc:
                # Some failures are never logged inside the worker (errors
                # raised before work_one's try block, or a broken pool), so
                # record the error itself here rather than just the id.
                log.error("parent failed | parent_id=%s error=%s", pid, exc)
                failed.append(pid)

    log.info("runner done | total=%d failed=%d", len(parents), len(failed))
    return len(failed)


if __name__ == "__main__":
    # Exit non-zero when any parent failed, so schedulers/CI can detect it.
    raise SystemExit(1 if main() else 0)
|