from collections import defaultdict import csv from datetime import date, datetime, timedelta from io import StringIO import re from sqlite3 import IntegrityError from urllib import response from flask import Blueprint, current_app, make_response, request, jsonify,json from flask_jwt_extended import ( create_access_token, get_jwt, get_jwt_identity, jwt_required, ) import requests from sqlalchemy import Date, and_, asc, case, desc, distinct, exists, func, or_ from app.models import ( BagFile, QaStatus, BagStatus, EventSource, Fst, Role, TaggingEvents, User, UserRole, VerdictStatus, BagMergeRecord, SyncStatus ) from app.utils.fst_tree import build_tree, build_sub_tree from app import db from app.utils.log_record import log_operation from app.utils.bagname import extract_datetime_from_filename from sqlalchemy.orm import aliased from app.utils.response_dict import api_response # 1.创建蓝图实例,指定名称和导入名称 data_factory_bp = Blueprint("data_factory", __name__) ##### 首页 ### # 1.根据查询出所有的status @data_factory_bp.route("/getstatus", methods=["GET"]) @jwt_required() def get_status(): all_values = [ {"value": member.value, "label": member.value} for member in BagStatus ] return jsonify(all_values) # 2.查询出所有的一级标签 @data_factory_bp.route("/getlevel1", methods=["GET"]) @jwt_required() def get_level1(): level1_tags = Fst.query.filter(Fst.level == 1).all() result = [ {"id": tag.id, "name": tag.name, "level": tag.level} for tag in level1_tags ] return jsonify(result) @data_factory_bp.route("/getbaglist", methods=["POST"]) @jwt_required() def get_baglist(): try: # 1. 解析请求参数 params = request.get_json() or {} print(f"【请求参数】原始参数: {params}") page = params.get("page", 1) per_page = params.get("per_page", 20) file_name = params.get("file_name", "").strip() start_datetime = params.get("start_datetime", "").strip() end_datetime = params.get("end_datetime", "").strip() level1_tag = params.get("level1_tag", "") status_str = params.get("status", "").strip() # ========== 封装一个函数,按“model”生成条件 ========== def build_conditions(model): # 固定:只返回 bag_status = 0 的记录 conds = [model.bag_status == 0] # 文件名模糊查询 #if file_name: # conds.append(model.file_name.like(f"%{file_name}%")) # 时间范围查询 if start_datetime: try: start_dt = datetime.fromisoformat(start_datetime) conds.append(model.capture_datetime >= start_dt) except ValueError: print(f"【过滤条件】开始时间格式错误: {start_datetime}") if end_datetime: try: end_dt = datetime.fromisoformat(end_datetime) conds.append(model.capture_datetime <= end_dt) except ValueError: print(f"【过滤条件】结束时间格式错误: {end_datetime}") # 一级标签查询(单个字符串) if level1_tag: try: level1_id = int(level1_tag) conds.append(model.level1_tag_id == level1_id) except ValueError: print(f"【过滤条件】level1_tag转换失败(非整数): {level1_tag}") # 状态查询(单个字符串) if status_str: try: status_enum = BagStatus[status_str] conds.append(model.status == status_enum) except KeyError: print(f"【过滤条件】无效状态值: {status_str}") return conds # 主查询条件:用 BagFile base_conditions = build_conditions(BagFile) # ========= 关键部分:在 SQL 层做 merge + 去重(与第二版同逻辑) ========= # 主查询:再 alias 一份 BagMergeRecord 用来回表 M = aliased(BagMergeRecord) logical_name = func.coalesce(M.joined_name, BagFile.file_name) query = ( db.session.query(BagFile, logical_name.label("logical_name")) .outerjoin( M, M.src_name == BagFile.file_name, ) .filter( *base_conditions, or_( M.is_initiator == True, # 被 merge 的主 bag M.id.is_(None), # 完全没 merge 过的 bag ), ) .order_by(BagFile.create_time.desc()) ) if file_name: query= query.filter(logical_name.like(f"%{file_name}%")) from sqlalchemy.dialects import mysql sql = query.statement.compile( dialect=mysql.dialect(), # 或者 db.engine.dialect compile_kwargs={"literal_binds": True}, ) current_app.logger.info("getbaglist SQL: %s", sql) print("getbaglist SQL:", sql) # 5. 分页查询(此时已经是“按 joined_name 去重后”的结果集) pagination = query.paginate(page=page, per_page=per_page, error_out=False) total_items = pagination.total total_pages = pagination.pages # 6. 序列化(items 是 (BagFile, logical_name) 元组) bag_files = [] for bag, logical_name_value in pagination.items: bag_files.append( { "id": bag.id, # 对外展示的 file_name:优先 merged 的 joined_name "file_name": logical_name_value, "event": bag.event, "capture_datetime": ( bag.capture_datetime.isoformat() if bag.capture_datetime else None ), "status": bag.status.value if bag.status else None, "create_time": ( bag.create_time.isoformat() if bag.create_time else None ), "level1_tag": bag.level1_tag.name if bag.level1_tag else None, "bag_status": bag.bag_status, # 始终为 0 } ) # 7. 返回响应 return jsonify( { "code": 200, "data": bag_files, "total": total_items, "page": page, "pages": total_pages, "message": "查询成功(仅返回bag_status为0的数据)", } ) except Exception as e: print(f"【接口异常】: {str(e)}") return ( jsonify( { "code": 500, "data": [], "total": 0, "page": page, "pages": 0, "message": f"查询出错:{str(e)}", } ), 500, ) #### 视频播放页面 # 查询出视频信息,1、2、3、4级标签信息,标签描述等 @data_factory_bp.route("/getinfo", methods=["GET"]) @jwt_required() def get_baginfo(): try: bag_id = request.args.get("bag_id") bagfile = BagFile.query.get(bag_id) if not bagfile: return api_response(404, "BagFile not found") # ========== 新增:查询 merge 关系 ========== merge_info = None # 用来返回 mergeName+所有 src 列表 extra_video_sources = [] # 前端要的 extraVideoSources 数组 # 先看当前这个 bag 的 file_name 有没有被 merge 过(作为 src_name 出现在映射表) rels_for_this = BagMergeRecord.query.filter_by( src_name=bagfile.file_name ).all() if rels_for_this: # 正常情况下,同一个 src_name 对应同一个 joined_name,取第一条即可 joined_name = rels_for_this[0].joined_name # 再查出这个 joined_name 下的所有 src all_rels = BagMergeRecord.query.filter_by(joined_name=joined_name).order_by( BagMergeRecord.id.asc() # 其余按插入顺序 ).all() src_bag_names = [r.src_name for r in all_rels] bagfiles = ( BagFile.query .filter(BagFile.file_name.in_(src_bag_names)) .all() ) # 根据 file_name 建索引,方便按原顺序组装 bag_by_name = {b.file_name: b for b in bagfiles} # 7) 组装返回 list(按上游 names 顺序) result_list = [] for name in src_bag_names: bag = bag_by_name.get(name) if bag: result_list.append(serialize_bagfile(bag)) # 如果某个名字在本地不存在: # 直接根据文件名拼出视频 url else: datetime_full, video_urls = extract_datetime_from_filename(name) result_list.append( { "file_name": name, "exists": False, "video_url": video_urls.get("wide"), "video_urls": video_urls, "capture_datetime": datetime_full, }) merge_info = { "joined_name": joined_name, "src_bag_names": src_bag_names, } # 这里按你的需求:把所有 src bagname 作为数组,放到 extraVideoSources extra_video_sources = result_list # ========= 获取各级标签信息(保持你原来的逻辑) ========= level1_tag = None level2_tags = [] # 存储二级标签列表 level3_tags = [] # 存储三级标签列表 level4_tags = [] # 存储四级标签列表 if bagfile.level1_tag_id: all_tags_struct = get_all_tags_struct() # { "all_tags": [...], "total_level1": X } # 从 all_tags 里找到 level1_id 对应的块 match_level1_tag = None for blk in all_tags_struct.get("all_tags", []): level1 = blk.get("level1_tag") if level1 and level1.get("id") == bagfile.level1_tag_id: match_level1_tag = blk break if match_level1_tag: level1_tag = match_level1_tag.get("level1_tag") level2_tags = match_level1_tag.get("level2_tags", []) level3_tags = match_level1_tag.get("level3_tags", []) level4_tags = match_level1_tag.get("level4_tags", []) video_urls = build_video_urls(bagfile.file_name) # 构建响应数据 bagfile_data = { "id": bagfile.id, "file_name": bagfile.file_name, "event": bagfile.event, "capture_datetime": ( bagfile.capture_datetime.isoformat() if bagfile.capture_datetime else None ), "status": bagfile.status.value if bagfile.status else None, "frame_url": bagfile.frame_url, "video_url": video_urls.get("wide"), "video_urls": video_urls, "batch_name": bagfile.batch_name, "create_time": ( bagfile.create_time.isoformat() if bagfile.create_time else None ), "update_time": ( bagfile.update_time.isoformat() if bagfile.update_time else None ), "bag_status": bagfile.bag_status, "last_rule_version_id": bagfile.last_rule_version_id, "level1_tag_id": bagfile.level1_tag_id, "comment1": bagfile.comment1, "comment2": bagfile.comment2, "comment3": bagfile.comment3, # 可选:把逻辑上的 merged 名字一并返回,前端要展示也方便 "joined_name": merge_info["joined_name"] if merge_info else None, } # 构建完整响应 response_data = { "bagfile": bagfile_data, "level1_tag": level1_tag, "level2_tags": level2_tags, "level3_tags": level3_tags, "level4_tags": level4_tags, # 新增两个字段 "extraVideoSources": extra_video_sources, # [] 或 ["A.bag", "B.bag", ...] "merge_info": merge_info, # None 或 { joined_name, src_bag_names } "tag_events": [ serialize_tagging_event(e) for e in TaggingEvents.query.filter_by(bag_id=bag_id, is_deleted=False) .order_by(TaggingEvents.ts_event.asc()) .all() ], } if not response_data["tag_events"]: response_data["tag_events"] = [build_default_tag_event(bagfile)] return api_response(data=response_data) except Exception as e: return api_response(500, f"Server error: {str(e)}") def _serialize_tag_row(row): if not row: return None return { "id": row.id, "name": row.name, "name_cn": row.name_cn, "level": row.level, "annotation": row.annotation, "parent_id": row.parent_id, } def get_all_tags_struct(): """ 返回: { "all_tags": [ { "level1_tag": {...}, "level2_tags": [....], "level3_tags": [....], "level4_tags": [....] }, ... ], "total_level1": X } """ # 一次取全表 rows = Fst.query.with_entities( Fst.id, Fst.name, Fst.name_cn, Fst.level, Fst.annotation, Fst.parent_id ).all() by_parent = defaultdict(list) by_id = {} for r in rows: node = _serialize_tag_row(r) by_id[r.id] = node by_parent[r.parent_id].append(node) # level1 = parent_id 为空 或 level==1 level1_nodes = [n for n in by_id.values() if n["level"] == 1] all_tags_blocks = [] for l1 in level1_nodes: l2s = [n for n in by_parent.get(l1["id"], []) if n["level"] == 2] l3s = [] for l2 in l2s: l3s.extend([n for n in by_parent.get(l2["id"], []) if n["level"] == 3]) l4s = [] for l3 in l3s: l4s.extend([n for n in by_parent.get(l3["id"], []) if n["level"] == 4]) all_tags_blocks.append({ "level1_tag": l1, "level2_tags": l2s, "level3_tags": l3s, "level4_tags": l4s, }) return { "all_tags": all_tags_blocks, "total_level1": len(level1_nodes) } VIDEO_URL_KEYS = ("wide", "left", "right", "rear_left", "rear_right", "rear") def build_video_urls(bag_filename: str) -> dict: urls = {key: None for key in VIDEO_URL_KEYS} try: _, generated = extract_datetime_from_filename(bag_filename) print(f'generated:{generated}') except Exception: return urls if isinstance(generated, dict): for key in VIDEO_URL_KEYS: urls[key] = generated.get(key) return urls def serialize_bagfile(bag) -> dict: video_urls = build_video_urls(bag.file_name) return { "id": bag.id, "file_name": bag.file_name, "event": bag.event, "capture_datetime": ( bag.capture_datetime.isoformat() if bag.capture_datetime else None ), "status": bag.status.value if bag.status else None, "frame_url": bag.frame_url, "video_url": video_urls.get("wide"), "video_urls": video_urls, "batch_name": bag.batch_name, "create_time": bag.create_time.isoformat() if bag.create_time else None, "update_time": bag.update_time.isoformat() if bag.update_time else None, "bag_status": bag.bag_status, "last_rule_version_id": bag.last_rule_version_id, "level1_tag_id": bag.level1_tag_id, # 保留bagfile自身的一级标签ID,但不影响标签查询 } ## 获取所有一级标签及其子标签 @data_factory_bp.route("/getalltag", methods=["GET"]) @jwt_required() def get_all_tags_and_bagfile(): try: # 1. 获取bag_id参数(用于查询单个bagfile) bag_id = request.args.get("bag_id") if not bag_id: return api_response(400, "缺少参数:bag_id") # 2. 查询指定的bagfile信息(独立于标签逻辑) bagfile = BagFile.query.get(bag_id) if not bagfile: return api_response(404, f"未找到ID为{bag_id}的bagfile") # 3. 查询所有标签(完全独立于bagfile的标签逻辑) all_tags_data = get_all_tags_struct() # 4. 合并响应:所有标签 + 单个bagfile信息(两者独立) return api_response( data={ "all_tags": all_tags_data["all_tags"], # 所有一级标签及其子标签(与bagfile无关) "total_level1": all_tags_data["total_level1"], # 一级标签总数 "bagfile": serialize_bagfile(bagfile), # 单个bagfile详情(独立) } ) except Exception as e: return api_response(500, f"Server error: {str(e)}") ### 点击不符合时 # 1.点击不符合,响应一段文字后 `这个数据包是否符合另一个STS的描述?` ## 点击否时,把用户信息user_id,更新时间update_time,bag_status写入表 @data_factory_bp.route("/tag-invalid", methods=["POST"]) @jwt_required() def get_baginfo_no(): try: # 1. 获取请求体数据 data = request.get_json() if not data: return api_response(400, "缺少请求体数据") required_fields = ["bag_id", "bag_status", "status", "source"] for field in required_fields: if field not in data: return api_response(400, f"缺少参数:{field}") # 2. 查询并验证bagfile状态 bag_id = data["bag_id"] bagfile = BagFile.query.get(bag_id) if not bagfile: return api_response(404, f"未找到ID为{bag_id}的bagfile") if bagfile.bag_status != 0: return api_response(403, "bag_status不等于0,禁止更新") # 3. 获取当前用户ID(从JWT中提取) current_user = get_jwt_identity() # print(current_user) # 4. 更新BagFile表 update_data = { "bag_status": data["bag_status"], "status": data["status"], "source": data["source"], "bag_update_time": datetime.now(), # 添加更新时间 "user_id": current_user, # 添加用户ID } for key, value in update_data.items(): setattr(bagfile, key, value) # 5. 更新TaggingEvents表(如果记录存在) new_event = TaggingEvents( bag_id=bag_id, source=data["source"], reviewer_id=current_user, # 添加用户ID rule_version_id=bagfile.last_rule_version_id, level1_tag_id=bagfile.level1_tag_id, ts_event=datetime.now(), ) db.session.add(new_event) # 6. 提交事务 db.session.commit() # 7. 返回成功响应 return api_response( 200, "更新成功", data={ "bag_id": bag_id, "bag_update_time": update_data["bag_update_time"].isoformat(), }, ) except Exception as e: db.session.rollback() # 发生异常时回滚事务 return api_response(500, f"更新失败: {str(e)}") # 新增辅助函数:安全解析ISO时间格式[1](@ref) def parse_iso_datetime(datetime_str): """解析ISO格式时间字符串,返回datetime对象或None""" if not datetime_str: return None try: # 支持带时区和不带时区的格式 return datetime.fromisoformat(datetime_str.replace("Z", "+00:00")) except (TypeError, ValueError): return None def serialize_tagging_event(event: TaggingEvents) -> dict: def safe_name(tag): return getattr(tag, "name", None) if tag else None level1 = safe_name(event.level1_tag) level2 = safe_name(event.level2_tag) level3 = safe_name(event.level3_tag) level4 = safe_name(event.level4_tag) tag_path = " / ".join([x for x in [level1, level2, level3, level4] if x]) return { "event_id": event.event_id, "bag_id": event.bag_id, "tag_path": tag_path, "level1_tag_id": event.level1_tag_id, "level1_tag_name": level1, "level2_tag_id": event.level2_tag_id, "level2_tag_name": level2, "level3_tag_id": event.level3_tag_id, "level3_tag_name": level3, "level4_tag_id": event.level4_tag_id, "level4_tag_name": level4, "qa_status": event.qa_status.value if event.qa_status else None, "case_type": event.case_type, "front_starttime": event.front_starttime.isoformat() if event.front_starttime else None, "front_endtime": event.front_endtime.isoformat() if event.front_endtime else None, "front_start_sec": event.front_start_sec, "front_end_sec": event.front_end_sec, "high_speed": event.high_speed, "urban": event.urban, "parking": event.parking, "note": event.note, "source": event.source.value if event.source else None, "reviewer_id": event.reviewer_id, "ts_event": event.ts_event.isoformat() if event.ts_event else None, } def build_default_tag_event(bagfile: BagFile) -> dict: """当没有任何历史 TaggingEvents 时,使用当前 bag 的信息构造一个默认事件""" level1 = getattr(bagfile.level1_tag, "name", None) level2 = getattr(bagfile.level2_tag, "name", None) level3 = getattr(bagfile.level3_tag, "name", None) level4 = getattr(bagfile.level4_tag, "name", None) tag_path = " / ".join([x for x in [level1, level2, level3, level4] if x]) ts = bagfile.update_time or bagfile.create_time return { "event_id": -1, # 前端识别为本地默认数据 "bag_id": bagfile.id, "tag_path": tag_path, "level1_tag_id": bagfile.level1_tag_id, "level1_tag_name": level1, "level2_tag_id": bagfile.level2_tag_id, "level2_tag_name": level2, "level3_tag_id": bagfile.level3_tag_id, "level3_tag_name": level3, "level4_tag_id": bagfile.level4_tag_id, "level4_tag_name": level4, "qa_status": bagfile.qa_status.value if bagfile.qa_status else None, "case_type": bagfile.case_type, "front_starttime": bagfile.front_starttime.isoformat() if bagfile.front_starttime else None, "front_endtime": bagfile.front_endtime.isoformat() if bagfile.front_endtime else None, "front_start_sec": bagfile.front_start_sec, "front_end_sec": bagfile.front_end_sec, "high_speed": bagfile.high_speed, "urban": bagfile.urban, "parking": bagfile.parking, "note": bagfile.comment1, "source": None, "reviewer_id": bagfile.user_id, "ts_event": ts.isoformat() if ts else None, } @data_factory_bp.route("/tag-events", methods=["GET"]) @jwt_required() def list_tag_events(): bag_id = request.args.get("bag_id") if not bag_id: return api_response(400, "缺少参数:bag_id") events = ( TaggingEvents.query.filter_by(bag_id=bag_id, is_deleted=False) .order_by(TaggingEvents.ts_event.asc()) .all() ) return api_response(data={"list": [serialize_tagging_event(e) for e in events]}) @data_factory_bp.route("/batch-bag-record", methods=["POST"]) @jwt_required() def batch_add_record(): """ 请求参数: { "bag_id": "4", "alltags": [ { "id": 395, "name": "U_TURN", "level": 1 }, { "id": 386, "name": "U_TURN_INTERSECTION", "level": 2 }, { "id": 648, "name": "U_TURN_INTERSECTION_UTURN_LIGHT", "level": 3 }, { "id": null, "name": "无四级标签数据", "level": 4 } ], "fstime": "15:06:15", "fetime": "16:00:00", "casetype": "1", "bag_status": 1, "status": "RULE_BASED_ACCEPTED", "source": "RULE", "comment1": "", "comment2": "", "comment3": "" } 批量模式: { "bag_id": "4", "primary_key": "event-1-0", "records": [ { ...单条记录结构... }, { ...单条记录结构... } ], "deleted_event_ids": [1, 2] } """ try: # 1. 解析请求参数 params = request.get_json() or {} records = params.get("records") if not isinstance(records, list): return jsonify({"code": 400, "message": "records 必须为数组"}), 400 if not records: return jsonify({"code": 400, "message": "records 不能为空"}), 400 top_bag_id = params.get("bag_id") if top_bag_id is not None: for rec in records: rec.setdefault("bag_id", top_bag_id) required_fields = [ "bag_id", "alltags", "fstime", "fetime", "casetype", "bag_status", ] for idx, rec in enumerate(records): missing = [key for key in required_fields if key not in rec] if missing: return ( jsonify( { "code": 400, "message": f"记录{idx}缺少必要参数: {', '.join(missing)}", } ), 400, ) bag_id = records[0].get("bag_id") if not bag_id: return jsonify({"code": 400, "message": "bag_id 不能为空"}), 400 for idx, rec in enumerate(records): if rec.get("bag_id") != bag_id: return ( jsonify( { "code": 400, "message": f"记录{idx}的 bag_id 与首条记录不一致", } ), 400, ) # 2. 查询BAG记录并验证状态 bagfile = BagFile.query.get(bag_id) if not bagfile: return jsonify({"code": 404, "message": "BAG记录不存在"}), 404 current_user = get_jwt_identity() def extract_tag_ids(alltags): level1_tag_id = next( ( tag["id"] for tag in alltags if tag["level"] == 1 and tag["id"] is not None ), None, ) level2_tag_id = next( ( tag["id"] for tag in alltags if tag["level"] == 2 and tag["id"] is not None ), None, ) level3_tag_id = next( ( tag["id"] for tag in alltags if tag["level"] == 3 and tag["id"] is not None ), None, ) level4_tag_id = next( ( tag["id"] for tag in alltags if tag["level"] == 4 and tag["id"] is not None ), None, ) return level1_tag_id, level2_tag_id, level3_tag_id, level4_tag_id def normalize_qa_status(value, idx): if value is None or value == "": return QaStatus.QA_NOT_REVIEWED, None if isinstance(value, QaStatus): return value, None try: return QaStatus(value), None except ValueError: valid_statuses = [status.value for status in QaStatus] return ( None, ( jsonify( { "code": 400, "message": f"记录{idx}无效的qa_status值,有效值为: {valid_statuses}", } ), 400, ), ) def normalize_event_id(value): if value is None or value == "": return None try: event_id_int = int(value) except (TypeError, ValueError): return value if event_id_int <= 0: return None return event_id_int deleted_event_ids_raw = params.get("deleted_event_ids") or [] if not isinstance(deleted_event_ids_raw, list): return jsonify({"code": 400, "message": "deleted_event_ids 必须为数组"}), 400 deleted_event_ids = [] for raw_id in deleted_event_ids_raw: normalized = normalize_event_id(raw_id) if isinstance(normalized, int): deleted_event_ids.append(normalized) def aggregate_qa_status(statuses): has_not_reviewed = False has_invalid = False has_modify = False has_passed = False for status in statuses: status = status or QaStatus.QA_NOT_REVIEWED if status == QaStatus.QA_NOT_REVIEWED: has_not_reviewed = True elif status == QaStatus.QA_INVALID: has_invalid = True elif status == QaStatus.QA_MODIFY: has_modify = True elif status == QaStatus.QA_PASSED: has_passed = True if has_not_reviewed: return QaStatus.QA_NOT_REVIEWED if has_invalid: return QaStatus.QA_INVALID if has_modify: return QaStatus.QA_MODIFY if has_passed: return QaStatus.QA_PASSED return QaStatus.QA_NOT_REVIEWED def validate_tag_hierarchy( level1_tag_id, level2_tag_id, level3_tag_id, level4_tag_id, idx ): if level4_tag_id is not None: level4_tag = Fst.query.get(level4_tag_id) if not level4_tag: return ( jsonify( { "code": 400, "message": f"记录{idx}四级标签不存在(ID: {level4_tag_id})", } ), 400, ) if not level3_tag_id: return ( jsonify( {"code": 400, "message": f"记录{idx}四级标签存在时,三级标签不能为空"} ), 400, ) if level4_tag.parent_id != level3_tag_id: return ( jsonify( { "code": 400, "message": f"记录{idx}四级标签的父级ID({level4_tag.parent_id})与三级标签ID({level3_tag_id})不匹配", } ), 400, ) if level3_tag_id is not None: level3_tag = Fst.query.get(level3_tag_id) if not level3_tag: return ( jsonify( { "code": 400, "message": f"记录{idx}三级标签不存在(ID: {level3_tag_id})", } ), 400, ) if not level2_tag_id: return ( jsonify( {"code": 400, "message": f"记录{idx}三级标签存在时,二级标签不能为空"} ), 400, ) if level3_tag.parent_id != level2_tag_id: return ( jsonify( { "code": 400, "message": f"记录{idx}三级标签的父级ID({level3_tag.parent_id})与二级标签ID({level2_tag_id})不匹配", } ), 400, ) if level2_tag_id is not None: level2_tag = Fst.query.get(level2_tag_id) if not level2_tag: return ( jsonify( { "code": 400, "message": f"记录{idx}二级标签不存在(ID: {level2_tag_id})", } ), 400, ) if not level1_tag_id: return ( jsonify( {"code": 400, "message": f"记录{idx}二级标签存在时,一级标签不能为空"} ), 400, ) if level2_tag.parent_id != level1_tag_id: return ( jsonify( { "code": 400, "message": f"记录{idx}二级标签的父级ID({level2_tag.parent_id})与一级标签ID({level1_tag_id})不匹配", } ), 400, ) if level1_tag_id is not None: level1_tag = Fst.query.get(level1_tag_id) if not level1_tag: return ( jsonify( { "code": 400, "message": f"记录{idx}一级标签不存在(ID: {level1_tag_id})", } ), 400, ) return None def pick_primary(processed_records): primary_key = params.get("primary_key") if primary_key: for item in processed_records: if item["record"].get("key") == primary_key: return item return processed_records[0] processed = [] submitted_event_ids = set() for idx, rec in enumerate(records): alltags = rec["alltags"] if not isinstance(alltags, list): return ( jsonify( {"code": 400, "message": f"记录{idx}的 alltags 必须为数组"} ), 400, ) ( level1_tag_id, level2_tag_id, level3_tag_id, level4_tag_id, ) = extract_tag_ids(alltags) error = validate_tag_hierarchy( level1_tag_id, level2_tag_id, level3_tag_id, level4_tag_id, idx ) if error: return error try: front_starttime = datetime.strptime(rec["fstime"], "%Y-%m-%d %H:%M:%S") front_endtime = datetime.strptime(rec["fetime"], "%Y-%m-%d %H:%M:%S") except ValueError as e: return ( jsonify( {"code": 400, "message": f"记录{idx}时间格式错误: {str(e)}"} ), 400, ) casetype = rec["casetype"] bag_status = rec["bag_status"] status = rec.get("status", "RULE_BASED_ACCEPTED") source = rec.get("source", "RULE") comment1 = rec.get("comment1", "") comment2 = rec.get("comment2", "") comment3 = rec.get("comment3", "") front_start_sec = rec.get("front_start_sec") front_end_sec = rec.get("front_end_sec") qa_status, qa_error = normalize_qa_status(rec.get("qa_status"), idx) if qa_error: return qa_error high_speed = rec.get("high_speed") urban = rec.get("urban") parking = rec.get("parking") request_type = rec.get("request_type") event_id = normalize_event_id(rec.get("event_id")) if isinstance(event_id, int): submitted_event_ids.add(event_id) if event_id is not None: existing_event = TaggingEvents.query.get(event_id) if not existing_event: return ( jsonify( { "code": 404, "message": f"记录{idx}未找到 event_id={event_id} 的标签事件", } ), 404, ) if existing_event.is_deleted: return ( jsonify( { "code": 404, "message": f"记录{idx}的 event_id={event_id} 已被删除", } ), 404, ) if str(existing_event.bag_id) != str(bag_id): return ( jsonify( { "code": 400, "message": f"记录{idx}的 event_id={event_id} 不属于 bag_id={bag_id}", } ), 400, ) existing_event.level1_tag_id = level1_tag_id existing_event.level2_tag_id = level2_tag_id existing_event.level3_tag_id = level3_tag_id existing_event.level4_tag_id = level4_tag_id existing_event.source = source existing_event.reviewer_id = current_user existing_event.ts_event = datetime.now() existing_event.case_type = casetype existing_event.front_starttime = front_starttime existing_event.front_endtime = front_endtime existing_event.front_start_sec = front_start_sec existing_event.front_end_sec = front_end_sec existing_event.high_speed = high_speed if high_speed else 0 existing_event.urban = urban if urban else 0 existing_event.parking = parking if parking else 0 existing_event.qa_status = qa_status existing_event.note = ( comment1.strip() if isinstance(comment1, str) and comment1.strip() else None ) else: new_event = TaggingEvents( bag_id=bag_id, level1_tag_id=level1_tag_id, level2_tag_id=level2_tag_id, level3_tag_id=level3_tag_id, level4_tag_id=level4_tag_id, source=source, reviewer_id=current_user, ts_event=datetime.now(), case_type=casetype, front_starttime=front_starttime, front_endtime=front_endtime, front_start_sec=front_start_sec, front_end_sec=front_end_sec, high_speed=high_speed if high_speed else 0, urban=urban if urban else 0, parking=parking if parking else 0, qa_status=qa_status, note=comment1.strip() if isinstance(comment1, str) and comment1.strip() else None, ) db.session.add(new_event) processed.append( { "record": rec, "event_id": event_id, "level1_tag_id": level1_tag_id, "level2_tag_id": level2_tag_id, "level3_tag_id": level3_tag_id, "level4_tag_id": level4_tag_id, "front_starttime": front_starttime, "front_endtime": front_endtime, "casetype": casetype, "bag_status": bag_status, "status": status, "comment1": comment1, "comment2": comment2, "comment3": comment3, "front_start_sec": front_start_sec, "front_end_sec": front_end_sec, "qa_status": qa_status, "high_speed": high_speed, "urban": urban, "parking": parking, "request_type": request_type, "source": source, } ) if deleted_event_ids: delete_ids = {eid for eid in deleted_event_ids if eid not in submitted_event_ids} for del_id in delete_ids: delete_event = TaggingEvents.query.get(del_id) if not delete_event: return ( jsonify( { "code": 404, "message": f"未找到 event_id={del_id} 的标签事件", } ), 404, ) if str(delete_event.bag_id) != str(bag_id): return ( jsonify( { "code": 400, "message": f"event_id={del_id} 不属于 bag_id={bag_id}", } ), 400, ) delete_event.is_deleted = True primary = pick_primary(processed) # 3. 更新BAG记录(以主记录为准) bagfile.front_starttime = primary["front_starttime"] bagfile.front_endtime = primary["front_endtime"] bagfile.case_type = primary["casetype"] bagfile.bag_status = primary["bag_status"] bagfile.status = primary["status"] bagfile.level1_tag_id = primary["level1_tag_id"] bagfile.level2_tag_id = primary["level2_tag_id"] bagfile.level3_tag_id = primary["level3_tag_id"] bagfile.level4_tag_id = primary["level4_tag_id"] bagfile.comment1 = primary["comment1"] bagfile.comment2 = primary["comment2"] bagfile.comment3 = primary["comment3"] bagfile.front_start_sec = primary["front_start_sec"] bagfile.front_end_sec = primary["front_end_sec"] bagfile.high_speed = primary["high_speed"] if primary["high_speed"] else 0 bagfile.urban = primary["urban"] if primary["urban"] else 0 bagfile.parking = primary["parking"] if primary["parking"] else 0 db.session.flush() active_events = TaggingEvents.query.filter_by( bag_id=bag_id, is_deleted=False ).all() bagfile.qa_status = aggregate_qa_status([ev.qa_status for ev in active_events]) if primary["request_type"] == "annotation": bagfile.bag_update_time = datetime.now() bagfile.user_id = current_user if primary["request_type"] == "qa": bagfile.qa_confirm_time = datetime.now() bagfile.qa_id = current_user db.session.commit() return ( jsonify( { "code": 200, "message": "批量标签更新成功", "data": { "bag_id": bag_id, "level1_tag_id": primary["level1_tag_id"], "level2_tag_id": primary["level2_tag_id"], "level3_tag_id": primary["level3_tag_id"], "level4_tag_id": primary["level4_tag_id"], "user_id": current_user, "source": primary["source"], "operation": "created", "event_count": len(processed), "qa_status": bagfile.qa_status.value if bagfile.qa_status else None, }, } ), 200, ) except Exception as e: db.session.rollback() return jsonify({"code": 500, "message": f"服务器错误: {str(e)}"}), 500 # 查询detail信息 @data_factory_bp.route("/bag-detail", methods=["POST"]) @jwt_required() def bag_detail(): """ 参数:[xxxx.bag,xxx.bag] """ url = "http://10.0.240.4:5232/api/bags/pangu/detail" data = request.json res = requests.post(url=url, json=data["bagname"]) # res='123' return jsonify({"data": res.json(), "code": 200, "message": "成功"}) return jsonify({"data": 123, "code": 200, "message": "成功"}) @data_factory_bp.route("/bag-joined-detail", methods=["POST"]) @jwt_required() def bag_joined_detail(): """ 请求体示例: { "bagname": "xxx.bag", "before": 5, # 或字符串 "5" "after": 10 # 或字符串 "10" } """ try: payload = request.get_json(silent=True) or {} bagname = payload.get("bagname") before = payload.get("before") after = payload.get("after") # 1) 基础校验 if not bagname or before is None or after is None: return api_response(400, "缺少参数:bagname/before/after") # 2) 类型校验/转换(常见为整数秒) try: before = int(before) after = int(after) except (TypeError, ValueError): return api_response(400, "参数类型错误:before/after 必须为整数") # 3) 上游地址可配置化 base_url = "http://10.0.240.4:5232" #base_url = "http://127.0.0.1:5232" url = f"{base_url}/api/bags/joined" # 4) 请求上游(使用 params 自动编码,设置合理超时) res = requests.get( url, params={"bag_name": bagname, "before": before, "after": after}, timeout=6, ) # 5) 处理上游响应(JSON/非JSON都兜底) content_type = res.headers.get("content-type", "") body = res.json() if "application/json" in content_type.lower() else res.text # print(res.url) print(body) if not res.ok: # 将上游错误透传为 502,并带上上游状态码与内容,便于排查 return ( jsonify( { "data": body, "code": res.status_code, "message": "上游服务返回非 2xx", } ), 502, ) print(f"body:{body} type={type(body)}") names = body # 6) 用这些字符串当作 file_name 去 BagFile 查 # 保持原顺序,可以先一次性查出所有,再按 names 顺序组装 bagfiles = ( BagFile.query .filter(BagFile.file_name.in_(names)) .all() ) # 根据 file_name 建索引,方便按原顺序组装 bag_by_name = {b.file_name: b for b in bagfiles} # 7) 组装返回 list(按上游 names 顺序) result_list = [] for name in names: bag = bag_by_name.get(name) if bag: result_list.append(serialize_bagfile(bag)) # 如果某个名字在本地不存在: # 直接根据文件名拼出视频 url else: datetime_full, video_urls = extract_datetime_from_filename(name) result_list.append( { "file_name": name, "exists": False, "video_url": video_urls.get("wide"), "video_urls": video_urls, "capture_datetime": datetime_full, }) return jsonify( { "data": {"list": result_list}, "code": 200, "message": "成功", } ), 200 except requests.Timeout: return api_response(504, "上游接口超时") except requests.RequestException as e: return api_response(502, f"上游请求失败:{str(e)}") except Exception as e: return api_response(500, f"服务器错误: {str(e)}") @data_factory_bp.route("/mergebags", methods=["POST"]) @jwt_required() def mergebags(): """ 请求体示例: { "bag_name": [ "A.bag", "B.bag" ] } 逻辑: 1. 校验 bag_name 为非空数组 2. 调用上游 /api/bags/joined/create,参数:{ "bag_name": [ "A.bag", "B.bag" ] } 3. 上游返回形如: { "joined_id": 0, "joined_name": "string" } 4. 只有在 joined_name 有值时认为合并成功,本接口返回成功 """ try: payload = request.get_json(silent=True) or {} bag_names = payload.get("bag_name") initiator_src_name = payload.get("initiator_src_name") or bag_names[0] # 1) 参数校验 if not isinstance(bag_names, list) or not bag_names: return api_response(400, "参数错误:bag_name 必须为非空数组") # initiator_src_name 必须非空 if not initiator_src_name: return api_response(400, "参数错误:initiator_src_name 不能为空") # 2) 上游地址 base_url = "http://10.0.240.4:5232" # 如果你本地调试要走本机,可以改成下面这行(现在先注释) #base_url = "http://127.0.0.1:5232" url = f"{base_url}/api/bags/joined/create" # 3) 请求上游 upstream_resp = requests.post( url, json={"bag_names": bag_names}, timeout=10, ) content_type = upstream_resp.headers.get("content-type", "") if "application/json" in content_type.lower(): try: body = upstream_resp.json() except Exception: body = None else: # 尝试把文本解析成 JSON;失败就保留原始文本用于报错 raw_text = upstream_resp.text try: body = json.loads(raw_text) except Exception: body = None # 4) 上游非 2xx 的情况 if not upstream_resp.ok: return ( jsonify( { "code": upstream_resp.status_code, "message": "上游服务返回非 2xx", "data": body, } ), 502, ) # 5) 上游格式校验 if not isinstance(body, dict): return api_response(502, "上游返回格式错误,期望为 JSON 对象") joined_id = body.get("joined_id") joined_name = body.get("joined_name") # 6) 只有 joined_name 有值才算成功 if not joined_name: return ( jsonify( { "code": 500, "message": "mergebags 失败:上游返回的 joined_name 为空或缺失", "data": body, } ), 500, ) # 6) 把 merge 关系写入 bag_merge_record current_user = get_jwt_identity() try: for src in bag_names: record = BagMergeRecord( joined_name=joined_name, src_name=src, is_initiator=(src == initiator_src_name), created_by=current_user, ) db.session.add(record) db.session.commit() except Exception as db_e: db.session.rollback() # 上游已成功,但本地记录失败,这里明确提示 return api_response(500, f"mergebags 上游成功,本地写入映射表失败:{str(db_e)}") # 7) 返回成功 return ( jsonify( { "code": 200, "message": "mergebags 成功", "data": { "joined_id": joined_id, "joined_name": joined_name, "bag_name": bag_names, # 把原请求也回显一下,前端更好用 }, } ), 200, ) except requests.Timeout: return api_response(504, "上游接口超时") except requests.RequestException as e: return api_response(502, f"上游请求失败:{str(e)}") except Exception as e: return api_response(500, f"服务器错误: {str(e)}") @data_factory_bp.route("/restore-mergebags", methods=["POST"]) @jwt_required() def restore_mergebags(): """ 请求体示例: { "bag_name": [ "A.bag", "B.bag" ] } 逻辑: 1. 校验 bag_name 为非空数组 2. 调用上游 /api/bags/joined/delete,参数: { "bag_name": [...] } 3. 上游返回形如: { "deleted_parents": [ "A_C_merged" ] } 4. 只有在 deleted_parents 有值时认为“解除合并”成功, 然后本地删除 bag_merge_record 中对应的映射记录 """ try: payload = request.get_json(silent=True) or {} bag_names = payload.get("bag_name") # 1) 参数校验 if not isinstance(bag_names, list) or not bag_names: return api_response(400, "参数错误:bag_name 必须为非空数组") # 2) 上游地址 base_url = "http://10.0.240.4:5232" # 如果你本地调试要走本机,可以改成下面这行(现在先注释) #base_url = "http://127.0.0.1:5232" url = f"{base_url}/api/bags/joined/delete" # 3) 请求上游 upstream_resp = requests.post( url, json={"bag_names": bag_names}, timeout=10, ) content_type = upstream_resp.headers.get("content-type", "") if "application/json" in content_type.lower(): try: body = upstream_resp.json() except Exception: body = None else: # 尝试把文本解析成 JSON;失败就保留原始文本用于报错 raw_text = upstream_resp.text try: body = json.loads(raw_text) except Exception: body = None # 4) 上游非 2xx 的情况 if not upstream_resp.ok: return ( jsonify( { "code": upstream_resp.status_code, "message": "上游服务返回非 2xx", "data": body, } ), 502, ) # 5) 上游格式校验 if not isinstance(body, dict): return api_response(502, "上游返回格式错误,期望为 JSON 对象") # 上游返回 { "deleted_parents": [...] } deleted_parents = body.get("deleted_parents") # 6) 只有 deleted_parents 为非空列表才算解除成功 if not isinstance(deleted_parents, list) or not deleted_parents: return ( jsonify( { "code": 500, "message": "restore-mergebags 失败:上游返回的 deleted_parents 为空或缺失", "data": body, } ), 500, ) # 6.5) 先根据 deleted_parents 找到 is_initiator 那条记录, # 再查出对应的 BagFile,并用 serialize_bagfile 序列化 initiator_bagfile_data = None try: initiator_record = ( BagMergeRecord.query .filter( BagMergeRecord.joined_name.in_(deleted_parents), BagMergeRecord.is_initiator.is_(True), ) .first() ) if initiator_record: initiator_bag = ( BagFile.query .filter(BagFile.file_name == initiator_record.src_name) .first() ) if initiator_bag: initiator_bagfile_data = serialize_bagfile(initiator_bag) except Exception as db_e: # 不影响主流程,只是无法返回 initiator_bagfile current_app.logger.error( "查询 is_initiator BagFile 失败:%s", str(db_e) ) # 7) # 删除本地映射关系 try: deleted_rows = ( BagMergeRecord.query .filter(BagMergeRecord.joined_name.in_(deleted_parents)) .delete(synchronize_session=False) ) db.session.commit() except Exception as db_e: db.session.rollback() return api_response( 500, f"restore-mergebags 上游已解除合并,本地删除映射记录失败:{str(db_e)}", ) # 无论 deleted_rows 是 0 还是 >0,都算成功(幂等删除) return ( jsonify( { "code": 200, "message": "restore-mergebags 成功", "data": { "deleted_parents": deleted_parents, "bag_name": bag_names, "deleted_rows": deleted_rows, "initiator_bagfile": initiator_bagfile_data, }, } ), 200, ) except requests.Timeout: return api_response(504, "上游接口超时") except requests.RequestException as e: return api_response(502, f"上游请求失败:{str(e)}") except Exception as e: return api_response(500, f"服务器错误: {str(e)}") @data_factory_bp.route("/bag-total", methods=["GET"]) @jwt_required() def bag_total(): try: # 统计总记录数 total_count = BagFile.query.count() # 统计bag_status为0的记录数 zero_count = BagFile.query.filter_by(bag_status=0).count() # 构建响应数据 result = { "success": True, "data": { "total_count": total_count, "zero_count": zero_count, "non_zero_count": total_count - zero_count, # 额外提供非零数量作为参考 }, "message": "统计成功", } return jsonify(result), 200 except Exception as e: # 错误处理 return ( jsonify({"success": False, "data": None, "message": f"统计失败: {str(e)}"}), 500, ) def get_weeks_from_date_select(value): """解析 weekN,返回周数,无效则返回默认 3 周""" if not value or not re.match(r"^week\d+$", value): return 3 try: return int(value.replace("week", "")) except ValueError: return 3 def format_week(week_num): """格式化周显示:2025年第31周""" return f"2025年第{week_num}周" def get_dynamic_week_ranges(weeks_count): """ 动态生成过去N周的范围(自动适应时间变化) 返回:[(周数, 周一00:00, 周日23:59:59), ...],按时间顺序(远→近)排列 """ today = datetime.now().date() # 1. 计算“最近已完整结束的周”的周日(周结束日) # 例:今天是8.4(周一),最近完整周的周日是8.3 days_since_sunday = ( today.weekday() + 1 ) % 7 # 0=周一,6=周日;计算距离上周日的天数 last_complete_sunday = today - timedelta(days=days_since_sunday) # 2. 生成过去N周的范围(从最近完整周往前推) week_ranges = [] for i in range(weeks_count): # 计算当前周的周日(周结束日):最近完整周 - i周 current_sunday = last_complete_sunday - timedelta(weeks=i) # 计算当前周的周一(周起始日):周日 - 6天 current_monday = current_sunday - timedelta(days=6) # 计算当前周的周数(ISO周数,周一为周首) # isocalendar()返回 (年, 周数, 星期几),星期几1=周一,7=周日 week_num = current_monday.isocalendar()[1] # 转换为 datetime 类型(含时间) week_start = datetime.combine(current_monday, datetime.min.time()) # 周一00:00 week_end = datetime.combine( current_sunday, datetime.max.time().replace(microsecond=0) ) # 周日23:59:59 week_ranges.append((week_num, week_start, week_end)) # 3. 反转列表,按时间顺序排列(最早的周在前,最近的周在后) week_ranges.reverse() return week_ranges @data_factory_bp.route("/echarts", methods=["POST"]) def bag_statistics(): try: data = request.get_json() if not data: return jsonify({"success": False, "message": "请求体不能为空"}), 400 tag_value = data.get("tagValue", "") date_select_value = data.get("dateSelectValue", "").strip() # 基础条件:目标状态 target_statuses = [ BagStatus.REVIEWED_BUT_INVALID, BagStatus.MANUAL_OVERRIDE_ACCEPTED, BagStatus.PROCESSED_NOT_REVIEWED, ] base_conditions = [BagFile.status.in_(target_statuses)] # 标签筛选 if tag_value: tag_id = int(tag_value) fst_tag = Fst.query.filter_by(id=tag_id, level=1).first() if not fst_tag: return jsonify({"success": False, "message": "无效的一级标签ID"}), 400 base_conditions.append(BagFile.level1_tag_id == tag_id) weeks_count = get_weeks_from_date_select(date_select_value) # 1. 动态生成过去N周的范围(自动适应时间变化) week_ranges = get_dynamic_week_ranges(weeks_count) print( f"DEBUG: 动态生成的周范围: {[(w[0], w[1].date(), w[2].date()) for w in week_ranges]}" ) # 2. 初始化结果字典 result_dict = { format_week(week_num): {s.value: 0 for s in target_statuses} for week_num, _, _ in week_ranges } # 3. 逐个周查询数据(按动态生成的日期范围) for week_num, week_start, week_end in week_ranges: conditions = base_conditions + [ BagFile.bag_update_time >= week_start, BagFile.bag_update_time <= week_end, ] week_stats = ( db.session.query(BagFile.status, func.count(BagFile.id).label("count")) .filter(*conditions) .group_by(BagFile.status) .all() ) # 填充数据 formatted_week = format_week(week_num) for status, count in week_stats: result_dict[formatted_week][status.value] = count print( f"DEBUG: 第{week_num}周({week_start.date()}至{week_end.date()})数据:{status.value}={count}" ) # 4. 按时间顺序返回(最近的周在最后) sorted_result = { format_week(week_num): result_dict[format_week(week_num)] for week_num, _, _ in week_ranges } total_data_points = sum(sum(d.values()) for d in sorted_result.values()) return jsonify( { "success": True, "data": { "type": "weeks", "weeks": weeks_count, "statistics": sorted_result, }, "message": ( "查询成功" if total_data_points > 0 else f"过去 {weeks_count} 周内无匹配数据" ), } ) except Exception as e: import traceback print(f"CRITICAL ERROR: {str(e)}") traceback.print_exc() return jsonify({"success": False, "message": f"服务器内部错误: {str(e)}"}), 500 @data_factory_bp.route("/updatedinfo", methods=["GET"]) @jwt_required() def bag_updatedinfo(): # 根据用户的id获取到 try: # 获取请求ID bag_id = request.args.get("bag_id") if request.args.get("bag_id") else None # 直接查询主表数据(不指定预加载) bag_file = BagFile.query.get(bag_id) if not bag_file: return ( jsonify({"success": False, "message": f"未找到id为{bag_id}的记录"}), 404, ) # ========= 新增:查询 merge 关系 ========== merge_info = None # { joined_name, src_bag_names } extra_video_sources = [] # 序列化后的 src 列表(按 merge 保存顺序) # 看当前 bag 的 file_name 是否作为 src_name 出现在合并表 rels_for_this = BagMergeRecord.query.filter_by( src_name=bag_file.file_name ).all() if rels_for_this: # 通常同一个 src_name 对应同一个 joined_name,取第一条即可 joined_name = rels_for_this[0].joined_name # 查出该 joined_name 下所有 src all_rels = BagMergeRecord.query.filter_by(joined_name=joined_name).all() src_bag_names = [r.src_name for r in all_rels] # 一次性查出这些 bag,再按 src_bag_names 顺序组装 bagfiles = ( BagFile.query .filter(BagFile.file_name.in_(src_bag_names)) .all() ) bag_by_name = {b.file_name: b for b in bagfiles} result_list = [] for name in src_bag_names: bag = bag_by_name.get(name) if bag: result_list.append(serialize_bagfile(bag)) else: # 本地没有记录,兜底拼 video_url datetime_full, video_urls = extract_datetime_from_filename(name) result_list.append( { "file_name": name, "exists": False, "video_url": video_urls.get("wide"), "video_urls": video_urls, "capture_datetime": datetime_full, } ) merge_info = { "joined_name": joined_name, "src_bag_names": src_bag_names, } extra_video_sources = result_list # if bag_file.bag_status == 2: # return jsonify({ # "success": False, # "message": f"该记录已经核验" # }), 404 # 序列化日期 def fmt_dt(dt): return dt.isoformat() if dt else None # 构建返回数据(访问关联属性时会触发惰性加载) tag_events = [ serialize_tagging_event(e) for e in TaggingEvents.query.filter_by(bag_id=bag_id, is_deleted=False) .order_by(TaggingEvents.ts_event.asc()) .all() ] if not tag_events: tag_events = [build_default_tag_event(bag_file)] video_urls = build_video_urls(bag_file.file_name) return jsonify( { "success": True, "data": { # 基础信息 "id": bag_file.id, "file_name": bag_file.file_name, "file_path": bag_file.file_path, "capture_datetime": fmt_dt(bag_file.capture_datetime), "vehicle_status": bag_file.vehicle_status, "user_id": bag_file.user_id, "create_time": fmt_dt(bag_file.bag_update_time), "update_time": fmt_dt(bag_file.update_time), "car_state": bag_file.car_state, "frame_url": bag_file.frame_url, "video_url": video_urls.get("wide"), "video_urls": video_urls, "batch_name": bag_file.batch_name, "status": bag_file.status.value if bag_file.status else None, "last_time": fmt_dt(bag_file.last_time), "last_rule_version_id": bag_file.last_rule_version_id, "front_starttime": fmt_dt(bag_file.front_starttime), "front_endtime": fmt_dt(bag_file.front_endtime), "case_type": bag_file.case_type, "comment1": bag_file.comment1, "comment2": bag_file.comment2, "comment3": bag_file.comment3, "bag_status": bag_file.bag_status, "front_start_sec": bag_file.front_start_sec, "front_end_sec": bag_file.front_end_sec, "high_speed": bag_file.high_speed, "urban": bag_file.urban, "parking": bag_file.parking, "sync_status": bag_file.sync_status.value, # merge 相关信息 "joined_name": merge_info["joined_name"] if merge_info else None, "extraVideoSources": extra_video_sources, "merge_info": merge_info, "tag_events": tag_events, # 关联的FST标签信息(触发惰性加载) "level1_tag": ( { "id": bag_file.level1_tag.id, "name": bag_file.level1_tag.name, "name_cn": bag_file.level1_tag.name_cn, "annotation": bag_file.level1_tag.annotation, } if bag_file.level1_tag else None ), "level2_tag": ( { "id": bag_file.level2_tag.id, "name": bag_file.level2_tag.name, "name_cn": bag_file.level2_tag.name_cn, "annotation": bag_file.level2_tag.annotation, } if bag_file.level2_tag else None ), "level3_tag": ( { "id": bag_file.level3_tag.id, "name": bag_file.level3_tag.name, "name_cn": bag_file.level3_tag.name_cn, "annotation": bag_file.level3_tag.annotation, } if bag_file.level3_tag else None ), "level4_tag": ( { "id": bag_file.level4_tag.id, "name": bag_file.level4_tag.name, "name_cn": bag_file.level4_tag.name_cn, "level": bag_file.level4_tag.level, "annotation": bag_file.level4_tag.annotation, } if bag_file.level4_tag else None ), }, "message": f"成功获取id为{bag_id}的记录", } ) except Exception as e: return jsonify({"success": False, "message": f"服务器错误:{str(e)}"}), 500 @data_factory_bp.route("/getretestbaglist", methods=["POST"]) @jwt_required() def get_retest_baglist(): try: # 1. 解析请求参数(与原代码一致) params = request.get_json() or {} print(f"【请求参数】原始参数: {params}") page = params.get("page", 1) per_page = params.get("per_page", 20) file_name = params.get("file_name", "").strip() start_datetime = params.get("start_datetime", "").strip() end_datetime = params.get("end_datetime", "").strip() level1_tag = params.get("level1_tag", "") status_str = params.get("status", "").strip() user_id = params.get("user_id", "") qa_status_str = params.get("qa_status", "").strip() qa_id = params.get("qa_id", "") # 2. 创建表别名(新增QA用户表别名) FstLevel1 = aliased(Fst) FstLevel2 = aliased(Fst) FstLevel3 = aliased(Fst) FstLevel4 = aliased(Fst) QaUser = aliased(User) # 用于区分创建者和QA操作人 M = aliased(BagMergeRecord) # 新增:merge 表别名 # === 3. 构建基础查询(注意这里用 db.session.query + outerjoin M)=== logical_name = func.coalesce(M.joined_name, BagFile.file_name) # 3. 构建查询(新增qa_id与QaUser的关联) query = ( db.session.query(BagFile, logical_name.label("logical_name")) .outerjoin(User, BagFile.user_id == User.id) .outerjoin(QaUser, BagFile.qa_id == QaUser.id) .outerjoin(FstLevel1, BagFile.level1_tag_id == FstLevel1.id) .outerjoin(FstLevel2, BagFile.level2_tag_id == FstLevel2.id) .outerjoin(FstLevel3, BagFile.level3_tag_id == FstLevel3.id) .outerjoin(FstLevel4, BagFile.level4_tag_id == FstLevel4.id) .outerjoin(M, M.src_name == BagFile.file_name) # 关键:把 M 左连接进来 ) # 4. 动态添加过滤条件(与原代码一致) conditions = [] conditions.append(BagFile.bag_status >= 1) conditions.append(BagFile.sync_status == SyncStatus.SYNC_NOT_READY) if user_id: try: user_id_int = int(user_id) conditions.append(BagFile.user_id == user_id_int) except ValueError: print(f"【过滤条件】user_id格式错误(非整数): {user_id}") if qa_id: try: qa_id_int = int(qa_id) conditions.append(BagFile.qa_id == qa_id_int) # 关联BagFile的qa_id字段 except ValueError: print(f"【过滤条件】qa_id格式错误(非整数): {qa_id}") #if file_name: # conditions.append(BagFile.file_name.like(f"%{file_name}%")) if start_datetime: try: start_dt = datetime.fromisoformat(start_datetime) conditions.append(BagFile.bag_update_time >= start_dt) except ValueError: print(f"【过滤条件】开始时间格式错误: {start_datetime}") if end_datetime: try: end_dt = datetime.fromisoformat(end_datetime) conditions.append(BagFile.bag_update_time <= end_dt) except ValueError: print(f"【过滤条件】结束时间格式错误: {end_datetime}") if level1_tag: try: level1_id = int(level1_tag) conditions.append(BagFile.level1_tag_id == level1_id) except ValueError: print(f"【过滤条件】level1_tag转换失败(非整数): {level1_tag}") if status_str: try: status_enum = BagStatus[status_str] conditions.append(BagFile.status == status_enum) except KeyError: print(f"【过滤条件】无效状态值: {status_str}") if qa_status_str: try: qa_status_enum = QaStatus[qa_status_str] conditions.append(BagFile.qa_status == qa_status_enum) except KeyError: print(f"【过滤条件】无效qa_status值: {qa_status_str}") # === 新增:只保留 is_initiator / 未合并 的记录 === conditions.append( or_( M.is_initiator == True, # 被 merge 的主 bag M.id.is_(None), # 完全没 merge 过的 bag ) ) if conditions: query = query.filter(and_(*conditions)) if file_name: query= query.filter(logical_name.like(f"%{file_name}%")) # 5. 排序与分页(与原代码一致) # query = query.order_by(BagFile.bag_update_time.desc()) # 使用case语句给QA_NOT_REVIEWED的记录赋予更高优先级(0比1小,排序时靠前) priority_case = case( ( BagFile.qa_status == QaStatus.QA_NOT_REVIEWED, 0, ), # 直接传递元组作为位置参数 else_=1, ) # 组合排序条件:先按优先级升序(0在前),再按update_time降序 query = query.order_by( priority_case.asc(), # 第一排序:QA_NOT_REVIEWED优先 BagFile.update_time.desc(), # 第二排序:最新更新在前 ) pagination = query.paginate(page=page, per_page=per_page, error_out=False) total_items = pagination.total total_pages = pagination.pages bag_files = [] for bag ,logical_name_value in pagination.items: level1_name = bag.level1_tag.name if bag.level1_tag else None level2_name = bag.level2_tag.name if bag.level2_tag else None level3_name = bag.level3_tag.name if bag.level3_tag else None level4_name = bag.level4_tag.name if bag.level4_tag else None qa_username = ( bag.qa_user.username if (bag.qa_user and bag.qa_user.username) else "" ) bag_files.append( { "id": bag.id, "file_name": logical_name_value, "capture_datetime": ( bag.capture_datetime.isoformat() if bag.capture_datetime else None ), "status": bag.status.value if bag.status else None, "create_time": ( bag.create_time.isoformat() if bag.create_time else None ), "level1_tag": level1_name, "level2_tag": level2_name, "level3_tag": level3_name, "level4_tag": level4_name, "bag_status": bag.bag_status, "user_id": bag.user_id, "username": ( bag.user.username if (bag.user and bag.user.username) else "未知用户" ), # 创建者用户名 "update_time": ( bag.bag_update_time.isoformat() if bag.bag_update_time else None ), "qa_status": ( bag.qa_status.value if bag.qa_status else None ), # 补充None判断 "front_start_sec": bag.front_start_sec, "front_end_sec": bag.front_end_sec, "qa_user_id": bag.qa_id, # QA操作人ID "qa_username": qa_username, # 修正后的QA用户名 "comment1": bag.comment1, "comment2": bag.comment2, "comment3": bag.comment3, "qa_time": ( bag.qa_confirm_time.isoformat() if bag.qa_confirm_time else None ), "high_speed": bag.high_speed, "urban": bag.urban, "parking": bag.parking, "sync_status": bag.sync_status.value, } ) # 7. 返回响应(与原代码一致) return jsonify( { "code": 200, "data": bag_files, "total": total_items, "page": page, "pages": total_pages, "message": "查询成功", } ) except Exception as e: print(f"【接口异常】: {str(e)}") return ( jsonify( { "code": 500, "data": [], "total": 0, "page": page if "page" in locals() else 1, "pages": 0, "message": f"查询出错:{str(e)}", } ), 500, ) @data_factory_bp.route("/bag-total-tobe-checked", methods=["GET"]) @jwt_required() def bag_total_tobe_checked(): try: M = aliased(BagMergeRecord) q = ( db.session.query(func.count(BagFile.id)) .outerjoin(M, M.src_name == BagFile.file_name) .filter( BagFile.status == BagStatus.PROCESSED_NOT_REVIEWED, BagFile.bag_status == 0, or_( M.is_initiator == True, M.id.is_(None), ), ) ) total = q.scalar() # 构造响应 response = { "code": 200, "message": "查询成功", "data": { "total_tobe_checked": total, # 待审核的总数 "status": "PROCESSED_NOT_REVIEWED", # 明确查询的状态值 "query_time": datetime.now().isoformat(), # 查询时间戳 }, } return jsonify(response) except Exception as e: # 异常处理 return ( jsonify( { "code": 500, "message": f"查询失败:{str(e)}", "data": {"total_tobe_checked": 0}, } ), 500, ) @data_factory_bp.route("/exportretestbaglist", methods=["POST"]) @jwt_required() def export_retest_baglist(): try: # 1. 解析请求参数(与原查询接口完全一致) params = request.get_json() or {} print(f"【导出请求参数】原始参数: {params}") # 导出无需分页,忽略page和per_page,仅保留筛选条件 file_name = params.get("file_name", "").strip() start_datetime = params.get("start_datetime", "").strip() end_datetime = params.get("end_datetime", "").strip() level1_tag = params.get("level1_tag", "") status_str = params.get("status", "").strip() user_id = params.get("user_id", "") qa_status_str = params.get("qa_status", "").strip() qa_status_enum = None # 2. 创建表别名(与原接口一致) FstLevel1 = aliased(Fst) FstLevel2 = aliased(Fst) FstLevel3 = aliased(Fst) FstLevel4 = aliased(Fst) QaUser = aliased(User) M = aliased(BagMergeRecord) # 新增:merge 表别名 # === 3. 构建基础查询(注意这里用 db.session.query + outerjoin M)=== logical_name = func.coalesce(M.joined_name, BagFile.file_name) # 3. 构建查询(与原接口一致,但不分页) query = ( db.session.query(BagFile, logical_name.label("logical_name")) .outerjoin(User, BagFile.user_id == User.id) .outerjoin(QaUser, BagFile.qa_id == QaUser.id) .outerjoin(FstLevel1, BagFile.level1_tag_id == FstLevel1.id) .outerjoin(FstLevel2, BagFile.level2_tag_id == FstLevel2.id) .outerjoin(FstLevel3, BagFile.level3_tag_id == FstLevel3.id) .outerjoin(FstLevel4, BagFile.level4_tag_id == FstLevel4.id) .outerjoin(M, M.src_name == BagFile.file_name) # 关键:把 M 左连接进来 ) # 4. 动态添加过滤条件(与原接口完全一致) conditions = [] conditions.append(BagFile.bag_status >= 1) conditions.append(BagFile.sync_status == SyncStatus.SYNC_NOT_READY) if user_id: try: user_id_int = int(user_id) conditions.append(BagFile.user_id == user_id_int) except ValueError: print(f"【过滤条件】user_id格式错误: {user_id}") #if file_name: # conditions.append(BagFile.file_name.like(f"%{file_name}%")) if start_datetime: try: start_dt = datetime.fromisoformat(start_datetime) conditions.append(BagFile.capture_datetime >= start_dt) except ValueError: print(f"【过滤条件】开始时间格式错误: {start_datetime}") if end_datetime: try: end_dt = datetime.fromisoformat(end_datetime) conditions.append(BagFile.capture_datetime <= end_dt) except ValueError: print(f"【过滤条件】结束时间格式错误: {end_datetime}") if level1_tag: try: level1_id = int(level1_tag) conditions.append(BagFile.level1_tag_id == level1_id) except ValueError: print(f"【过滤条件】level1_tag转换失败: {level1_tag}") if status_str: try: status_enum = BagStatus[status_str] conditions.append(BagFile.status == status_enum) except KeyError: print(f"【过滤条件】无效状态值: {status_str}") if qa_status_str: try: qa_status_enum = QaStatus[qa_status_str] conditions.append(BagFile.qa_status == qa_status_enum) except KeyError: print(f"【过滤条件】无效qa_status值: {qa_status_str}") # === 新增:只保留 is_initiator / 未合并 的记录 === conditions.append( or_( M.is_initiator == True, # 被 merge 的主 bag M.id.is_(None), # 完全没 merge 过的 bag ) ) if conditions: query = query.filter(and_(*conditions)) if file_name: query= query.filter(logical_name.like(f"%{file_name}%")) # 5. 排序(保留排序逻辑,与原接口一致) query = query.order_by(BagFile.bag_update_time.desc()) # 6. 获取所有数据(不分页) all_data = query.all() # 直接查询所有符合条件的数据 print(f"【导出数据量】共 {len(all_data)} 条记录") bag_ids = [bag.id for bag, _ in all_data] tag_events_by_bag = defaultdict(list) fst_name_by_id = {} if bag_ids: tag_events_query = TaggingEvents.query.filter( TaggingEvents.bag_id.in_(bag_ids), TaggingEvents.is_deleted == False, TaggingEvents.qa_status.isnot(None), ) if qa_status_enum is not None: tag_events_query = tag_events_query.filter( TaggingEvents.qa_status == qa_status_enum ) tag_events = tag_events_query.order_by(TaggingEvents.ts_event.asc()).all() for ev in tag_events: tag_events_by_bag[ev.bag_id].append(ev) tag_ids = set() for ev in tag_events: for tag_id in ( ev.level1_tag_id, ev.level2_tag_id, ev.level3_tag_id, ev.level4_tag_id, ): if tag_id: tag_ids.add(tag_id) if tag_ids: fst_name_by_id = { tag.id: tag.name for tag in Fst.query.filter(Fst.id.in_(tag_ids)).all() } # 7. 批量查询QA用户信息(优化性能) qa_ids = [bag.qa_id for bag, _ in all_data if bag.qa_id is not None] qa_users = ( {u.id: u for u in User.query.filter(User.id.in_(qa_ids)).all()} if qa_ids else {} ) # 8. 生成CSV内容 output = StringIO() output.write("\ufeff") csv_writer = csv.writer(output) # 8.1 写入CSV表头(与返回JSON的字段对应) csv_writer.writerow( [ "ID", "文件名", "采集时间", "状态", "创建时间", "一级标签", "二级标签", "三级标签", "四级标签", "bag状态", "标注人ID", "标注人用户名", "标注人的更新时间", "质检人的更新时间", "质检状态", "场景开始秒数", "场景结束秒数", "质检人ID", "质检人用户名", "备注1", # 对应comment1字段 "场景类型", "高速", "城区", "parking", ] ) # 8.2 写入数据行 for bag, logical_name_value in all_data: # QA用户名 qa_username = "" if bag.qa_id in qa_users: qa_user = qa_users[bag.qa_id] qa_username = qa_user.username if qa_user.username else "" # 时间格式化 capture_time = ( bag.capture_datetime.isoformat() if bag.capture_datetime else "" ) create_time = bag.create_time.isoformat() if bag.create_time else "" update_time = bag.bag_update_time.isoformat() if bag.bag_update_time else "" qa_time = bag.qa_confirm_time.isoformat() if bag.qa_confirm_time else "" # 枚举值处理 status = bag.status.value if bag.status else "" events = tag_events_by_bag.get(bag.id, []) if events: for ev in events: level1_name = fst_name_by_id.get(ev.level1_tag_id, "") level2_name = fst_name_by_id.get(ev.level2_tag_id, "") level3_name = fst_name_by_id.get(ev.level3_tag_id, "") level4_name = fst_name_by_id.get(ev.level4_tag_id, "") qa_status = ev.qa_status.value if ev.qa_status else "" front_start_sec = ( ev.front_start_sec if ev.front_start_sec is not None else 0 ) front_end_sec = ( ev.front_end_sec if ev.front_end_sec is not None else 0 ) comment = ev.note or "" case_type = ev.case_type high_speed = ev.high_speed if ev.high_speed is not None else 0 urban = ev.urban if ev.urban is not None else 0 parking = ev.parking if ev.parking is not None else 0 csv_writer.writerow( [ bag.id, logical_name_value, capture_time, status, create_time, level1_name, level2_name, level3_name, level4_name, bag.bag_status, bag.user_id, ( bag.user.username if (bag.user and bag.user.username) else "未知用户" ), update_time, qa_time, qa_status, front_start_sec, front_end_sec, bag.qa_id, qa_username, comment, case_type, high_speed, urban, parking, ] ) else: level1_name = bag.level1_tag.name if bag.level1_tag else "" level2_name = bag.level2_tag.name if bag.level2_tag else "" level3_name = bag.level3_tag.name if bag.level3_tag else "" level4_name = bag.level4_tag.name if bag.level4_tag else "" qa_status = bag.qa_status.value if bag.qa_status else "" csv_writer.writerow( [ bag.id, logical_name_value, capture_time, status, create_time, level1_name, level2_name, level3_name, level4_name, bag.bag_status, bag.user_id, ( bag.user.username if (bag.user and bag.user.username) else "未知用户" ), update_time, qa_time, qa_status, bag.front_start_sec if bag.front_start_sec else 0, bag.front_end_sec, bag.qa_id, qa_username, bag.comment1 or "", bag.case_type, bag.high_speed, bag.urban, bag.parking, ] ) # 9. 构建响应(返回CSV文件) output.seek(0) # 重置缓冲区指针 response = make_response(output.getvalue()) # 设置响应头:指定文件名和CSV格式,确保中文正常显示 filename = f"retest_bag_export_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv" response.headers["Content-Disposition"] = f"attachment; filename={filename}" response.headers["Content-type"] = "text/csv; charset=utf-8" response.headers["Cache-Control"] = "no-cache" return response except Exception as e: print(f"【导出异常】: {str(e)}") return jsonify({"code": 500, "message": f"导出失败:{str(e)}"}), 500