// Handles SSE (Server-Sent Events) communication with the chat backend.
import 'dart:convert';
import 'dart:io';
import 'dart:math';

import 'package:ai_chat_assistant/utils/tts_util.dart';

import '../utils/common_util.dart';

/// Streams chat answers from the backend over SSE and forwards completed
/// sentences to [TtsUtil] while the answer is still arriving.
///
/// A random user id and the server-provided conversation id are cached on the
/// instance and reused by later requests until [resetSession] is called.
class ChatSseService {
  /// Chat endpoint that receives the POST request.
  static final Uri _chatUri = Uri.parse('http://143.64.185.20:18606/chat');

  /// Sentence terminators used when the answer is English.
  static final RegExp _enEnders = RegExp(r'[.!?]');

  /// Sentence terminators used when the answer is Chinese. Besides the
  /// ideographic full stop this accepts both ASCII and full-width `!` / `?`.
  static final RegExp _zhEnders = RegExp(r'[。!?\uFF01\uFF1F]');

  /// Markdown ordered-list markers such as `1.` at the start of a line.
  // NOTE(review): with zero trailing blanks allowed this also matches the
  // integer part of a decimal number at the start of a line — confirm intent.
  static final RegExp _orderedListMarker =
      RegExp(r'(^|\n)[ \t]*[0-9]+\.[ \t]*');

  /// Markdown bullet markers (`-`, `*`, `+`) at the start of a line.
  static final RegExp _bulletListMarker = RegExp(r'(^|\n)[ \t]*[-\*\+][ \t]+');

  // Cached user id and conversation id.
  String? _cachedUserId;
  String? _cachedConversationId;

  HttpClient? _currentClient;
  HttpClientResponse? _currentResponse;
  bool _isAborted = true;
  bool _isFirstData = true;
  bool _isTtsStarted = false;

  /// The conversation id received from the server, or `null` if none yet.
  String? get conversationId => _cachedConversationId;

  Future? _startTtsFuture;

