# from datetime import datetime,timedelta # from app import db # from app.models import BagRecord # from sqlalchemy.exc import SQLAlchemyError # from sqlalchemy import func # def add(): # new_record = BagRecord( # bag_name="sensor_data_2025.bag", # collection_time=datetime(2025, 5, 1, 12, 1, 1), # bag_state=0, # frames=120, # car_state=1, # address="beijing", # collect_month=5 # ) # db.session.add(new_record) # db.session.commit() # 提交事务 [1,7](@ref) # def create_record(data): # """创建新记录""" # try: # # 转换日期类型字段 # if 'collection_time' in data: # data['collection_time'] = datetime.fromisoformat(data['collection_time']) # if 'parse_time' in data: # data['parse_time'] = datetime.fromisoformat(data['parse_time']) # record = BagRecord(**data) # db.session.add(record) # db.session.commit() # return record # except SQLAlchemyError as e: # db.session.rollback() # raise e # def get_record_by_id(record_id): # """根据ID获取记录""" # return BagRecord.query.get(record_id) # def get_all_records(**filters): # """获取所有记录(支持过滤)""" # query = BagRecord.query # if filters: # query = query.filter_by(**filters) # return query.all() # def get_daily_counts(start_date, end_date): # # 执行 SQL 查询,按日期分组并统计数量 # query = ( # db.session.query( # db.func.date(BagRecord.collection_time).label('date'), # db.func.count(BagRecord.id).label('count') # ) # .filter(BagRecord.collection_time >= start_date) # .filter(BagRecord.collection_time <= end_date + timedelta(days=1)) # 包含结束日期 # .group_by(db.func.date(BagRecord.collection_time)) # .order_by(db.func.date(BagRecord.collection_time)) # ) # print(query.all()) # # 将结果转换为字典列表 # results = [] # for date, count in query.all(): # results.append({ # 'date': str(date), # 将日期对象转换为字符串 # 'count': count # }) # return results