This commit is contained in:
ZhuJW
2026-04-17 14:37:42 +08:00
parent 5a98242f2f
commit 988678b75f
2 changed files with 200 additions and 2 deletions

View File

@@ -19,6 +19,36 @@ ROOT DB API 是基于Flask的REST API服务为自动驾驶数据管理提供
- **完善测试覆盖** 基于pytest框架
- **生产环境就绪** 支持Gunicorn部署
## 🔐 认证登录SSO
当前服务已支持作为 `Relying Party` 接入外部统一认证服务,流程如下:
1. 访问受保护页面时跳转到 `/api/login`
2. 服务重定向到 SSO `Authorization URL`
3. SSO 回调 `/api/daimler/authorized` 并携带 `code`
4. 服务用 `code` 调用 `Token URL` 获取 `access_token`
5. 服务用 `access_token` 调用 `Userinfo URL` 获取用户信息
6. 用户信息写入 Flask Session 后跳转主页 `/`
可配置环境变量(示例):
```bash
FLASK_SECRET_KEY=replace-with-a-random-secret
AUTH_AUTHORIZATION_URL=https://ssoalpha.dvb.corpinter.net/v1/auth
AUTH_TOKEN_URL=https://ssoalpha.dvb.corpinter.net/v1/token
AUTH_USERINFO_URL=https://ssoalpha.dvb.corpinter.net/v1/userinfo
AUTH_CLIENT_ID=YOUR_CLIENT_ID
AUTH_CLIENT_SECRET=YOUR_CLIENT_SECRET
AUTH_SCOPE="groups openid email profile"
AUTH_REDIRECT_URI=http://localhost:8081/api/daimler/authorized
```
说明:
- 请在 SSO 配置中将回调地址注册为 `http://localhost:8081/api/daimler/authorized`(或你的实际部署地址)
- `FLASK_SECRET_KEY` 在生产环境必须替换,避免使用默认值
- 如需退出登录,可访问 `/api/logout`
## 🛠 Tech Stack
| 组件 | 技术栈 |

View File