  /// Sends [text] to the chat endpoint and streams the answer back.
  ///
  /// [onStreamResponse] is invoked with `(messageId, accumulatedText, false)`
  /// for every answer chunk and once with `(messageId, accumulatedText, true)`
  /// when a `[DONE]` / `message_end` line is received. Complete sentences are
  /// sent to [TtsUtil] as they arrive; [isChinese] selects the sentence
  /// terminators and is passed on to [startTts].
  ///
  /// Errors are logged, never thrown.
  // NOTE(review): when the connection fails, the status is not 200, or the
  // stream ends without a terminator, [onStreamResponse] is never called with
  // `true` — confirm callers do not wait for it.
  void request({
    required String messageId,
    required String text,
    required bool isChinese,
    required Function(String, String, bool) onStreamResponse,
  }) async {
    print("----------------------SSE Start");
    _isAborted = false;
    if (_cachedUserId == null) {
      _cachedUserId = _generateRandomUserId(6);
      print('初始化用户ID: $_cachedUserId');
    }

    var responseText = '';
    // Answer text that has not yet been handed to TTS.
    final pendingSpeech = StringBuffer();

    final client = HttpClient();
    _currentClient = client;

    // True once this request was aborted or replaced by a newer one. Checking
    // the client identity (not only `_isAborted`) keeps an old request from
    // continuing after a newer request has reset the shared flag.
    bool isStale() => _isAborted || !identical(_currentClient, client);

    try {
      final httpRequest = await client.postUrl(_chatUri);
      httpRequest.headers.set('Content-Type', 'application/json');
      httpRequest.headers.set('Accept', 'text/event-stream');
      final body = <String, String?>{
        'message': text,
        'user': _cachedUserId,
        if (_cachedConversationId != null)
          'conversation_id': _cachedConversationId,
      };
      httpRequest.add(utf8.encode(json.encode(body)));

      final response = await httpRequest.close();
      if (isStale()) {
        return;
      }
      _currentResponse = response;

      if (response.statusCode != 200) {
        print('SSE 连接失败,状态码: ${response.statusCode}');
        return;
      }

      _startTtsFuture = startTts(isChinese);

      final lines =
          response.transform(utf8.decoder).transform(const LineSplitter());
      await for (final line in lines) {
        if (isStale()) {
          break;
        }
        if (!line.startsWith('data:')) {
          continue;
        }
        final jsonStr = line.substring(5).trim();
        if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) {
          // End of stream: also speak whatever is left without a terminator.
          _speakCompleteSentences(pendingSpeech, isChinese, flush: true);
          onStreamResponse(messageId, responseText, true);
          break;
        }
        try {
          final jsonData = json.decode(jsonStr);
          if (jsonData is! Map) {
            throw const FormatException('expected a JSON object');
          }
          final conversationId = jsonData['conversation_id'];
          if (_cachedConversationId == null && conversationId is String) {
            _cachedConversationId = conversationId;
          }
          if (jsonData['event'].toString().contains('message')) {
            // Ignore a missing or non-string answer instead of feeding the
            // text "null" into the speech buffer.
            final answer = jsonData['answer'];
            final textChunk = answer is String ? answer : '';
            if (isStale()) {
              break;
            }
            pendingSpeech.write(textChunk);
            _speakCompleteSentences(pendingSpeech, isChinese);
            responseText += textChunk;
            onStreamResponse(messageId, responseText, false);
          }
        } catch (e) {
          print('解析 SSE 数据出错: $e, 原始数据: $jsonStr');
        }
      }
    } catch (e) {
      // Closing the connection on abort surfaces here as an error; only
      // report failures of a request that is still the current one.
      if (!isStale()) {
        print('SSE request failed: $e');
      }
    } finally {
      if (identical(_currentClient, client)) {
        resetRequest();
      } else {
        // Aborted or superseded: release only this request's own client so a
        // newer request's connection and state are left untouched.
        client.close(force: true);
      }
    }
  }

  /// Sends every complete sentence in [pending] to [TtsUtil].
  ///
  /// Markdown image syntax and list markers are stripped first. The
  /// unterminated tail is written back into [pending] for the next call; with
  /// [flush] set it is sent to TTS as well and [pending] ends up empty.
  void _speakCompleteSentences(
    StringBuffer pending,
    bool isChinese, {
    bool flush = false,
  }) {
    var text = pending.toString();
    // Image markup whose closing ')' has not arrived yet.
    var heldBack = '';

    // Strip markdown image syntax.
    var imgStart = text.indexOf('![');
    while (imgStart != -1) {
      final imgEnd = text.indexOf(')', imgStart);
      if (imgEnd == -1) {
        // Unclosed image: keep it for the next chunk, but still process the
        // text in front of it.
        heldBack = text.substring(imgStart);
        text = text.substring(0, imgStart);
        break;
      }
      text = text.substring(0, imgStart) + text.substring(imgEnd + 1);
      imgStart = text.indexOf('![');
    }

    // Remove ordered / unordered list markers (e.g. "1.", "-", "*", "+").
    text = text
        .replaceAll(_orderedListMarker, '\n')
        .replaceAll(_bulletListMarker, '\n');

    final enders = isChinese ? _zhEnders : _enEnders;
    var consumed = 0;
    for (final match in enders.allMatches(text)) {
      final sentence = text.substring(consumed, match.end).trim();
      consumed = match.end;
      if (sentence.isNotEmpty) {
        TtsUtil.send(CommonUtil.cleanText(sentence, true));
      }
    }

    final remainder = text.substring(consumed).trim();
    pending.clear();
    if (flush) {
      // A still-unclosed image is markup, not speech, so it is dropped here.
      if (remainder.isNotEmpty) {
        TtsUtil.send(CommonUtil.cleanText(remainder, true));
      }
      return;
    }
    pending
      ..write(remainder)
      ..write(heldBack);
  }

  /// Starts TTS via [TtsUtil.start] unless it is already marked as started.
  Future startTts(bool isChinese) async {
    print("----------------------TTS Start");
    if (_isTtsStarted) {
      return;
    }
    if (await TtsUtil.start(isChinese) == true) {
      _isTtsStarted = true;
    }
  }

  /// Aborts the running request, stops TTS and releases the connection.
  Future abort() async {
    _isAborted = true;
    TtsUtil.stop();
    resetRequest();
  }

  /// Releases the current connection and resets the per-request state.
  void resetRequest() {
    final response = _currentResponse;
    final client = _currentClient;
    _currentClient = null;
    _currentResponse = null;
    _isFirstData = true;
    _isTtsStarted = false;
    _startTtsFuture = null;

    if (response != null) {
      try {
        response.detachSocket().then(
          (socket) {
            socket.destroy();
          },
          onError: (Object error) {
            // Detaching can fail when the connection is already gone; it
            // must not surface as an unhandled asynchronous error.
            print('SSE socket detach failed: $error');
          },
        );
      } catch (e) {
        print('SSE socket detach failed: $e');
      }
    }
    client?.close(force: true);
  }

  /// Returns the end offset of the last delimiter in [buffer], or 0 if it
  /// contains none. Currently not called within this class.
  int _getCompleteTextEndIndex(String buffer) {
    // Commas, periods, exclamation/question marks, colons and newlines all
    // count as split points.
    final sentenceEnders = RegExp(r'[,.!?:,。!?:\n]');
    final matches = sentenceEnders.allMatches(buffer);
    return matches.isEmpty ? 0 : matches.last.end;
  }

  /// Clears the cached user id and conversation id.
  void resetSession() {
    _cachedUserId = null;
    _cachedConversationId = null;
    print('SSE会话已重置');
  }

  /// Returns a random alphanumeric string of [length] characters.
  String _generateRandomUserId(int length) {
    const chars =
        'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
    final random = Random();
    return String.fromCharCodes(Iterable.generate(
        length, (_) => chars.codeUnitAt(random.nextInt(chars.length))));
  }
}