Files
2026-04-22 13:35:40 +08:00

523 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
import json
from flask import Blueprint, current_app, request, jsonify
from flask_jwt_extended import create_access_token, get_jwt, get_jwt_identity, jwt_required
from sqlalchemy import and_
from app import jwt,db
from app.models import OperationHistory, Role, User, UserRole
from app.utils.password_hasher import hash_password,check_password
from app.utils.response_dict import response
auth_bp= Blueprint('data', __name__)
@auth_bp.route('/login', methods=['POST'])
def login():
# 1. 获取登录凭证
data = request.get_json()
username = data.get('username')
password = data.get('password')
print('username',username)
print('password',password)
# 2. 验证必填字段
# if not username or not password:
# response['code']=400
# response['message']="用户名和密码不能为空"
# return jsonify(response)
#
# # 3. 查询用户
# user = User.query.filter_by(username=username).first()
#
# # 4. 验证用户存在性和密码
# if not user or not check_password(password,user.password):
# response['code']=401
# response['message']="用户名或密码错误"
# return jsonify({"msg": "用户名或密码错误"}), 401
#
# if user.status != 1:
# return jsonify({"msg": "用户已禁用"}), 403
# 5. 生成JWT令牌[2,6](@ref)
access_token = create_access_token(identity=str(1))
result = db.session.query(
User.id,
User.username.label('realName'), # 使用 label 实现别名
User.password,
User.status,
Role.name.label('roles'),
Role.description
).join(
UserRole, User.id == UserRole.user_id # 第一次 JOIN用户 ↔ 用户角色
).join(
Role, UserRole.role_id == Role.id # 第二次 JOIN用户角色 ↔ 角色
).filter(
User.status == 1, # 状态过滤
User.id == 1 # 用户ID过滤
).all()
if not result:
return jsonify({"msg": "用户未分配角色或已禁用"}), 403
# dict_list = [row._asdict() for row in result]
# dict_list[0]['accessToken']=access_token
# dict_list[0]['roles']=[dict_list[0]['roles']]
# data={
# "accessToken": access_token,
# "user_id": user.id,
# "message": "登录成功"
# }
# response['data']= dict_list[0]
# response['error']=''
# 6. 返回响应
res={"accessToken":access_token}
data={}
data['data']=res
return jsonify(res)
@auth_bp.route('/userinfo', methods=['GET'])
@jwt_required() # 要求有效Token[1,5](@ref)
def get_user_info():
userid = get_jwt_identity()
"""
获取用户信息
"""
result = db.session.query(
User.id,
User.username.label('realName'), # 使用 label 实现别名
User.password,
User.status,
Role.name.label('roles'),
Role.description
).join(
UserRole, User.id == UserRole.user_id # 第一次 JOIN用户 ↔ 用户角色
).join(
Role, UserRole.role_id == Role.id # 第二次 JOIN用户角色 ↔ 角色
).filter(
User.status == 1, # 状态过滤
User.id == userid # 用户ID过滤
).all()
dict_list = [row._asdict() for row in result]
if not dict_list:
return jsonify({"msg": "用户未分配角色或已禁用"}), 403
role_list = [row.get('roles') for row in dict_list if row.get('roles')]
if not role_list:
return jsonify({"msg": "用户未分配角色或已禁用"}), 403
dict_list[0]['roles'] = role_list
data={"roles": dict_list[0]['roles'],"realName": dict_list[0]['realName'],"homePath": '/datamanage/datalabel'}
return jsonify(data)
@auth_bp.route('/codes', methods=['GET'])
@jwt_required() # 要求有效Token[1,5](@ref)
def get_code():
response['code']=200
response['data']=["AC_100100","AC_100110"]
return jsonify(response)
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required() # 要求有效Token[1,5](@ref)
def refresh():
# data: string;
# status: number;
token = create_access_token(identity='service_account')
data={}
data['data']=token
data['status']=1
return jsonify(data)
# ===== 受保护路由示例 =====
@auth_bp.route('/protected', methods=['GET'])
@jwt_required() # 要求有效Token[1,5](@ref)
def protected():
# 从Token获取用户身份
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
return jsonify({
"message": f"欢迎回来, {user.username}!",
"user_id": user.id
}), 200
@jwt.unauthorized_loader
def missing_token_callback(error):
"""缺少Token时的自定义响应"""
return jsonify({
'status': 401,
'msg': '缺少访问令牌'
}), 401
blacklist = set()
@auth_bp.route('/createuser', methods=['POST'])
@jwt_required() # 需要有效Token才能创建用户
def create_user():
'''
{
"username": "new_user",
"password": "securePass123",
"role_ids": [1, 3] // 要分配的角色ID列表
}
'''
data = request.get_json()
print(data)
if not data or 'username' not in data or 'password' not in data:
return jsonify({"error": "缺少用户名或密码"}), 400
# 1. 验证用户名唯一性
existing_user = User.query.filter_by(username=data['username']).first()
if existing_user:
return jsonify({"error": "用户名已存在"}), 409
# 2. 获取当前操作者
current_user_id = get_jwt_identity()
print('current_user_id',current_user_id)
current_user = User.query.get(int(current_user_id))
if not current_user:
return jsonify({"error": "无效的操作者"}), 401
# 3. 密码复杂度验证
password = data['password']
if len(password) < 8 or not any(c.isdigit() for c in password):
return jsonify({"error": "密码需至少8位且包含数字"}), 400
# 4. 验证角色(新增部分)
role_ids = data.get('role_ids', [])
valid_roles = []
if role_ids:
valid_roles = Role.query.filter(Role.id.in_(role_ids)).all()
if len(valid_roles) != len(role_ids):
return jsonify({"error": "包含无效的角色ID"}), 400
try:
# 5. 创建新用户
new_user = User(
username=data['username'],
password=hash_password(data['password']),
status=data.get('status', 1),
created_by=current_user_id
)
db.session.add(new_user)
db.session.flush() # 获取新用户ID但不提交事务
# 6. 分配角色(新增核心功能)
for role in valid_roles:
user_role = UserRole(
user_id=new_user.id,
role_id=role.id,
created_by=current_user_id
)
db.session.add(user_role)
db.session.commit()
# 7. 构造响应数据(包含角色信息)
response_data = {
"id": new_user.id,
"username": new_user.username,
"created_at": new_user.created_at.isoformat(),
"created_by": new_user.created_by,
"roles": [{"id": r.id, "name": r.name} for r in valid_roles] # 新增角色信息
}
# 8. 记录操作日志(新增角色信息)
current_app.logger.info(
f"用户创建成功: {new_user.username} (ID:{new_user.id}) "
f"操作者: {current_user.username} (ID:{current_user_id}) "
f"分配角色: {[r.name for r in valid_roles]}" # 记录分配的角色
)
return jsonify(response_data), 201
except Exception as e:
db.session.rollback()
current_app.logger.error(f"用户创建失败: {str(e)}")
return jsonify({"error": "服务器内部错误"}), 500
@auth_bp.route('/updateuser', methods=['POST'])
@jwt_required() # 需要有效Token才能访问
def update_user():
'''
请求体格式:
{
"user_id": 123, // 必需目标用户ID
"username": "updated_user", // 可选:新用户名
"password": "newSecurePass456", // 可选:新密码
"status": 1 // 可选用户状态1-启用0-禁用)
}
'''
data = request.get_json()
if not data or 'user_id' not in data:
return jsonify({"error": "请求数据不能为空且必须包含user_id"}), 400
# 1. 验证操作者身份
current_user_id = get_jwt_identity()
current_user = User.query.get(int(current_user_id))
if not current_user:
return jsonify({"error": "无效的操作者"}), 401
# 2. 验证目标用户是否存在
target_user_id = data['user_id']
target_user = User.query.get(int(target_user_id))
if not target_user:
return jsonify({"error": "目标用户不存在"}), 404
# 3. 权限校验:仅允许管理员修改(核心修改点)
# 检查当前用户是否为管理员假设管理员角色ID为1
is_admin = UserRole.query.filter_by( # 注意这里修正为UserRole表用户-角色关联表)
user_id=current_user_id,
role_id=1
).first()
if not is_admin: # 只有管理员能通过校验
return jsonify({"error": "仅管理员有权限修改用户信息"}), 403
role_ids = data.get('role_ids', None)
valid_roles = []
if role_ids is not None:
if not isinstance(role_ids, list):
return jsonify({"error": "role_ids必须为数组"}), 400
if role_ids:
valid_roles = Role.query.filter(Role.id.in_(role_ids)).all()
if len(valid_roles) != len(role_ids):
return jsonify({"error": "包含无效的角色ID"}), 400
try:
# 4. 处理用户名修改(如提供)
if 'username' in data and data['username']:
# 验证用户名唯一性(排除自身)
existing_user = User.query.filter(
User.username == data['username'],
User.id != target_user_id
).first()
if existing_user:
return jsonify({"error": "用户名已存在"}), 409
target_user.username = data['username']
# 5. 处理密码修改(如提供)
if 'password' in data and data['password']:
password = data['password']
# 密码复杂度验证(与创建用户保持一致)
if len(password) < 8 or not any(c.isdigit() for c in password):
return jsonify({"error": "密码需至少8位且包含数字"}), 400
target_user.password = hash_password(password) # 复用哈希函数
# 6. 处理状态修改(如提供)
if 'status' in data:
target_user.status = data['status'] # 管理员无需额外校验
# 6.1 处理角色修改(如提供)
if role_ids is not None:
UserRole.query.filter_by(user_id=target_user_id).delete()
for role in valid_roles:
user_role = UserRole(
user_id=target_user_id,
role_id=role.id,
created_by=current_user_id,
)
db.session.add(user_role)
# 7. 提交修改
target_user.updated_at = datetime.now() # 假设模型有updated_at字段
target_user.updated_by = current_user_id # 假设模型有updated_by字段
db.session.commit()
# 8. 构造响应数据
response_data = {
"id": target_user.id,
"username": target_user.username,
"status": target_user.status,
"updated_at": target_user.updated_at.isoformat(),
"updated_by": current_user.username
}
if role_ids is not None:
response_data["roles"] = [{"id": r.id, "name": r.name} for r in valid_roles]
# 9. 记录操作日志
current_app.logger.info(
f"用户信息更新成功: {target_user.username} (ID:{target_user.id}) "
f"操作者: {current_user.username} (ID:{current_user_id}) "
f"修改内容: {[k for k in data.keys() if k != 'user_id']}"
)
return jsonify(response_data), 200
except Exception as e:
db.session.rollback()
current_app.logger.error(f"用户信息更新失败: {str(e)}")
return jsonify({"error": "服务器内部错误"}), 500
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def log_out():
jti = get_jwt()["jti"] # 获取 Token 的唯一标识
blacklist.add(jti) # 将 Token 添加到黑名单
return jsonify({"msg": "Successfully logged out"}), 200
@auth_bp.route('/getuserlist', methods=['GET'])
@jwt_required()
def get_users():
result = db.session.query(
User.id,
User.username,
User.status,
User.created_at,
User.created_by,
User.updated_at,
UserRole.role_id,
Role.description,
Role.name
).join(
UserRole, User.id == UserRole.user_id
).join(
Role, UserRole.role_id == Role.id
).all()
dict_list = [row._asdict() for row in result]
return jsonify(dict_list), 200
@auth_bp.route('/getroles', methods=['GET'])
@jwt_required()
def get_roles():
roles = Role.query.filter(Role.status == 1).all()
data = [{"id": r.id, "name": r.name, "description": r.description} for r in roles]
return jsonify(data), 200
@auth_bp.route('/deleteuser', methods=['GET'])
@jwt_required()
def delete_user():
user_id=request.args.get('userid')
# 1. 查询用户(确保状态之前为 1
user = User.query.filter(
User.id == user_id,
User.status == 1 # 只选择状态为 1 的用户
).first()
if not user:
return jsonify({"error": "用户不存在或状态不是 1"}), 400 # 用户不存在或状态不是 1
# 2. 修改状态
user.status = 0
# 3. 提交事务
try:
db.session.commit()
return jsonify({"msg": "Successfully"}),200
except Exception as e:
db.session.rollback()
print(f"Error: {e}")
return jsonify({"error": f"出现错误{e}"}), 400
@auth_bp.route('/test', methods=['GET'])
@jwt_required()
def test():
pass
@auth_bp.route('/gethisttory', methods=['POST'])
@jwt_required()
def query_operation_history():
"""
{
"username": "admin", // 可选,精确匹配用户名
"api_path": "/api/user/create", // 可选,精确匹配接口路径
"start_time": "2025-07-01T00:00:00", // 可选开始时间ISO 8601格式
"end_time": "2025-07-10T23:59:59", // 可选结束时间ISO 8601格式
"request_keyword": "email", // 可选,在请求参数中模糊搜索关键词
"page": 2, // 可选页码默认1
"per_page": 15 // 可选每页条数默认20
}
"""
"""查询操作历史记录(支持多条件过滤)"""
# 接收JSON格式的查询参数
query_params = request.get_json()
# 构建基础查询
query = OperationHistory.query
# 1. 用户名过滤(精确匹配)
if query_params['username']:
query = query.filter(OperationHistory.username == query_params['username'])
# 2. 接口路径过滤(精确匹配)
if query_params['api_path']:
query = query.filter(OperationHistory.api_path == query_params['api_path'])
# 3. 操作时间范围过滤
if query_params['start_time'] and query_params['end_time']:
# 解析时间参数支持ISO 8601格式
start_time = datetime.fromisoformat(query_params['start_time']) if 'start_time' in query_params else None
end_time = datetime.fromisoformat(query_params['end_time']) if 'end_time' in query_params else None
# 构建时间范围查询条件
time_conditions = []
if start_time:
time_conditions.append(OperationHistory.operation_time >= start_time)
if end_time:
time_conditions.append(OperationHistory.operation_time <= end_time)
if time_conditions:
query = query.filter(and_(*time_conditions))
# 4. 请求参数关键词过滤(模糊匹配)
if query_params['request_keyword']:
keyword = f"%{query_params['request_keyword']}%"
query = query.filter(
OperationHistory.request_params.like(keyword)
)
# 5. 分页参数处理
page = query_params.get('page', 1)
per_page = query_params.get('per_page', 20)
# 执行分页查询(使用索引优化)
pagination = query.order_by(OperationHistory.operation_time.desc()) \
.paginate(page=page, per_page=per_page, error_out=False)
# 序列化结果
results = []
for record in pagination.items:
results.append({
'id': record.id,
'username': record.username,
'api_path': record.api_path,
'http_method': record.http_method,
'operation_time': record.operation_time.isoformat(),
'response_code': record.response_code,
'ip_address': record.ip_address,
'operation_result': record.operation_result,
'error_message': record.error_message,
'trace_id': record.trace_id,
# 请求参数反序列化JSON字符串→Python对象
'request_params': json.loads(record.request_params) if record.request_params else None
})
# 返回分页结果
return jsonify({
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'per_page': pagination.per_page,
'results': results
})