// 负责SSE通信 import 'dart:convert'; import 'dart:io'; import 'dart:math'; class ChatSseService { // 缓存用户ID和会话ID String? _cachedUserId; String? _cachedConversationId; HttpClient? _currentClient; HttpClientResponse? _currentResponse; bool _isAborted = true; String? get conversationId => _cachedConversationId; void request({ required String messageId, required String text, required Function(String, String, String, bool) onStreamResponse, }) async { _isAborted = false; if (_cachedUserId == null) { _cachedUserId = _generateRandomUserId(6); print('初始化用户ID: $_cachedUserId'); } String responseText = ''; String tempText = ''; _currentClient = HttpClient(); try { final chatUri = Uri.parse('http://143.64.185.20:18606/chat'); final request = await _currentClient!.postUrl(chatUri); request.headers.set('Content-Type', 'application/json'); request.headers.set('Accept', 'text/event-stream'); final body = { 'message': text, 'user': _cachedUserId, }; if (_cachedConversationId != null) { body['conversation_id'] = _cachedConversationId; } request.add(utf8.encode(json.encode(body))); _currentResponse = await request.close(); if (_currentResponse!.statusCode == 200) { await for (final line in _currentResponse! .transform(utf8.decoder) .transform(const LineSplitter())) { if (_isAborted) { break; } if (line.startsWith('data:')) { final jsonStr = line.substring(5).trim(); if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) { onStreamResponse(messageId, responseText, tempText, true); break; } try { final jsonData = json.decode(jsonStr); if (jsonData.containsKey('conversation_id') && _cachedConversationId == null) { _cachedConversationId = jsonData['conversation_id']; } if (jsonData['event'].toString().contains('message')) { final textChunk = jsonData.containsKey('answer') ? jsonData['answer'] : ''; responseText += textChunk; tempText += textChunk; int endIndex = _getCompleteTextEndIndex(tempText); final completeText = tempText.substring(0, endIndex).trim(); tempText = tempText.substring(endIndex).trim(); onStreamResponse(messageId, responseText, completeText, false); } } catch (e) { print('解析 SSE 数据出错: $e, 原始数据: $jsonStr'); } } } } else { print('SSE 连接失败,状态码: ${_currentResponse!.statusCode}'); } } catch (e) { // todo } finally { resetRequest(); } } Future abort() async { _isAborted = true; resetRequest(); } void resetRequest() { _currentResponse?.detachSocket().then((socket) { socket.destroy(); }); _currentClient?.close(force: true); _currentClient = null; _currentResponse = null; } int _getCompleteTextEndIndex(String buffer) { // 支持句号、问号、感叹号和换行符作为分割依据 final sentenceEnders = RegExp(r'[。!?\n]'); final matches = sentenceEnders.allMatches(buffer); return matches.isEmpty ? 0 : matches.last.end; } // 新增:重置会话方法 void resetSession() { _cachedUserId = null; _cachedConversationId = null; print('SSE会话已重置'); } String _generateRandomUserId(int length) { const chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'; final random = Random(); return String.fromCharCodes(Iterable.generate( length, (_) => chars.codeUnitAt(random.nextInt(chars.length)))); } }