Files
ai_chat_assistant/lib/services/chat_sse_service.dart

208 lines
6.7 KiB
Dart
Raw Normal View History

2025-08-12 13:36:42 +08:00
// 负责SSE通信
import 'dart:convert';
2025-09-30 14:48:12 +08:00
import 'dart:io' as IO;
2025-08-12 13:36:42 +08:00
import 'dart:math';
2025-09-30 14:48:12 +08:00
import 'package:ai_chat_assistant/ai_chat_assistant.dart';
import '../utils/tts_util.dart';
2025-08-13 16:00:23 +08:00
import '../utils/common_util.dart';
/// SSE service that streams chat answers from the backend.
///
/// A call to [request] posts the user's message, reads the `data:` lines of
/// the event stream, reports the accumulated answer through a callback and
/// hands complete sentences to [TtsUtil] for speech output.
@Deprecated('ChatSseService Deprecated, Use ai_chat_core/ChatSseService instead')
class ChatSseService {
  /// Creates the service.
  ///
  /// [chatUri] overrides the built-in chat endpoint; when omitted the
  /// original hard-coded address is used.
  ChatSseService({Uri? chatUri})
      : _chatUri = chatUri ?? Uri.parse(_defaultChatUrl);

  /// Endpoint used when no URI is passed to the constructor.
  static const String _defaultChatUrl = 'http://143.64.185.20:18606/chat';

  /// Proper noun whose inner dot must not be treated as a sentence end.
  static const String _protectedTerm = 'ID.UNYX';

  /// Sentence terminators used when the answer is English.
  static final RegExp _enEnders = RegExp(r'[.!?]');

  /// Sentence terminators used when the answer is Chinese (ASCII and
  /// fullwidth forms).
  static final RegExp _zhEnders = RegExp(r'[。!?!?]');

  /// Markdown ordered-list markers such as `1.` at the start of a line.
  static final RegExp _orderedListMarker =
      RegExp(r'(^|\n)[ \t]*[0-9]+\.[ \t]*');

  /// Markdown unordered-list markers (`-`, `*`, `+`) at the start of a line.
  static final RegExp _unorderedListMarker =
      RegExp(r'(^|\n)[ \t]*[-\*\+][ \t]+');

  final Uri _chatUri;

  // Cached user id and conversation id, reused across requests.
  String? _cachedUserId;
  String? _cachedConversationId;

  // Client/response of the request currently in flight, if any.
  IO.HttpClient? _currentClient;
  IO.HttpClientResponse? _currentResponse;

  bool _isAborted = true;
  // Only ever reset in [resetRequest]; not read anywhere in this class.
  bool _isFirstData = true;
  bool _isTtsStarted = false;
  Future<void>? _startTtsFuture;

  /// The conversation id taken from the stream, or null if none was seen yet.
  String? get conversationId => _cachedConversationId;

