Files
2026-04-22 13:35:40 +08:00

753 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from flask import Blueprint, current_app, request, jsonify
from flask_jwt_extended import (
create_access_token,
get_jwt,
get_jwt_identity,
jwt_required,
)
from sqlalchemy import and_, asc, desc, exists, or_,func
from app.models import (
BagFile,
BagStatus,
Fst,
QaStatus,
Role,
SyncStatus,
TaggingEvents,
User,
UserRole,
BagMergeRecord,
)
from app.utils.fst_tree import build_tree
from app import db
from app.utils.log_record import log_operation
from requests.exceptions import RequestException
import requests
from sqlalchemy.orm import aliased, joinedload
# Flask blueprint, registered under the name "display", that the route
# handlers in this module attach themselves to via @check_data.route(...).
check_data = Blueprint("display", __name__)
def serialize_tagging_event(event: TaggingEvents) -> dict:
    """Flatten a tagging event into a plain dict suitable for JSON output.

    The names of the four tag levels are read from the related tag objects
    (``None`` when a level has no tag); the non-empty names are joined with
    " / " to form ``tag_path``. Datetime attributes are rendered through
    ``isoformat()`` and ``qa_status`` / ``source`` through ``.value``; a
    falsy attribute of either kind is emitted as ``None``.
    """

    def _iso(moment):
        # Render a datetime-like value, tolerating an unset attribute.
        return moment.isoformat() if moment else None

    def _value(member):
        # Unwrap an enum-like value, tolerating an unset attribute.
        return member.value if member else None

    names = []
    for related in (
        event.level1_tag,
        event.level2_tag,
        event.level3_tag,
        event.level4_tag,
    ):
        names.append(getattr(related, "name", None) if related else None)
    first, second, third, fourth = names

    return {
        "event_id": event.event_id,
        "bag_id": event.bag_id,
        # Only levels that actually carry a non-empty name take part in the path.
        "tag_path": " / ".join(filter(None, names)),
        "level1_tag_id": event.level1_tag_id,
        "level1_tag_name": first,
        "level2_tag_id": event.level2_tag_id,
        "level2_tag_name": second,
        "level3_tag_id": event.level3_tag_id,
        "level3_tag_name": third,
        "level4_tag_id": event.level4_tag_id,
        "level4_tag_name": fourth,
        "qa_status": _value(event.qa_status),
        "case_type": event.case_type,
        "front_starttime": _iso(event.front_starttime),
        "front_endtime": _iso(event.front_endtime),
        "front_start_sec": event.front_start_sec,
        "front_end_sec": event.front_end_sec,
        "high_speed": event.high_speed,
        "urban": event.urban,
        "parking": event.parking,
        "note": event.note,
        "source": _value(event.source),
        "reviewer_id": event.reviewer_id,
        "ts_event": _iso(event.ts_event),
    }
@check_data.route("/bagtotal", methods=["GET"])
@jwt_required()
def bag_total():
    """Return the total number of rows in the BagFile table."""
    # Issues a COUNT over every BagFile row; no filtering is applied.
    payload = {"total": BagFile.query.count()}
    return jsonify(payload), 200
@check_data.route("/noprocess-bagtotal", methods=["GET"])
@jwt_required()
def no_process_bag_total():
    """Return how many BagFile rows have ``operation_status == 0``."""
    unprocessed = BagFile.query.filter(BagFile.operation_status == 0)
    return jsonify({"total": unprocessed.count()}), 200
