cbcs adas statics 压测

This commit is contained in:
2026-04-29 18:05:28 +08:00
commit 3b3f16a875
2 changed files with 156 additions and 0 deletions

156
cbcs_statics.py Normal file
View File

@@ -0,0 +1,156 @@
import json
from locust import HttpUser, TaskSet, task, between, tag
from urllib.parse import urljoin
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import tempfile
import urllib3
# ================= 配置区域 =================
# 请根据实际情况修改以下配置,或者在启动 Locust 时通过 --config 参数传入
# 禁用InsecureRequestWarning警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 1. OAuth2 认证配置
AUTH_URL = "https://ssoalpha.dvb.corpinter.net.cn/v1/token" # 获取Token的相对路径或绝对路径
CLIENT_ID = "D650DB94-FB62-427A-980A-601BBC3081B6"
CLIENT_SECRET = "wU2G-xa08SHY_T7N.5BcFv~M94q61D3n"
SCOPE = "openid"
# 2. 目标业务接口配置
# 截图中的 VIN 码
GEN6_TEST_VIN = "LE4FG1DB1SL002567"
GEN5_TEST_VIN = "LE4AG4CB7SLYTKXLG"
# 业务接口路径 (截图中的 /api/vehicles/{vin}/statistics)
API_PATH = "/api/vehicles/{vin}/statistics"
Baumuster = "174123"
# ============================================
class UserBehavior(TaskSet):
access_token = None
key_store_file_path = "mic_certificate_W1AFJ10000Z000085.pfx"
key_store_passphrase = "b04ca3c0-518d-40b0-a0d5-01c0c25350b9"
def on_start(self):
"""
虚拟用户启动时执行:获取 Access Token
"""
self.get_access_token()
self.setup_client_certificate()
def get_access_token(self):
"""
使用 Client Credentials 模式获取 Token
"""
# 构造请求体OAuth2 标准格式
payload = {
"grant_type": "client_credentials",
"client_id": self.user.client_id,
"client_secret": self.user.client_secret,
"scope": self.user.scope
}
# 发送 POST 请求获取 Token
# 注意:这里使用 self.client.post意味着它会使用 HttpUser 中定义的 host
with self.client.post(
AUTH_URL,
data=payload,
name="OAuth_GetToken", # 在报告中统一命名为 OAuth_GetToken
catch_response=True # 允许手动控制成功/失败
) as response:
if response.status_code == 200:
try:
json_resp = response.json()
# 假设返回结构中有 access_token 字段
self.__class__.access_token = json_resp.get("access_token")
if self.__class__.access_token:
response.success()
print(f"\n[成功] 获取到 Token: {self.__class__.access_token[:10]}...")
else:
response.failure("响应中未找到 access_token")
except json.JSONDecodeError:
response.failure("响应不是有效的 JSON")
else:
response.failure(f"获取 Token 失败,状态码: {response.status_code}")
def setup_client_certificate(self):
"""设置客户端证书用于HTTPS请求"""
# 从PFX文件加载私钥和证书
with open(self.key_store_file_path, 'rb') as f:
pfx_data = f.read()
private_key, certificate, additional_certificates = pkcs12.load_key_and_certificates(
pfx_data,
self.key_store_passphrase.encode(),
backend=default_backend()
)
# 将证书和私钥保存为临时PEM文件
cert_temp = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
cert_temp.write(certificate.public_bytes(serialization.Encoding.PEM).decode())
cert_temp.close()
key_temp = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
key_temp.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode())
key_temp.close()
# 配置requests session使用证书
self.client.cert = (cert_temp.name, key_temp.name)
# 禁用SSL验证在企业内网环境中常见
self.client.verify = False
@tag('business_api')
@task
def get_adas_statistics(self):
"""
业务接口:获取车辆统计信息
"""
if not self.__class__.access_token:
print("没有 Token无法请求业务接口")
return
# 构造 Header
headers = {
"Authorization": f"Bearer {self.__class__.access_token}",
"Baumuster": Baumuster
}
# 格式化 URL将 VIN 填入路径
# 注意:如果 API_PATH 是绝对路径urljoin 可能表现不同,建议 host 配置为空或域名API_PATH 为相对路径
# 这里为了稳健,直接拼接或根据实际情况调整
url = API_PATH.format(vin=self.user.vin)
# print(f"\n[开始] 请求业务接口: {url}")
# 发送 GET 请求
self.client.get(
url,
headers=headers,
name="GET_AdasStatistics" # 在报告中统一名称
)
class WebsiteUser(HttpUser):
tasks = [UserBehavior]
# 设置用户等待时间,模拟真实用户操作间隔 (例如 1到5秒)
wait_time = between(1, 3)
# 从全局变量读取配置,方便命令行覆盖
client_id = CLIENT_ID
client_secret = CLIENT_SECRET
scope = SCOPE
vin = GEN6_TEST_VIN
# 如果需要支持 HTTPS 且证书不被信任,可以添加 verify=False
verify = False

Binary file not shown.