Files

540 lines
20 KiB
Python
Raw Permalink Normal View History

2026-04-22 13:35:40 +08:00
from datetime import datetime
import re
from flask import Blueprint, current_app, json, request, jsonify
from flask_jwt_extended import jwt_required
import requests
from sqlalchemy import and_, func
from app import db
from app.models import BagFile, Fst
from app.utils.fst_tree import build_tree
remote_bp = Blueprint("remote", __name__)
def extract_datetime_from_filename(bag_filename):
"""
从文件名中提取日期和时间
返回包含年月日时分秒的字符串datetime对象以及视频URL
"""
match = re.search(r"_(\d{8})-(\d{6})_", bag_filename)
if not match:
raise ValueError("文件名格式错误,无法提取时间信息")
date_part, time_part = match.groups()
# 提取年月日时分秒各部分
year = date_part[:4]
month = date_part[4:6]
day = date_part[6:8]
hour = time_part[:2]
minute = time_part[2:4]
second = time_part[4:6]
# 年月日时分秒整合到一个变量(字符串格式)
datetime_full = f"{year}-{month}-{day} {hour}:{minute}:{second}"
# 原有视频路径相关逻辑保持不变
bag_filename = bag_filename.replace(".bag", "")
video_final_path = f"{bag_filename}- Wide.mp4"
video_cos = f"momenta/videos/{year}/{month}/{day}/{video_final_path}"
video_url = (
"https://data-miningc01-1318950322.cos.ap-shanghai-adc.myqcloud.com/"
+ video_cos
)
# 返回整合后的完整日期时间字符串、datetime对象和视频URL
return datetime_full, video_url
def get_level1_labels(root_data, target_level1_id, use_label=False):
"""
收集特定一级标签及其所有子标签的id或label为扁平数组
:param root_data: 原始数据字典或JSON字符串
:param target_level1_id: 目标一级标签的id
:param use_label: 若为True则返回label否则返回id默认False
:return: 扁平标签数组如未找到目标则返回空数组
"""
# 确保root_data是字典
if isinstance(root_data, str):
try:
root_data = json.loads(root_data)
except json.JSONDecodeError:
raise ValueError("root_data无法解析为JSON")
if not isinstance(root_data, dict):
raise TypeError("root_data必须是字典或JSON字符串")
result = []
key = "label" if use_label else "id" # 选择使用label还是id
def recursive_collect(node):
"""递归收集节点的标签id或label"""
if not isinstance(node, dict) or key not in node:
return
# 只添加当前节点的标签(不保留其他字段)
result.append(node[key])
# 递归处理子节点
for child in node.get("children", []):
recursive_collect(child)
# 查找目标一级标签
target_node = None
for node in root_data.get("children", []):
if (
isinstance(node, dict)
and node.get("level") == 1
and node.get("id") == target_level1_id
):
target_node = node
break
# 收集目标节点及其所有子标签
if target_node:
recursive_collect(target_node)
return result
@remote_bp.route("/fstmenu", methods=["GET"])
def fstmenu():
# pass
"""
返回值
{
path: string
name: string
title: string
component: string
children?: BackendRoute[]
}
"""
level1_tags = Fst.query.filter(Fst.level == 1).all()
# result = [
# {"id": tag.id, "name": tag.name, "level": tag.level} for tag in level1_tags
# ]
result = [
{
"path": tag.name,
"name": tag.name,
"title": tag.name,
"component": "",
}
for tag in level1_tags
]
return jsonify(result)
@remote_bp.route("/fstmenu1", methods=["GET"])
def fstmenu1():
# pass
"""
返回值
{
path: string
name: string
title: string
component: string
children?: BackendRoute[]
}
"""
API_URL = "http://10.0.240.4:5232/api/fst/print_tree"
TIMEOUT = 10 # 超时时间(秒)
try:
response = requests.get(url=API_URL, timeout=TIMEOUT)
# 3. 验证HTTP响应状态
response.raise_for_status()
level1_results = []
# 遍历最外层的 children 列表level=0 的子节点即 level=1
for node in response.json()[0]["children"]:
# 二次验证 level 是否为 1避免结构异常
if node.get("level") == 1:
level1_results.append(
{
"path": node["id"],
"name": node["label"],
"title": node["label"],
"component": "",
}
)
for node_park in response.json()[1]["children"]:
# 二次验证 level 是否为 1避免结构异常
if node_park.get("level") == 1:
level1_results.append(
{
"path": node_park["id"],
"name": node_park["label"],
"title": node_park["label"],
"component": "",
}
)
return jsonify(level1_results)
except Exception as e:
# 其他未捕获异常
return jsonify({"success": False, "message": f"服务器处理错误:{str(e)}"})
# 根据fst标签返回bag信息----一级标签的所有
@remote_bp.route("/remote-baglist1", methods=["POST"])
def query_bag_file_by_fst():
try:
# 1. 解析核心请求参数(仅保留:分页参数 + Fst 1-4级标签
params = request.get_json() or {}
print(f"【请求参数】原始参数: {params}")
# 分页参数默认第1页每页20条
page = params.get("page", 1)
per_page = params.get("per_page", 20)
# Fst 标签参数1-4级标签字符串格式ID
level1_tag = params.get("level1_tag", "").strip()
level2_tag = params.get("level2_tag", "").strip()
level3_tag = params.get("level3_tag", "").strip()
level4_tag = params.get("level4_tag", "").strip()
# 2. 构建基础查询
query = BagFile.query
# 3. 核心过滤:仅保留 Fst 标签筛选条件
conditions = []
# 一级标签过滤转换为整数ID匹配
if level1_tag:
try:
conditions.append(BagFile.level1_tag_id == int(level1_tag))
except ValueError:
print(f"【过滤警告】一级标签ID格式错误: {level1_tag}")
# 二级标签过滤
if level2_tag:
try:
conditions.append(BagFile.level2_tag_id == int(level2_tag))
except ValueError:
print(f"【过滤警告】二级标签ID格式错误: {level2_tag}")
# 三级标签过滤
if level3_tag:
try:
conditions.append(BagFile.level3_tag_id == int(level3_tag))
except ValueError:
print(f"【过滤警告】三级标签ID格式错误: {level3_tag}")
# 四级标签过滤
if level4_tag:
try:
conditions.append(BagFile.level4_tag_id == int(level4_tag))
except ValueError:
print(f"【过滤警告】四级标签ID格式错误: {level4_tag}")
# 应用标签过滤条件
if conditions:
query = query.filter(and_(*conditions))
# 4. 核心排序:按 bag_update_time 降序(最新更新优先)
query = query.order_by(BagFile.bag_update_time.desc())
# 5. 分页查询(页码超出时返回空列表,不报错)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
total_items = pagination.total # 总数据条数
total_pages = pagination.pages # 总页数
# 6. 数据序列化(仅保留核心字段,包含 Fst 标签信息)
bag_files = []
for bag in pagination.items:
# 拼接 Fst 标签“中文+英文”名称(提升可读性)
level1_name = f"{bag.level1_tag.name}" if bag.level1_tag else None
level2_name = f"{bag.level2_tag.name}" if bag.level2_tag else None
level3_name = f"{bag.level3_tag.name}" if bag.level3_tag else None
level4_name = f"{bag.level4_tag.name}" if bag.level4_tag else None
bag_files.append(
{
"id": bag.id,
"file_name": bag.file_name, # BAG文件名
"capture_datetime": (
bag.capture_datetime.isoformat()
if bag.capture_datetime
else None
), # 采集时间
"bag_update_time": (
bag.bag_update_time.isoformat() if bag.bag_update_time else None
), # 排序依据字段
# Fst 标签核心信息
"level1_tag_id": bag.level1_tag_id,
"level1_tag_name": level1_name,
"level2_tag_id": bag.level2_tag_id,
"level2_tag_name": level2_name,
"level3_tag_id": bag.level3_tag_id,
"level3_tag_name": level3_name,
"level4_tag_id": bag.level4_tag_id,
"level4_tag_name": level4_name,
# 基础状态字段(按需保留,非核心可删除)
"status": bag.status.name if bag.status else None,
"bag_status": bag.bag_status,
"comment": bag.comment1,
"front_starttime": bag.front_start_sec,
"front_endtime": bag.front_end_sec,
"case_type": "简单场景" if bag.case_type == 1 else "复杂场景",
"highway": "高速" if bag.high_speed == 1 else None,
"city": "城区" if bag.urban == 1 else None,
"driving": "parking" if bag.urban else "driving",
"video_url": bag.video_url,
}
)
# 7. 返回精简响应
return jsonify(
{
"code": 200,
"data": bag_files,
"total": total_items,
"page": page,
"pages": total_pages,
"message": "根据 Fst 标签查询成功",
}
)
except Exception as e:
# 异常捕获与响应
error_msg = f"查询出错:{str(e)}"
print(f"【接口异常】{error_msg}")
return (
jsonify(
{
"code": 500,
"data": [],
"total": 0,
"page": page if "page" in locals() else 1,
"pages": 0,
"message": error_msg,
}
),
500,
)
@remote_bp.route("/remote-baglist", methods=["POST"])
def query_bag_file_by_fst1():
# 1. 解析核心请求参数(仅保留:分页参数 + Fst 1-4级标签
params = request.get_json() or {}
print(f"【请求参数】原始参数: {params}")
# 分页参数默认第1页每页20条
page = params.get("page", 1)
per_page = params.get("per_page", 20)
# Fst 标签参数1-4级标签字符串格式ID
fst_tag_list = params.get("fst_tag", "")
print(f"【请求参数】Fst 标签列表: {fst_tag_list}")
# 获取过滤的参数
filter_fst = params.get("filter_fst", [])
# 把请求到的数据进行处理
API_LEVEL1_URL = "http://10.0.240.4:5232/api/fst/print_tree"
API_NODES_URL = "http://10.0.240.4:5232/api/fst/bags/nodes"
API_OTHER_INFO_URL = "http://10.0.240.4:5232/api/bags/fst/nodes"
TIMEOUT = 10 # 超时时间(秒)
try:
# 如果filter_fst为空说明为一级标签
if len(filter_fst) == 0:
response = requests.get(url=API_LEVEL1_URL, timeout=TIMEOUT)
# print(123,response.json())
# 3. 验证HTTP响应状态
response.raise_for_status()
# 遍历最外层的 children 列表level=0 的子节点即 level=1
driving_tree = response.json()[0]
park_tree = response.json()[1]
# 获取所有的一级标签的子标签,组成一个新的列表
driving_fst_list = get_level1_labels(driving_tree, fst_tag_list)
park_fst_list = get_level1_labels(park_tree, fst_tag_list)
if len(driving_fst_list)>0:
fst_list=driving_fst_list
if len(park_fst_list)>0:
fst_list=park_fst_list
# print(555,fst_list)
filter_fst = fst_list
# 查询所有的标签
query_params = {"page": page, "per_page": per_page} # 页码 # 每页条数
# bag_files = []
if filter_fst:
res_bags = requests.post(
url=API_NODES_URL,
params=query_params, # URL查询参数自动拼接为 ?page=1&per_page=20
json=filter_fst, # JSON请求体自动设置Content-Type头
timeout=TIMEOUT, # 超时时间(秒)
)
# 对查询到结果,循环进行查询
all_bag_list = res_bags.json()["items"]
total_nums = res_bags.json()["total"]
# 获取所有的bag_name组成一个列表
bag_name_list = []
for item in all_bag_list:
bag_name = item.get("bag_name")
if bag_name:
bag_name_list.append(bag_name)
# 查询所有的bagname的列表数据,返回的数据是字典中对应的列表
bag_other_info_list = requests.post(url=API_OTHER_INFO_URL, json=bag_name_list)
# 组装数据
res_list=[]
for row in all_bag_list:
bag_name = row.get("bag_name")
if bag_name:
record={}
record["file_name"]=bag_name
record["level1_tag_name"] = fst_tag_list
record["sub_tag_name"] = row.get('sts')
# 过滤值
bag_other_detail=bag_other_info_list.json()[bag_name]
if len(bag_other_detail)==0:
record["comment"] = ""
record["highway"] = ""
record["city"] = ""
record["driving"] = ""
record["front_starttime"] = ""
record["front_endtime"] = ""
if len(bag_other_detail)==1:
record["comment"] = bag_other_detail[0]["comments"]
record["highway"] = ""
record["city"] = ""
record["driving"] = ""
record["front_starttime"] = bag_other_detail[0]["start"]
record["front_endtime"] = bag_other_detail[0]["end"]
if len(bag_other_detail)>1:
# 循环bag_other_detail的值当row.get('sts')等于某项的name时。
for info in bag_other_detail:
if info['name']==row.get('sts'):
record["comment"] = info['comments']
record["highway"] = ""
record["city"] = ""
record["driving"] = ""
record["front_starttime"] = info['start']
record["front_endtime"] = info['end']
else:
record["comment"] = ""
record["highway"] = ""
record["city"] = ""
record["driving"] = ""
record["front_starttime"] = ""
record["front_endtime"] = ""
datetime_full, video_url = extract_datetime_from_filename(bag_name)
record["video_url"] = video_url
record["capture_datetime"] = datetime_full
res_list.append(record)
# 7. 返回精简响应
return jsonify(
{
"code": 200,
"data": res_list,
"total": total_nums,
"page": page,
"pages": page,
"message": "根据 Fst 标签查询成功",
}
)
else:
return (
jsonify(
{
"code": 500,
"data": [],
"total": 0,
"page": page if "page" in locals() else 1,
"pages": 0,
"message": "标签数据出错",
}
),
500,
)
except Exception as e:
# 异常捕获与响应
error_msg = f"查询出错:{str(e)}"
print(f"【接口异常】{error_msg}")
return (
jsonify(
{
"code": 500,
"data": [],
"total": 0,
"page": page if "page" in locals() else 1,
"pages": 0,
"message": error_msg,
}
),
500,
)
@remote_bp.route("/sub-fst", methods=["GET"])
def get_sub_fst1():
target_label = request.args.get("fst_id")
API_URL = "http://10.0.240.4:5232/api/fst/print_tree"
TIMEOUT = 10 # 超时时间(秒)
try:
response = requests.get(url=API_URL, timeout=TIMEOUT)
# 3. 验证HTTP响应状态
response.raise_for_status()
# 遍历最外层的 children 列表level=0 的子节点即 level=1
result=[]
for node in response.json()[0].get("children", []):
if node.get("level") == 1 and node.get("label") == target_label:
data = node.get("children", [])
if data:
result=data
for node_park in response.json()[1].get("children", []):
if node_park.get("level") == 1 and node_park.get("label") == target_label:
data_park = node_park.get("children", [])
if data_park:
result=data_park
return jsonify({"code": 200, "data": result}) # 返回该节点的children
return (
jsonify({"code": 404, "message": "Fst not found"}),
404,
) # 未找到匹配节点时返回None
except Exception as e:
return jsonify({"success": False, "message": f"服务器处理错误:{str(e)}"})
@remote_bp.route("/all-fst", methods=["GET"])
def get_all_fst1():
type = request.args.get("type")
API_URL = "http://10.0.240.4:5232/api/fst/print_tree"
TIMEOUT = 10 # 超时时间(秒)
try:
response = requests.get(url=API_URL, timeout=TIMEOUT)
# 3. 验证HTTP响应状态
response.raise_for_status()
if type == "driving":
return jsonify({"code": 200, "data": response.json()[0]})
if type == "parking":
return jsonify({"code": 200, "data": response.json()[1]})
except Exception as e:
return jsonify({"success": False, "message": f"服务器处理错误:{str(e)}"})