@check_data.route("/insert-qa-status", methods=["POST"])
@jwt_required()
def insert_qa_status():
    """Record a QA confirmation for one bag and refresh its aggregate QA status.

    Request JSON object:
        bag_id (required): primary key of the BagFile row.
        qa_status (required): must be a valid ``QaStatus`` value. It is only
            validated; the stored status is derived from the bag's tagging
            events, not taken from the request.
        bag_status (optional): written to the bag when the key is present.
        comment1 / comment2 / comment3 (optional): empty or missing values
            are stored as ``None``.

    Returns:
        200 with the resulting status and comments on success,
        400 when the body is not a JSON object, lacks a required field or
            carries an invalid ``qa_status``,
        404 when the bag does not exist,
        500 (after rolling back the session) on any other error.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None and is
        # answered with the 400 below instead of falling into the generic
        # 500 handler at the bottom.
        data = request.get_json(silent=True)
        current_user = get_jwt_identity()

        # bag_id and qa_status are mandatory; comments are optional.
        required_fields = ["bag_id", "qa_status"]
        if not isinstance(data, dict) or not all(
            field in data for field in required_fields
        ):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"缺少必要参数,需要: {required_fields}",
                    }
                ),
                400,
            )

        # Validate the requested qa_status; the parsed member itself is not
        # needed because the bag-level status is aggregated from events.
        try:
            QaStatus(data["qa_status"])
        except ValueError:
            valid_statuses = [status.value for status in QaStatus]
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"无效的qa_status值有效值为: {valid_statuses}",
                    }
                ),
                400,
            )

        bag_file = BagFile.query.get(data["bag_id"])
        if not bag_file:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f'未找到ID为{data["bag_id"]}的BagFile记录',
                    }
                ),
                404,
            )

        # Only summarise the tagging events; the qa_status of each
        # individual event is left untouched.
        tag_events = TaggingEvents.query.filter_by(
            bag_id=bag_file.id, is_deleted=False
        ).all()

        def aggregate_qa_status(events):
            """Return the highest-priority status present among ``events``.

            Priority: NOT_REVIEWED > INVALID > MODIFY > PASSED. An event
            without a status counts as NOT_REVIEWED, which is also the
            fallback when none of the four statuses is present.
            """
            statuses = [ev.qa_status or QaStatus.QA_NOT_REVIEWED for ev in events]
            for candidate in (
                QaStatus.QA_NOT_REVIEWED,
                QaStatus.QA_INVALID,
                QaStatus.QA_MODIFY,
                QaStatus.QA_PASSED,
            ):
                if any(status == candidate for status in statuses):
                    return candidate
            return QaStatus.QA_NOT_REVIEWED

        # A bag without tagging events keeps whatever qa_status it had.
        if tag_events:
            bag_file.qa_status = aggregate_qa_status(tag_events)
        bag_file.qa_confirm_time = datetime.now()
        bag_file.qa_id = current_user
        if "bag_status" in data:
            bag_file.bag_status = data.get("bag_status")

        # Comments are always overwritten; empty or missing values become None.
        bag_file.comment1 = data.get("comment1") or None
        bag_file.comment2 = data.get("comment2") or None
        bag_file.comment3 = data.get("comment3") or None

        db.session.commit()

        final_qa_status = bag_file.qa_status.value if bag_file.qa_status else None
        return jsonify(
            {
                "success": True,
                "message": f'BagFile ID {data["bag_id"]} 的qa_status已更新为 {final_qa_status}',
                "data": {
                    "bag_id": data["bag_id"],
                    "qa_status": final_qa_status,
                    "bag_status": data.get("bag_status"),
                    "comments": {
                        "comment1": bag_file.comment1,
                        "comment2": bag_file.comment2,
                        "comment3": bag_file.comment3,
                    },
                },
            }
        )
    except Exception as e:
        # Top-level boundary: undo any pending changes before reporting.
        db.session.rollback()
        return jsonify({"success": False, "message": f"更新失败: {str(e)}"}), 500
@check_data.route("/get-finish-list", methods=["POST"])
@jwt_required()
def get_processed():
    """Return one page of synced, processed bags with their tagging events.

    Request JSON (every key optional):
        page, per_page: pagination; values that are not positive integers
            fall back to 1 and 20.
        file_name: substring matched against ``BagFile.file_name``.
        start_datetime, end_datetime: ISO-8601 bounds on ``capture_datetime``.
        level1_tag: id of the first-level tag.
        status: name of a ``BagStatus`` member.
        user_id: id of the owning user.
    A filter whose value cannot be parsed is logged and skipped rather than
    rejected.

    Only bags with ``bag_status >= 1`` and ``sync_status == "SYNCED"`` are
    listed; of bags that appear in a merge record only the initiator row is
    kept, and such bags are reported under the merged (joined) name. Rows are
    ordered by ``qa_confirm_time`` descending, which is also what the
    response exposes as ``update_time``.

    Returns:
        JSON with ``code``, ``data``, ``total``, ``page``, ``pages`` and
        ``message``. Any exception produces the same shape with code 500 and
        HTTP status 500.
    """
    # Bound before the try so the error response can always echo a page number.
    page = 1
    try:
        # 1. Parse request parameters.
        params = request.get_json() or {}
        print(f"【请求参数】原始参数: {params}")

        def _text(key):
            # A JSON null or non-string value counts as "filter not supplied"
            # instead of crashing on .strip().
            value = params.get(key)
            return value.strip() if isinstance(value, str) else ""

        def _positive_int(value, default):
            # Coerce pagination input so strings like "2" work and junk
            # falls back to the default.
            try:
                number = int(value)
            except (TypeError, ValueError):
                return default
            return number if number > 0 else default

        page = _positive_int(params.get("page", 1), 1)
        per_page = _positive_int(params.get("per_page", 20), 20)
        file_name = _text("file_name")
        start_datetime = _text("start_datetime")
        end_datetime = _text("end_datetime")
        level1_tag = params.get("level1_tag", "")
        status_str = _text("status")
        user_id = params.get("user_id", "")

        # 2. Alias for the merge table.
        M = aliased(BagMergeRecord)

        # 3. Base query: each BagFile plus its logical name, i.e. the merged
        #    name when a merge record matches, otherwise its own file name.
        logical_name = func.coalesce(M.joined_name, BagFile.file_name)
        query = (
            db.session.query(
                BagFile,
                logical_name.label("logical_name"),
            )
            .outerjoin(User, BagFile.user_id == User.id)
            .outerjoin(M, M.src_name == BagFile.file_name)
            .options(
                joinedload(BagFile.level1_tag),
                joinedload(BagFile.level2_tag),
                joinedload(BagFile.level3_tag),
                joinedload(BagFile.level4_tag),
            )
        )

        # 4. Filters. The first three are always applied.
        conditions = [
            BagFile.bag_status >= 1,
            BagFile.sync_status == "SYNCED",
            # Keep merge initiators and bags that never took part in a merge
            # (outer join found nothing, so every M column is NULL).
            or_(
                M.is_initiator == True,  # noqa: E712 - SQL expression, not a Python bool test
                M.id.is_(None),
            ),
        ]

        if user_id:
            try:
                conditions.append(BagFile.user_id == int(user_id))
            except (TypeError, ValueError):
                print(f"【过滤条件】user_id格式错误非整数: {user_id}")

        if file_name:
            conditions.append(BagFile.file_name.like(f"%{file_name}%"))
            print(f"【过滤条件】添加文件名模糊查询: {file_name}")

        if start_datetime:
            try:
                start_dt = datetime.fromisoformat(start_datetime)
                conditions.append(BagFile.capture_datetime >= start_dt)
            except ValueError:
                print(f"【过滤条件】开始时间格式错误: {start_datetime}")

        if end_datetime:
            try:
                end_dt = datetime.fromisoformat(end_datetime)
                conditions.append(BagFile.capture_datetime <= end_dt)
            except ValueError:
                print(f"【过滤条件】结束时间格式错误: {end_datetime}")

        if level1_tag:
            try:
                conditions.append(BagFile.level1_tag_id == int(level1_tag))
            except (TypeError, ValueError):
                print(f"【过滤条件】level1_tag转换失败非整数: {level1_tag}")

        if status_str:
            try:
                conditions.append(BagFile.status == BagStatus[status_str])
            except KeyError:
                print(f"【过滤条件】无效状态值: {status_str}")

        query = query.filter(and_(*conditions))

        # 5. Newest QA confirmation first.
        query = query.order_by(BagFile.qa_confirm_time.desc())

        # 6. Pagination; error_out=False returns an empty page instead of 404.
        pagination = query.paginate(page=page, per_page=per_page, error_out=False)
        total_items = pagination.total
        total_pages = pagination.pages

        # 7. Load the tagging events of every bag on this page in one query
        #    and group them by bag id.
        bag_ids = [bag.id for bag, _ in pagination.items]
        tag_events_by_bag = {}
        if bag_ids:
            tag_events = (
                TaggingEvents.query.options(
                    joinedload(TaggingEvents.level1_tag),
                    joinedload(TaggingEvents.level2_tag),
                    joinedload(TaggingEvents.level3_tag),
                    joinedload(TaggingEvents.level4_tag),
                )
                .filter(
                    TaggingEvents.bag_id.in_(bag_ids),
                    TaggingEvents.is_deleted == False,  # noqa: E712 - SQL expression
                )
                .order_by(TaggingEvents.bag_id.asc(), TaggingEvents.ts_event.asc())
                .all()
            )
            for ev in tag_events:
                tag_events_by_bag.setdefault(ev.bag_id, []).append(
                    serialize_tagging_event(ev)
                )

        # 8. Serialise the page.
        bag_files = []
        for bag, logical_name_value in pagination.items:
            qa_username = (
                bag.qa_user.username if (bag.qa_user and bag.qa_user.username) else ""
            )
            bag_files.append(
                {
                    "id": bag.id,
                    "file_name": logical_name_value,
                    "capture_datetime": (
                        bag.capture_datetime.isoformat()
                        if bag.capture_datetime
                        else None
                    ),
                    "status": bag.status.value if bag.status else None,
                    "create_time": (
                        bag.create_time.isoformat() if bag.create_time else None
                    ),
                    # The sort key (qa_confirm_time) is exposed as update_time.
                    "update_time": (
                        bag.qa_confirm_time.isoformat()
                        if bag.qa_confirm_time
                        else None
                    ),
                    "level1_tag": bag.level1_tag.name if bag.level1_tag else None,
                    "level2_tag": bag.level2_tag.name if bag.level2_tag else None,
                    "level3_tag": bag.level3_tag.name if bag.level3_tag else None,
                    "level4_tag": bag.level4_tag.name if bag.level4_tag else None,
                    "bag_status": bag.bag_status,
                    "user_id": bag.user_id,
                    "username": (
                        bag.user.username if (bag.user and bag.user.username) else ""
                    ),
                    # Guarded like status/sync_status: a bag that has no
                    # qa_status must not fail the whole listing.
                    "qa_status": bag.qa_status.value if bag.qa_status else None,
                    "sync_status": (
                        bag.sync_status.value if bag.sync_status else None
                    ),
                    "qa_user_id": bag.qa_id,
                    "qa_username": qa_username,
                    "comment1": bag.comment1,
                    "front_start_sec": bag.front_start_sec,
                    "front_end_sec": bag.front_end_sec,
                    "high_speed": bag.high_speed,
                    "urban": bag.urban,
                    "parking": bag.parking,
                    "tag_events": tag_events_by_bag.get(bag.id, []),
                }
            )

        # 9. Response.
        return jsonify(
            {
                "code": 200,
                "data": bag_files,
                "total": total_items,
                "page": page,
                "pages": total_pages,
                "message": "查询成功包含sync_status过滤及按update_time排序",
            }
        )
    except Exception as e:
        print(f"【接口异常】: {str(e)}")
        return (
            jsonify(
                {
                    "code": 500,
                    "data": [],
                    "total": 0,
                    "page": page,
                    "pages": 0,
                    "message": f"查询出错:{str(e)}",
                }
            ),
            500,
        )
@check_data.route("/insert-db-ids", methods=["POST"])
@jwt_required()
def insert_db_ids():
    """Mark the given BagFile rows as synced.

    The request body must be a non-empty JSON array of integer ids, e.g.
    ``[1, 2, 3]``. Every matching row gets ``sync_status`` set to SYNCED and
    ``qa_confirm_time`` set to the current time in a single bulk UPDATE.

    Returns:
        200 with the number of rows updated and the ids that were requested,
        400 when the body is not a non-empty array of integers,
        500 (after rolling back the session) on any other error.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None and is
        # rejected by the format check below instead of becoming a 500.
        id_list = request.get_json(silent=True)

        # 1. Validate the payload shape.
        if not isinstance(id_list, list):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "参数格式错误,需提供数组格式,如[1,2,3]",
                    }
                ),
                400,
            )

        # bool is a subclass of int, so JSON true/false must be excluded
        # explicitly; otherwise `true` would silently address id 1.
        if not all(
            isinstance(id_val, int) and not isinstance(id_val, bool)
            for id_val in id_list
        ):
            return jsonify({"success": False, "message": "数组元素必须为整数ID"}), 400

        if not id_list:
            return jsonify({"success": False, "message": "ID列表不能为空"}), 400

        # 2. Bulk update. synchronize_session=False skips reconciling objects
        #    already loaded in the session; none are used afterwards.
        current_time = datetime.now()
        updated_rows = BagFile.query.filter(BagFile.id.in_(id_list)).update(
            {
                # Enum member rather than its string name, matching how
                # sync_status is assigned elsewhere in this module.
                BagFile.sync_status: SyncStatus.SYNCED,
                BagFile.qa_confirm_time: current_time,
            },
            synchronize_session=False,
        )
        db.session.commit()

        # 3. Report. updated_ids echoes the request; ids that matched no row
        #    are not filtered out of it.
        return (
            jsonify(
                {
                    "success": True,
                    "message": f"成功更新{updated_rows}条记录",
                    "updated_count": updated_rows,
                    "updated_ids": id_list,
                }
            ),
            200,
        )
    except Exception as e:
        # Top-level boundary: undo any pending changes before reporting.
        db.session.rollback()
        return jsonify({"success": False, "message": f"更新失败:{str(e)}"}), 500
