from datetime import datetime import json from flask import Blueprint, current_app, request, jsonify from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, jwt_required from sqlalchemy import and_ from app import jwt,db from app.models import OperationHistory, Role, User, UserRole from app.utils.password_hasher import hash_password,check_password from app.utils.response_dict import response auth_bp= Blueprint('data', __name__) @auth_bp.route('/login', methods=['POST']) def login(): # 1. 获取登录凭证 data = request.get_json() username = data.get('username') password = data.get('password') print('username',username) print('password',password) # 2. 验证必填字段 # if not username or not password: # response['code']=400 # response['message']="用户名和密码不能为空" # return jsonify(response) # # # 3. 查询用户 # user = User.query.filter_by(username=username).first() # # # 4. 验证用户存在性和密码 # if not user or not check_password(password,user.password): # response['code']=401 # response['message']="用户名或密码错误" # return jsonify({"msg": "用户名或密码错误"}), 401 # # if user.status != 1: # return jsonify({"msg": "用户已禁用"}), 403 # 5. 生成JWT令牌[2,6](@ref) access_token = create_access_token(identity=str(1)) result = db.session.query( User.id, User.username.label('realName'), # 使用 label 实现别名 User.password, User.status, Role.name.label('roles'), Role.description ).join( UserRole, User.id == UserRole.user_id # 第一次 JOIN:用户 ↔ 用户角色 ).join( Role, UserRole.role_id == Role.id # 第二次 JOIN:用户角色 ↔ 角色 ).filter( User.status == 1, # 状态过滤 User.id == 1 # 用户ID过滤 ).all() if not result: return jsonify({"msg": "用户未分配角色或已禁用"}), 403 # dict_list = [row._asdict() for row in result] # dict_list[0]['accessToken']=access_token # dict_list[0]['roles']=[dict_list[0]['roles']] # data={ # "accessToken": access_token, # "user_id": user.id, # "message": "登录成功" # } # response['data']= dict_list[0] # response['error']='' # 6. 返回响应 res={"accessToken":access_token} data={} data['data']=res return jsonify(res) @auth_bp.route('/userinfo', methods=['GET']) @jwt_required() # 要求有效Token[1,5](@ref) def get_user_info(): userid = get_jwt_identity() """ 获取用户信息 """ result = db.session.query( User.id, User.username.label('realName'), # 使用 label 实现别名 User.password, User.status, Role.name.label('roles'), Role.description ).join( UserRole, User.id == UserRole.user_id # 第一次 JOIN:用户 ↔ 用户角色 ).join( Role, UserRole.role_id == Role.id # 第二次 JOIN:用户角色 ↔ 角色 ).filter( User.status == 1, # 状态过滤 User.id == userid # 用户ID过滤 ).all() dict_list = [row._asdict() for row in result] if not dict_list: return jsonify({"msg": "用户未分配角色或已禁用"}), 403 role_list = [row.get('roles') for row in dict_list if row.get('roles')] if not role_list: return jsonify({"msg": "用户未分配角色或已禁用"}), 403 dict_list[0]['roles'] = role_list data={"roles": dict_list[0]['roles'],"realName": dict_list[0]['realName'],"homePath": '/datamanage/datalabel'} return jsonify(data) @auth_bp.route('/codes', methods=['GET']) @jwt_required() # 要求有效Token[1,5](@ref) def get_code(): response['code']=200 response['data']=["AC_100100","AC_100110"] return jsonify(response) @auth_bp.route('/refresh', methods=['POST']) @jwt_required() # 要求有效Token[1,5](@ref) def refresh(): # data: string; # status: number; token = create_access_token(identity='service_account') data={} data['data']=token data['status']=1 return jsonify(data) # ===== 受保护路由示例 ===== @auth_bp.route('/protected', methods=['GET']) @jwt_required() # 要求有效Token[1,5](@ref) def protected(): # 从Token获取用户身份 current_user_id = get_jwt_identity() user = User.query.get(current_user_id) return jsonify({ "message": f"欢迎回来, {user.username}!", "user_id": user.id }), 200 @jwt.unauthorized_loader def missing_token_callback(error): """缺少Token时的自定义响应""" return jsonify({ 'status': 401, 'msg': '缺少访问令牌' }), 401 blacklist = set() @auth_bp.route('/createuser', methods=['POST']) @jwt_required() # 需要有效Token才能创建用户 def create_user(): ''' { "username": "new_user", "password": "securePass123", "role_ids": [1, 3] // 要分配的角色ID列表 } ''' data = request.get_json() print(data) if not data or 'username' not in data or 'password' not in data: return jsonify({"error": "缺少用户名或密码"}), 400 # 1. 验证用户名唯一性 existing_user = User.query.filter_by(username=data['username']).first() if existing_user: return jsonify({"error": "用户名已存在"}), 409 # 2. 获取当前操作者 current_user_id = get_jwt_identity() print('current_user_id',current_user_id) current_user = User.query.get(int(current_user_id)) if not current_user: return jsonify({"error": "无效的操作者"}), 401 # 3. 密码复杂度验证 password = data['password'] if len(password) < 8 or not any(c.isdigit() for c in password): return jsonify({"error": "密码需至少8位且包含数字"}), 400 # 4. 验证角色(新增部分) role_ids = data.get('role_ids', []) valid_roles = [] if role_ids: valid_roles = Role.query.filter(Role.id.in_(role_ids)).all() if len(valid_roles) != len(role_ids): return jsonify({"error": "包含无效的角色ID"}), 400 try: # 5. 创建新用户 new_user = User( username=data['username'], password=hash_password(data['password']), status=data.get('status', 1), created_by=current_user_id ) db.session.add(new_user) db.session.flush() # 获取新用户ID但不提交事务 # 6. 分配角色(新增核心功能) for role in valid_roles: user_role = UserRole( user_id=new_user.id, role_id=role.id, created_by=current_user_id ) db.session.add(user_role) db.session.commit() # 7. 构造响应数据(包含角色信息) response_data = { "id": new_user.id, "username": new_user.username, "created_at": new_user.created_at.isoformat(), "created_by": new_user.created_by, "roles": [{"id": r.id, "name": r.name} for r in valid_roles] # 新增角色信息 } # 8. 记录操作日志(新增角色信息) current_app.logger.info( f"用户创建成功: {new_user.username} (ID:{new_user.id}) " f"操作者: {current_user.username} (ID:{current_user_id}) " f"分配角色: {[r.name for r in valid_roles]}" # 记录分配的角色 ) return jsonify(response_data), 201 except Exception as e: db.session.rollback() current_app.logger.error(f"用户创建失败: {str(e)}") return jsonify({"error": "服务器内部错误"}), 500 @auth_bp.route('/updateuser', methods=['POST']) @jwt_required() # 需要有效Token才能访问 def update_user(): ''' 请求体格式: { "user_id": 123, // 必需:目标用户ID "username": "updated_user", // 可选:新用户名 "password": "newSecurePass456", // 可选:新密码 "status": 1 // 可选:用户状态(1-启用,0-禁用) } ''' data = request.get_json() if not data or 'user_id' not in data: return jsonify({"error": "请求数据不能为空且必须包含user_id"}), 400 # 1. 验证操作者身份 current_user_id = get_jwt_identity() current_user = User.query.get(int(current_user_id)) if not current_user: return jsonify({"error": "无效的操作者"}), 401 # 2. 验证目标用户是否存在 target_user_id = data['user_id'] target_user = User.query.get(int(target_user_id)) if not target_user: return jsonify({"error": "目标用户不存在"}), 404 # 3. 权限校验:仅允许管理员修改(核心修改点) # 检查当前用户是否为管理员(假设管理员角色ID为1) is_admin = UserRole.query.filter_by( # 注意:这里修正为UserRole表(用户-角色关联表) user_id=current_user_id, role_id=1 ).first() if not is_admin: # 只有管理员能通过校验 return jsonify({"error": "仅管理员有权限修改用户信息"}), 403 role_ids = data.get('role_ids', None) valid_roles = [] if role_ids is not None: if not isinstance(role_ids, list): return jsonify({"error": "role_ids必须为数组"}), 400 if role_ids: valid_roles = Role.query.filter(Role.id.in_(role_ids)).all() if len(valid_roles) != len(role_ids): return jsonify({"error": "包含无效的角色ID"}), 400 try: # 4. 处理用户名修改(如提供) if 'username' in data and data['username']: # 验证用户名唯一性(排除自身) existing_user = User.query.filter( User.username == data['username'], User.id != target_user_id ).first() if existing_user: return jsonify({"error": "用户名已存在"}), 409 target_user.username = data['username'] # 5. 处理密码修改(如提供) if 'password' in data and data['password']: password = data['password'] # 密码复杂度验证(与创建用户保持一致) if len(password) < 8 or not any(c.isdigit() for c in password): return jsonify({"error": "密码需至少8位且包含数字"}), 400 target_user.password = hash_password(password) # 复用哈希函数 # 6. 处理状态修改(如提供) if 'status' in data: target_user.status = data['status'] # 管理员无需额外校验 # 6.1 处理角色修改(如提供) if role_ids is not None: UserRole.query.filter_by(user_id=target_user_id).delete() for role in valid_roles: user_role = UserRole( user_id=target_user_id, role_id=role.id, created_by=current_user_id, ) db.session.add(user_role) # 7. 提交修改 target_user.updated_at = datetime.now() # 假设模型有updated_at字段 target_user.updated_by = current_user_id # 假设模型有updated_by字段 db.session.commit() # 8. 构造响应数据 response_data = { "id": target_user.id, "username": target_user.username, "status": target_user.status, "updated_at": target_user.updated_at.isoformat(), "updated_by": current_user.username } if role_ids is not None: response_data["roles"] = [{"id": r.id, "name": r.name} for r in valid_roles] # 9. 记录操作日志 current_app.logger.info( f"用户信息更新成功: {target_user.username} (ID:{target_user.id}) " f"操作者: {current_user.username} (ID:{current_user_id}) " f"修改内容: {[k for k in data.keys() if k != 'user_id']}" ) return jsonify(response_data), 200 except Exception as e: db.session.rollback() current_app.logger.error(f"用户信息更新失败: {str(e)}") return jsonify({"error": "服务器内部错误"}), 500 @auth_bp.route('/logout', methods=['POST']) @jwt_required() def log_out(): jti = get_jwt()["jti"] # 获取 Token 的唯一标识 blacklist.add(jti) # 将 Token 添加到黑名单 return jsonify({"msg": "Successfully logged out"}), 200 @auth_bp.route('/getuserlist', methods=['GET']) @jwt_required() def get_users(): result = db.session.query( User.id, User.username, User.status, User.created_at, User.created_by, User.updated_at, UserRole.role_id, Role.description, Role.name ).join( UserRole, User.id == UserRole.user_id ).join( Role, UserRole.role_id == Role.id ).all() dict_list = [row._asdict() for row in result] return jsonify(dict_list), 200 @auth_bp.route('/getroles', methods=['GET']) @jwt_required() def get_roles(): roles = Role.query.filter(Role.status == 1).all() data = [{"id": r.id, "name": r.name, "description": r.description} for r in roles] return jsonify(data), 200 @auth_bp.route('/deleteuser', methods=['GET']) @jwt_required() def delete_user(): user_id=request.args.get('userid') # 1. 查询用户(确保状态之前为 1) user = User.query.filter( User.id == user_id, User.status == 1 # 只选择状态为 1 的用户 ).first() if not user: return jsonify({"error": "用户不存在或状态不是 1"}), 400 # 用户不存在或状态不是 1 # 2. 修改状态 user.status = 0 # 3. 提交事务 try: db.session.commit() return jsonify({"msg": "Successfully"}),200 except Exception as e: db.session.rollback() print(f"Error: {e}") return jsonify({"error": f"出现错误{e}"}), 400 @auth_bp.route('/test', methods=['GET']) @jwt_required() def test(): pass @auth_bp.route('/gethisttory', methods=['POST']) @jwt_required() def query_operation_history(): """ { "username": "admin", // 可选,精确匹配用户名 "api_path": "/api/user/create", // 可选,精确匹配接口路径 "start_time": "2025-07-01T00:00:00", // 可选,开始时间(ISO 8601格式) "end_time": "2025-07-10T23:59:59", // 可选,结束时间(ISO 8601格式) "request_keyword": "email", // 可选,在请求参数中模糊搜索关键词 "page": 2, // 可选,页码(默认1) "per_page": 15 // 可选,每页条数(默认20) } """ """查询操作历史记录(支持多条件过滤)""" # 接收JSON格式的查询参数 query_params = request.get_json() # 构建基础查询 query = OperationHistory.query # 1. 用户名过滤(精确匹配) if query_params['username']: query = query.filter(OperationHistory.username == query_params['username']) # 2. 接口路径过滤(精确匹配) if query_params['api_path']: query = query.filter(OperationHistory.api_path == query_params['api_path']) # 3. 操作时间范围过滤 if query_params['start_time'] and query_params['end_time']: # 解析时间参数(支持ISO 8601格式) start_time = datetime.fromisoformat(query_params['start_time']) if 'start_time' in query_params else None end_time = datetime.fromisoformat(query_params['end_time']) if 'end_time' in query_params else None # 构建时间范围查询条件 time_conditions = [] if start_time: time_conditions.append(OperationHistory.operation_time >= start_time) if end_time: time_conditions.append(OperationHistory.operation_time <= end_time) if time_conditions: query = query.filter(and_(*time_conditions)) # 4. 请求参数关键词过滤(模糊匹配) if query_params['request_keyword']: keyword = f"%{query_params['request_keyword']}%" query = query.filter( OperationHistory.request_params.like(keyword) ) # 5. 分页参数处理 page = query_params.get('page', 1) per_page = query_params.get('per_page', 20) # 执行分页查询(使用索引优化) pagination = query.order_by(OperationHistory.operation_time.desc()) \ .paginate(page=page, per_page=per_page, error_out=False) # 序列化结果 results = [] for record in pagination.items: results.append({ 'id': record.id, 'username': record.username, 'api_path': record.api_path, 'http_method': record.http_method, 'operation_time': record.operation_time.isoformat(), 'response_code': record.response_code, 'ip_address': record.ip_address, 'operation_result': record.operation_result, 'error_message': record.error_message, 'trace_id': record.trace_id, # 请求参数反序列化(JSON字符串→Python对象) 'request_params': json.loads(record.request_params) if record.request_params else None }) # 返回分页结果 return jsonify({ 'total': pagination.total, 'pages': pagination.pages, 'current_page': pagination.page, 'per_page': pagination.per_page, 'results': results })