""" SSE 测试脚本 - 验证事件流是否正确发送 """ import requests import json import time import sys # 设置控制台输出编码 sys.stdout.reconfigure(encoding='utf-8') # 启动任务 response = requests.post( 'http://localhost:8080/api/v1/sdlc/start', json={'requirement': '测试需求:做一个简单的计算器'}, headers={'Content-Type': 'application/json'} ) if response.status_code != 200: print(f"Start failed: {response.status_code} - {response.text}") exit(1) task_data = response.json() task_id = task_data['task_id'] print(f"Task started: {task_id}") print("=" * 60) # 连接 SSE sse_url = f'http://localhost:8080/api/v1/sdlc/stream/{task_id}' print(f"Connecting SSE: {sse_url}") print("=" * 60) with requests.get(sse_url, stream=True, timeout=300) as sse_response: for line in sse_response.iter_lines(): if line: line_str = line.decode('utf-8') print(line_str) # 解析事件 if line_str.startswith('event:'): event_type = line_str[6:].strip() print(f" -> Event type: {event_type}") if line_str.startswith('data:'): try: data = json.loads(line_str[5:].strip()) print(f" -> Data: {json.dumps(data, indent=2, ensure_ascii=False)}") except: pass # 如果是结束事件,退出 if 'event: final_result' in line_str or 'event: error' in line_str: print("=" * 60) print("Task completed!") break print("Test completed")