def _rootdb_tags_from_flags(high_speed, urban, parking):
    """Translate the scene flags of a record into RootDB tag strings."""
    tags = []
    if high_speed == 1:
        tags.append("highway")
    if urban == 1:
        tags.append("city")
    # Note the inverted test: "driving" is emitted when parking is 0.
    if parking == 0:
        tags.append("driving")
    return tags


def _rootdb_payload(bag_name, record, comments):
    """Build one RootDB update payload from a TaggingEvents or BagFile row.

    ``record`` only needs the attributes both row types share here: the four
    ``levelN_tag`` relations, ``front_start_sec`` / ``front_end_sec`` and the
    ``high_speed`` / ``urban`` / ``parking`` flags.
    """
    nodes = [
        (tag.name if tag else "") or ""
        for tag in (
            record.level1_tag,
            record.level2_tag,
            record.level3_tag,
            record.level4_tag,
        )
    ]
    # Zero-based index of the deepest level before the first empty one
    # (-1 when even level 1 is empty, 3 when all four levels are filled).
    first_empty = next(
        (idx for idx, node in enumerate(nodes) if node == ""), len(nodes)
    )
    start_sec = record.front_start_sec
    end_sec = record.front_end_sec
    return {
        "bag_name": bag_name,
        "nodes": nodes,
        "start_time": start_sec if start_sec is not None else 0,
        "end_time": end_sec if end_sec is not None else 0,
        "comments": comments or "",
        "tags": _rootdb_tags_from_flags(
            record.high_speed, record.urban, record.parking
        ),
        "fst_node_level": first_empty - 1,
    }