@@ -1,10 +1,35 @@
# run.py
import os
from urllib.parse import urlencode, urlparse
import requests
from flasgger import Swagger
from flask import Flask, render_template
from flask import Flask, jsonify, redirect, render_template, request, session, url_for
from fst_data_pipeline.apps.root_db_api.src.api import api_bp
app = Flask(__name__, template_folder="../templates")
app.config["SECRET_KEY"] = os.getenv("FLASK_SECRET_KEY", "SECRET")
AUTH_AUTHORIZATION_URL = os.getenv(
"AUTH_AUTHORIZATION_URL", "https://ssoalpha.dvb.corpinter.net/v1/auth"
)
AUTH_TOKEN_URL = os.getenv("AUTH_TOKEN_URL", "https://ssoalpha.dvb.corpinter.net/v1/token")
AUTH_USERINFO_URL = os.getenv(
"AUTH_USERINFO_URL", "https://ssoalpha.dvb.corpinter.net/v1/userinfo"
)
AUTH_CLIENT_ID = os.getenv("AUTH_CLIENT_ID", "F0ED0AB5-16B9-49A2-96F5-A5702D83B614")
AUTH_CLIENT_SECRET = os.getenv("AUTH_CLIENT_SECRET", "j92fUMG35PSCul8-7Hw0ca_.1vA~6mI4")
AUTH_SCOPE = os.getenv("AUTH_SCOPE", "groups openid email profile")
AUTH_REDIRECT_URI = os.getenv("AUTH_REDIRECT_URI", "http://localhost:8081/api/daimler/authorized")
AUTH_ALLOWED_NEXT_ORIGINS = {
origin.strip()
for origin in os.getenv(
"AUTH_ALLOWED_NEXT_ORIGINS",
"http://localhost:8081",
).split(",")
if origin.strip()
}
# 注册蓝图
app.register_blueprint(api_bp, url_prefix="/api")
@@ -19,16 +44,159 @@ app.config["SWAGGER"] = {
Swagger(app)
def _normalize_next_target(next_target: str) -> str:
if not next_target:
return "/"
parsed = urlparse(next_target)
# Allowlist explicit frontend origins for dev cross-port callback redirects.
if parsed.scheme and parsed.netloc:
origin = f"{parsed.scheme}://{parsed.netloc}"
safe_path = parsed.path or "/"
if (
parsed.scheme in {"http", "https"}
and origin in AUTH_ALLOWED_NEXT_ORIGINS
and not safe_path.startswith("/api/login")
and not safe_path.startswith("/api/daimler/authorized")
):
query = f"?{parsed.query}" if parsed.query else ""
fragment = f"#{parsed.fragment}" if parsed.fragment else ""
return f"{origin}{safe_path}{query}{fragment}"
return "/"
if not next_target.startswith("/"):
return "/"
if next_target.startswith("//"):
return "/"
if (
next_target.startswith("/api/login")
or next_target.startswith("/api/daimler/authorized")
):
return "/"
return next_target
def _build_authorization_query() -> dict:
return {
"response_type": "code",
"client_id": AUTH_CLIENT_ID,
"scope": AUTH_SCOPE,
"redirect_uri": AUTH_REDIRECT_URI,
"prompt": "login",
}
def _exchange_code_for_token(code: str) -> dict:
data = {
"grant_type": "authorization_code",
"code": code,
"client_id": AUTH_CLIENT_ID,
"client_secret": AUTH_CLIENT_SECRET,
"redirect_uri": AUTH_REDIRECT_URI,
}
# Try form-encoded first, then JSON fallback
try:
form_resp = requests.post(AUTH_TOKEN_URL, data=data, timeout=10)
if form_resp.ok:
return form_resp.json()
except Exception:
pass
try:
json_resp = requests.post(AUTH_TOKEN_URL, json=data, timeout=10)
if json_resp.ok:
return json_resp.json()
json_resp.raise_for_status()
except requests.RequestException as e:
raise e
def _fetch_userinfo(access_token: str) -> dict:
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(AUTH_USERINFO_URL, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
@app.before_request
def _protect_api_routes():
auth_allowlist = {
"/api/login",
"/api/daimler/authorized",
}
if request.path in auth_allowlist:
return None
if request.path.startswith("/api") and "auth_user" not in session:
referer = request.headers.get("Referer", "")
next_target = _normalize_next_target(referer)
login_url = url_for("auth_login_api", next=next_target, _external=True)
return jsonify({"error": "Unauthorized", "login_url": login_url}), 401
@app.route("/")
def hello_world():
return "Hello World!"
user = session.get("auth_user")
if not user:
return redirect(url_for("auth_login_api"))
return jsonify({"message": "Hello World!", "user": user})
@app.route("/data-browser")
def data_browser():
"""数据浏览器前端界面"""
if "auth_user" not in session:
return redirect(url_for("auth_login_api"))
return render_template("data_browser.html")
@app.route("/api/login")
def auth_login_api():
next_target = request.args.get("next", "")
session["auth_next"] = _normalize_next_target(next_target)
query = _build_authorization_query()
return redirect(f"{AUTH_AUTHORIZATION_URL}?{urlencode(query)}")
@app.route("/api/daimler/authorized")
def auth_callback_api():
code = request.args.get("code", "")
if not code:
return jsonify({"error": "Missing code from SSO callback"}), 400
try:
token_response = _exchange_code_for_token(code)
access_token = token_response.get("access_token")
id_token = token_response.get("id_token") # Save id_token for logout
if not access_token:
return jsonify({"error": "No access_token in token response"}), 400
userinfo = _fetch_userinfo(access_token)
session["auth_user"] = userinfo
session["auth_access_token"] = access_token
if id_token:
session["auth_id_token"] = id_token
except requests.RequestException as exc:
return jsonify({"error": "Auth request failed", "detail": str(exc)}), 502
target = _normalize_next_target(session.pop("auth_next", "/"))
return redirect(target)
@app.route("/api/auth/me")
def auth_me():
user = session.get("auth_user")
if not user:
return jsonify({"error": "Unauthorized"}), 401
return jsonify({"user": user})
if __name__ == "__main__":
app.run(debug=False, host="0.0.0.0", port=5232)