  /// Sends [text] to the chat endpoint and consumes the SSE answer.
  ///
  /// [onStreamResponse] is invoked as `(messageId, accumulatedText, isDone)`:
  /// with `false` after every answer chunk and once with `true` when the
  /// stream signals `[DONE]` or `message_end`. [isChinese] selects the
  /// sentence terminators and is forwarded to [startTts].
  ///
  /// The returned future completes when the stream has ended, failed or been
  /// aborted; errors are logged rather than thrown.
  Future<void> request({
    required String messageId,
    required String text,
    required bool isChinese,
    required Function(String, String, bool) onStreamResponse,
  }) async {
    Logger.i("----------------------SSE Start");
    _isAborted = false;
    if (_cachedUserId == null) {
      _cachedUserId = _generateRandomUserId(6);
      Logger.i('初始化用户ID: $_cachedUserId');
    }

    var responseText = '';
    final buffer = StringBuffer();

    final client = IO.HttpClient();
    _currentClient = client;
    // A request is stale once it was aborted or a newer request replaced its
    // client; a stale request must not touch the shared state any more.
    bool isStale() => _isAborted || !identical(_currentClient, client);

    try {
      final httpRequest = await client.postUrl(_chatUri);
      httpRequest.headers.set('Content-Type', 'application/json');
      httpRequest.headers.set('Accept', 'text/event-stream');
      final body = {
        'message': text,
        'user': _cachedUserId,
      };
      if (_cachedConversationId != null) {
        body['conversation_id'] = _cachedConversationId;
      }
      httpRequest.add(utf8.encode(json.encode(body)));

      final response = await httpRequest.close();
      if (isStale()) {
        // The finally block force-closes this request's own client.
        return;
      }
      _currentResponse = response;

      if (response.statusCode != 200) {
        Logger.e('SSE 连接失败,状态码: ${response.statusCode}');
        return;
      }

      _startTtsFuture = startTts(isChinese);
      final lines =
          response.transform(utf8.decoder).transform(const LineSplitter());
      await for (final line in lines) {
        if (isStale()) {
          break;
        }
        if (!line.startsWith('data:')) {
          continue;
        }
        Logger.i('SSE line: $line');
        final jsonStr = line.substring(5).trim();
        if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) {
          // Speak whatever is still buffered: the tail of an answer often has
          // no closing punctuation and would otherwise be dropped.
          _speakCompleteSentences(buffer, isChinese, flush: true);
          TtsUtil.complete();
          onStreamResponse(messageId, responseText, true);
          break;
        }
        try {
          final jsonData = json.decode(jsonStr);
          Logger.i('SSE jsonData: $jsonData');
          if (jsonData is! Map) {
            throw const FormatException('SSE payload is not a JSON object');
          }
          final conversationId = jsonData['conversation_id'];
          if (_cachedConversationId == null && conversationId is String) {
            _cachedConversationId = conversationId;
          }
          if (jsonData['event'].toString().contains('message')) {
            // A missing, null or non-string answer counts as an empty chunk
            // so the literal "null" never reaches the TTS buffer.
            final answer = jsonData['answer'];
            final textChunk = answer is String ? answer : '';
            if (isStale()) {
              break;
            }
            buffer.write(textChunk);
            _speakCompleteSentences(buffer, isChinese);
            responseText += textChunk;
            onStreamResponse(messageId, responseText, false);
          }
        } catch (e) {
          Logger.e('解析 SSE 数据出错: $e, 原始数据: $jsonStr');
        }
      }
    } catch (e) {
      // Failures after an abort are expected (the socket was destroyed on
      // purpose); anything else is a real error worth reporting.
      if (!isStale()) {
        Logger.e('SSE 请求异常: $e');
      }
    } finally {
      if (identical(_currentClient, client)) {
        resetRequest();
      } else {
        // Shared state already belongs to abort() or to a newer request;
        // only release this request's own client.
        client.close(force: true);
      }
    }
  }

  /// Sends every complete sentence in [buffer] to TTS and keeps the rest.
  ///
  /// Markdown images and list markers are stripped first. An image whose
  /// closing `)` has not arrived yet is held back in [buffer] while the text
  /// before it is still processed. With [flush] set, the unterminated tail is
  /// sent as well and [buffer] ends up empty (an unclosed image is dropped).
  void _speakCompleteSentences(
    StringBuffer buffer,
    bool isChinese, {
    bool flush = false,
  }) {
    var txt = buffer.toString();
    var pendingImage = '';

    var imgStart = txt.indexOf('![');
    while (imgStart != -1) {
      final imgEnd = txt.indexOf(')', imgStart);
      if (imgEnd == -1) {
        pendingImage = txt.substring(imgStart);
        txt = txt.substring(0, imgStart);
        break;
      }
      txt = txt.substring(0, imgStart) + txt.substring(imgEnd + 1);
      imgStart = txt.indexOf('![');
    }

    txt = txt.replaceAll(_orderedListMarker, '\n');
    txt = txt.replaceAll(_unorderedListMarker, '\n');

    final enders = isChinese ? _zhEnders : _enEnders;
    final sentences = <String>[];
    var lastEnd = 0;
    for (final match in enders.allMatches(txt)) {
      if (_isProtectedDot(txt, match.start, flush)) {
        continue;
      }
      final sentence = txt.substring(lastEnd, match.end).trim();
      if (sentence.isNotEmpty) {
        sentences.add(sentence);
      }
      lastEnd = match.end;
    }

    for (final sentence in sentences) {
      TtsUtil.send(CommonUtil.cleanText(sentence, true));
    }

    // Only the leading whitespace is dropped: a trailing space separates this
    // fragment from the next streamed chunk.
    final remainder = txt.substring(lastEnd).trimLeft();
    buffer.clear();
    if (flush) {
      final tail = remainder.trim();
      if (tail.isNotEmpty) {
        TtsUtil.send(CommonUtil.cleanText(tail, true));
      }
      return;
    }
    buffer
      ..write(remainder)
      ..write(pendingImage);
  }

  /// Whether the terminator at [index] is the dot inside [_protectedTerm].
  ///
  /// While streaming ([flush] is false) a text that currently ends in a
  /// prefix of the term, such as `ID.`, is also protected, because the rest
  /// of the term may arrive with the next chunk.
  bool _isProtectedDot(String txt, int index, bool flush) {
    if (txt[index] != '.') {
      return false;
    }
    final termStart = index - _protectedTerm.indexOf('.');
    if (termStart < 0) {
      return false;
    }
    if (txt.startsWith(_protectedTerm, termStart)) {
      return true;
    }
    return !flush && _protectedTerm.startsWith(txt.substring(termStart));
  }

  /// Starts TTS unless it was already started for the current request.
  Future<void> startTts(bool isChinese) async {
    Logger.i("----------------------TTS Start");
    if (!_isTtsStarted) {
      if (await TtsUtil.start(isChinese) == true) {
        _isTtsStarted = true;
      }
    }
  }

  /// Aborts the request in flight, stops TTS and releases the connection.
  Future<void> abort() async {
    _isAborted = true;
    TtsUtil.stop();
    resetRequest();
  }

  /// Tears down the current connection and resets the per-request state.
  void resetRequest() {
    // detachSocket() can fail (for example when the response has already
    // finished); log it instead of leaving an unhandled asynchronous error.
    _currentResponse
        ?.detachSocket()
        .then<void>((socket) => socket.destroy())
        .catchError((Object e) {
      Logger.e('SSE 释放连接出错: $e');
    });
    _currentClient?.close(force: true);
    _currentClient = null;
    _currentResponse = null;
    _isFirstData = true;
    _isTtsStarted = false;
    _startTtsFuture = null;
  }

  /// Returns the index just past the last delimiter in [buffer], or 0 if
  /// there is none.
  ///
  /// Delimiters are comma, period, exclamation mark, question mark, colon
  /// (ASCII and fullwidth forms) and newline.
  int _getCompleteTextEndIndex(String buffer) {
    final sentenceEnders = RegExp(r'[,.!?:,。!?:\n]');
    final matches = sentenceEnders.allMatches(buffer);
    return matches.isEmpty ? 0 : matches.last.end;
  }

  /// Forgets the cached user id and conversation id.
  void resetSession() {
    _cachedUserId = null;
    _cachedConversationId = null;
    Logger.i('SSE会话已重置');
  }

  /// Builds a random alphanumeric id of [length] characters.
  String _generateRandomUserId(int length) {
    const chars =
        'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
    final random = Random();
    return String.fromCharCodes(Iterable.generate(
        length, (_) => chars.codeUnitAt(random.nextInt(chars.length))));
  }
}