def _rootdb_item_payloads(item, bag_by_id, merge_name_by_src, tag_events_by_bag):
    """Validate one request item and work out what to send for it.

    Returns ``(bag_file, payloads)`` where ``payloads`` is a list of
    ``(payload_dict, event_or_None)`` pairs. Payloads come from, in order of
    precedence: the item's ``params`` list, its ``param`` object, the bag's
    tagging events, or the bag row itself.

    Raises:
        ValueError: the item is malformed, refers to an unknown bag, or no
            bag name can be determined.
    """
    if not isinstance(item, dict):
        raise ValueError("单条数据必须为对象")
    if "rowid" not in item:
        raise ValueError("参数缺少必要的'rowid'字段")
    rowid = item["rowid"]
    bag_file = bag_by_id.get(rowid)
    if not bag_file:
        raise ValueError("未找到对应ID的本地记录")

    raw_params = item.get("params")
    single_param = item.get("param") if isinstance(item.get("param"), dict) else None

    # Bag name precedence: item -> param -> first params entry -> merged
    # name of the bag -> the bag's own file name.
    bag_name = item.get("bag_name")
    if not bag_name and single_param:
        bag_name = single_param.get("bag_name")
    if not bag_name and isinstance(raw_params, list) and raw_params:
        first_param = raw_params[0]
        if isinstance(first_param, dict):
            bag_name = first_param.get("bag_name")
    if not bag_name:
        bag_name = merge_name_by_src.get(bag_file.file_name) or bag_file.file_name
    if not bag_name:
        raise ValueError("bag_name为空")

    if raw_params is not None:
        if not isinstance(raw_params, list) or not raw_params:
            raise ValueError("params必须为非空数组")
        payloads = []
        for param in raw_params:
            if not isinstance(param, dict):
                raise ValueError("params元素必须为对象")
            # Caller-supplied payloads are forwarded as-is; only a missing
            # bag_name is filled in.
            if not param.get("bag_name"):
                param["bag_name"] = bag_name
            payloads.append((param, None))
        return bag_file, payloads

    if single_param:
        if not single_param.get("bag_name"):
            single_param["bag_name"] = bag_name
        return bag_file, [(single_param, None)]

    events = tag_events_by_bag.get(rowid, [])
    if events:
        return bag_file, [
            (_rootdb_payload(bag_name, ev, ev.note), ev) for ev in events
        ]
    return bag_file, [(_rootdb_payload(bag_name, bag_file, bag_file.comment1), None)]


def _rootdb_push(api_url, payload, timeout):
    """POST one payload to RootDB.

    Returns ``None`` when the remote reports success; otherwise a dict with
    the failure ``message`` plus either ``remote_response`` (remote said no)
    or ``error_type`` (request or response handling raised).
    """
    try:
        response = requests.post(url=api_url, json=[payload], timeout=timeout)
        response.raise_for_status()
        response_data = response.json()
        if not isinstance(response_data, list) or len(response_data) == 0:
            raise ValueError("远程API返回格式错误预期非空列表")
        first = response_data[0]
        if not isinstance(first, dict) or "success" not in first:
            raise ValueError("远程API返回缺少'success'字段")
        if not first.get("success"):
            return {"message": "远程API插入失败", "remote_response": first}
        return None
    except RequestException as e:
        return {
            "message": f"网络请求失败:{str(e)}",
            "error_type": type(e).__name__,
        }
    except Exception as e:
        return {
            "message": f"处理失败:{str(e)}",
            "error_type": type(e).__name__,
        }


@check_data.route("/insert-rootdb", methods=["POST"])
@jwt_required()
def insert_rootdb():
    """Push bag annotations to the remote RootDB service and mark bags synced.

    The request body is a non-empty JSON array of objects. Each object needs
    a ``rowid`` (BagFile id) and may carry ``bag_name``, ``param`` (one
    ready-made payload) or ``params`` (a list of ready-made payloads). When
    neither is given, one payload is built per non-deleted tagging event of
    the bag, or from the bag row itself if it has no events.

    Every payload is POSTed on its own. A bag is marked SYNCED (and its
    ``qa_confirm_time`` refreshed) only if all of its payloads succeeded;
    otherwise it is counted as failed and the details are collected. Local
    changes are committed once, after all items have been processed.

    Returns:
        200 with success/fail counts and per-failure details,
        400 when the body is not a non-empty JSON array,
        500 (after rolling back the session) on any unexpected error.
    """
    # NOTE(review): the endpoint is hard-coded; consider moving it to app config.
    API_URL = "http://10.0.240.4:5232/api/fst/bags/update"
    TIMEOUT = 10
    result = {"success_count": 0, "fail_count": 0, "fail_details": []}
    try:
        # silent=True: a malformed or non-JSON body yields None and is
        # answered with the 400 below instead of the generic 500.
        param_list = request.get_json(silent=True)
        if not param_list or not isinstance(param_list, list):
            return jsonify({"success": False, "message": "请求参数必须为非空数组"}), 400

        # Load all referenced bags in one query.
        bag_ids = [
            item.get("rowid")
            for item in param_list
            if isinstance(item, dict) and "rowid" in item
        ]
        bag_files = (
            BagFile.query.filter(BagFile.id.in_(bag_ids)).all() if bag_ids else []
        )
        bag_by_id = {bag.id: bag for bag in bag_files}

        # Merged (joined) name for every bag that initiated a merge.
        merge_name_by_src = {}
        src_names = [bag.file_name for bag in bag_files if bag.file_name]
        if src_names:
            merge_records = (
                BagMergeRecord.query.filter(BagMergeRecord.src_name.in_(src_names))
                .filter(BagMergeRecord.is_initiator == True)  # noqa: E712 - SQL expression
                .all()
            )
            for record in merge_records:
                merge_name_by_src[record.src_name] = record.joined_name

        # Non-deleted tagging events of all referenced bags, grouped by bag.
        tag_events = (
            TaggingEvents.query.options(
                joinedload(TaggingEvents.level1_tag),
                joinedload(TaggingEvents.level2_tag),
                joinedload(TaggingEvents.level3_tag),
                joinedload(TaggingEvents.level4_tag),
            )
            .filter(
                TaggingEvents.bag_id.in_(bag_ids),
                TaggingEvents.is_deleted == False,  # noqa: E712 - SQL expression
            )
            .order_by(TaggingEvents.bag_id.asc(), TaggingEvents.ts_event.asc())
            .all()
            if bag_ids
            else []
        )
        tag_events_by_bag = {}
        for ev in tag_events:
            tag_events_by_bag.setdefault(ev.bag_id, []).append(ev)

        for idx, item in enumerate(param_list):
            # Phase 1: validate the item and assemble its payloads. Any
            # error here fails this item only.
            try:
                bag_file, payloads = _rootdb_item_payloads(
                    item, bag_by_id, merge_name_by_src, tag_events_by_bag
                )
            except Exception as e:
                result["fail_count"] += 1
                result["fail_details"].append(
                    {
                        "index": idx,
                        "rowid": item.get("rowid") if isinstance(item, dict) else None,
                        "message": f"处理失败:{str(e)}",
                        "error_type": type(e).__name__,
                    }
                )
                continue

            # Phase 2: send every payload. A failure does not stop the
            # remaining payloads of the same bag from being sent.
            current_rowid = item["rowid"]
            bag_failures = []
            for ev_idx, (payload, ev) in enumerate(payloads):
                failure = _rootdb_push(API_URL, payload, TIMEOUT)
                if failure is not None:
                    bag_failures.append(
                        {
                            "index": idx,
                            "rowid": current_rowid,
                            "event_index": ev_idx,
                            "event_id": ev.event_id if ev else None,
                            **failure,
                        }
                    )

            # fail_count counts bags, not individual payloads.
            if bag_failures:
                result["fail_count"] += 1
                result["fail_details"].extend(bag_failures)
                continue

            bag_file.sync_status = SyncStatus.SYNCED
            bag_file.qa_confirm_time = datetime.now()
            result["success_count"] += 1

        db.session.commit()
        return (
            jsonify({
                "success": True,
                "message": f"处理完成,成功{result['success_count']}条,失败{result['fail_count']}",
                "result": result,
            }),
            200,
        )
    except Exception as e:
        # Top-level boundary: undo any pending changes before reporting.
        db.session.rollback()
        return jsonify({"success": False, "message": f"服务器处理错误:{str(e)}"}), 500