Files
ai_chat_assistant/lib/services/chat_sse_service.dart
2025-09-17 15:09:05 +08:00

202 lines
6.4 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Handles SSE (Server-Sent Events) communication with the chat backend.
import 'dart:convert';
import 'dart:io';
import 'dart:math';
import 'package:ai_chat_assistant/utils/tts_util.dart';
import '../utils/common_util.dart';
/// Streams chat answers from the backend over Server-Sent Events (SSE) and
/// feeds complete sentences to [TtsUtil] while the answer is still arriving.
///
/// One instance keeps a randomly generated user id and the conversation id
/// returned by the server, so consecutive [request] calls continue the same
/// conversation until [resetSession] is called.
class ChatSseService {
  /// Chat endpoint.
  ///
  /// NOTE(review): hard-coded plain-HTTP address; consider making this
  /// configurable and using HTTPS.
  static final Uri _chatUri = Uri.parse('http://143.64.185.20:18606/chat');

  /// Sentence terminators used when the answer is English.
  static final RegExp _enEnders = RegExp(r'[.!?]');

  /// Sentence terminators used when the answer is Chinese. Accepts both the
  /// ASCII and the full-width forms of '!' and '?'.
  static final RegExp _zhEnders = RegExp(r'[。!?！？]');

  /// Markdown ordered-list markers such as `1.` at the start of a line.
  static final RegExp _orderedListMarker =
      RegExp(r'(^|\n)[ \t]*[0-9]+\.[ \t]*');

  /// Markdown bullet markers (`-`, `*`, `+`) at the start of a line.
  static final RegExp _bulletListMarker =
      RegExp(r'(^|\n)[ \t]*[-\*\+][ \t]+');

  // Cached user id and conversation id, reused across requests.
  String? _cachedUserId;
  String? _cachedConversationId;

  // Client/response of the request that currently owns this service.
  HttpClient? _currentClient;
  HttpClientResponse? _currentResponse;

  bool _isAborted = true;

  // Not read anywhere in this class; kept so the set of private members is
  // unchanged for any code living in the same library.
  bool _isFirstData = true;

  bool _isTtsStarted = false;
  Future<void>? _startTtsFuture;

  /// The conversation id reported by the server, or `null` before the first
  /// answer (or after [resetSession]).
  String? get conversationId => _cachedConversationId;

  /// Sends [text] to the chat endpoint and streams the answer back.
  ///
  /// [onStreamResponse] is invoked with `(messageId, accumulatedText, false)`
  /// after every answer chunk and once with `(messageId, fullText, true)`
  /// when the server signals the end of the message. Complete sentences are
  /// forwarded to [TtsUtil] as soon as they are available; [isChinese]
  /// selects which punctuation ends a sentence.
  ///
  /// The method is fire-and-forget: network errors are logged, not thrown.
  /// Starting a new request or calling [abort] makes an in-flight request
  /// stop emitting; its cleanup never touches the newer request's resources.
  void request({
    required String messageId,
    required String text,
    required bool isChinese,
    required Function(String, String, bool) onStreamResponse,
  }) async {
    print("----------------------SSE Start");
    _isAborted = false;
    if (_cachedUserId == null) {
      _cachedUserId = _generateRandomUserId(6);
      print('初始化用户ID: $_cachedUserId');
    }

    var responseText = '';
    // Text received but not yet handed to TTS (incomplete sentence).
    final buffer = StringBuffer();

    final client = HttpClient();
    _currentClient = client;

    // True once this request was aborted or replaced by a newer one. The
    // identity check matters because a newer request resets `_isAborted`.
    bool isStale() => _isAborted || !identical(_currentClient, client);

    try {
      final httpRequest = await client.postUrl(_chatUri);
      httpRequest.headers.set('Content-Type', 'application/json');
      httpRequest.headers.set('Accept', 'text/event-stream');
      final body = <String, String?>{
        'message': text,
        'user': _cachedUserId,
        if (_cachedConversationId != null)
          'conversation_id': _cachedConversationId,
      };
      httpRequest.add(utf8.encode(json.encode(body)));

      final response = await httpRequest.close();
      if (isStale()) {
        // Do not publish a response that belongs to a superseded request.
        return;
      }
      _currentResponse = response;

      if (response.statusCode != 200) {
        print('SSE 连接失败,状态码: ${response.statusCode}');
        return;
      }

      _startTtsFuture = startTts(isChinese);

      final lines =
          response.transform(utf8.decoder).transform(const LineSplitter());
      await for (final line in lines) {
        if (isStale()) {
          break;
        }
        if (!line.startsWith('data:')) {
          continue;
        }
        final jsonStr = line.substring(5).trim();

        // NOTE(review): the end marker is detected by substring on the raw
        // payload, so an answer chunk that itself contains "message_end"
        // would end the stream early.
        if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) {
          // Speak whatever is left, even without closing punctuation.
          _speak(_takeSentences(buffer, isChinese, flush: true));
          TtsUtil.complete();
          onStreamResponse(messageId, responseText, true);
          break;
        }

        try {
          final jsonData = json.decode(jsonStr);

          final incomingId = jsonData['conversation_id'];
          if (_cachedConversationId == null && incomingId is String) {
            _cachedConversationId = incomingId;
          }

          if (jsonData['event'].toString().contains('message')) {
            // A missing or null answer contributes nothing instead of the
            // literal text "null".
            final Object? answer = jsonData['answer'];
            final textChunk = answer?.toString() ?? '';
            if (isStale()) {
              break;
            }
            buffer.write(textChunk);
            _speak(_takeSentences(buffer, isChinese));
            responseText += textChunk;
            onStreamResponse(messageId, responseText, false);
          }
        } catch (e) {
          print('解析 SSE 数据出错: $e, 原始数据: $jsonStr');
        }
      }
    } catch (e, stackTrace) {
      // Force-closing the client on abort surfaces here as an exception;
      // only report failures of a request that is still current.
      if (!isStale()) {
        print('SSE request failed: $e\n$stackTrace');
      }
    } finally {
      if (identical(_currentClient, client)) {
        resetRequest();
      } else {
        // A newer request (or abort) owns the shared state now; release
        // only the resources created by this call.
        client.close(force: true);
      }
    }
  }

  /// Removes every complete sentence from [buffer] and returns them in order.
  ///
  /// Markdown images and list markers are stripped first. Unless [flush] is
  /// set, the unterminated tail (and any image whose closing `)` has not
  /// arrived yet) is written back to [buffer] for the next call. With
  /// [flush] the tail is returned as a final sentence and an unclosed image
  /// fragment is discarded.
  List<String> _takeSentences(
    StringBuffer buffer,
    bool isChinese, {
    bool flush = false,
  }) {
    // Protect the product name so its dot is not taken as a sentence end.
    var text = _protectIdUnyx(buffer.toString());
    buffer.clear();

    // Strip markdown images. Text in front of an unclosed image is still
    // processed below; only the image fragment itself is held back.
    var pendingImage = '';
    var imgStart = text.indexOf('![');
    while (imgStart != -1) {
      final imgEnd = text.indexOf(')', imgStart);
      if (imgEnd == -1) {
        pendingImage = text.substring(imgStart);
        text = text.substring(0, imgStart);
        break;
      }
      text = text.substring(0, imgStart) + text.substring(imgEnd + 1);
      imgStart = text.indexOf('![');
    }

    // Drop markdown ordered/unordered list markers (1., 2., -, *, +).
    text = text
        .replaceAll(_orderedListMarker, '\n')
        .replaceAll(_bulletListMarker, '\n');

    final enders = isChinese ? _zhEnders : _enEnders;
    final sentences = <String>[];
    var lastEnd = 0;
    for (final match in enders.allMatches(text)) {
      final sentence = text.substring(lastEnd, match.end).trim();
      if (sentence.isNotEmpty) {
        sentences.add(_restoreIdUnyx(sentence));
      }
      lastEnd = match.end;
    }

    final tail = text.substring(lastEnd);
    if (flush) {
      final last = tail.trim();
      if (last.isNotEmpty) {
        sentences.add(_restoreIdUnyx(last));
      }
    } else {
      // Keep trailing whitespace so words from adjacent chunks stay apart.
      buffer.write(_restoreIdUnyx(tail.trimLeft() + pendingImage));
    }
    return sentences;
  }

  /// Cleans each sentence with [CommonUtil.cleanText] and sends it to TTS.
  void _speak(Iterable<String> sentences) {
    for (final sentence in sentences) {
      TtsUtil.send(CommonUtil.cleanText(sentence, true));
    }
  }

  /// Masks the dot in the proper noun "ID.UNYX".
  static String _protectIdUnyx(String text) =>
      text.replaceAll('ID.UNYX', 'ID_UNYX');

  /// Reverses [_protectIdUnyx].
  static String _restoreIdUnyx(String text) =>
      text.replaceAll('ID_UNYX', 'ID.UNYX');

  /// Starts the TTS engine unless it is already marked as started.
  ///
  /// The started flag is only set when [TtsUtil.start] reports `true`.
  Future<void> startTts(bool isChinese) async {
    print("----------------------TTS Start");
    if (_isTtsStarted) {
      return;
    }
    if (await TtsUtil.start(isChinese) == true) {
      _isTtsStarted = true;
    }
  }

  /// Cancels the in-flight request, stops TTS and releases the connection.
  Future<void> abort() async {
    _isAborted = true;
    TtsUtil.stop();
    resetRequest();
  }

  /// Releases the current connection and resets per-request state.
  ///
  /// Safe to call repeatedly and when no request is active.
  void resetRequest() {
    final response = _currentResponse;
    final client = _currentClient;

    _currentClient = null;
    _currentResponse = null;
    _isFirstData = true;
    _isTtsStarted = false;
    _startTtsFuture = null;

    if (response != null) {
      // Best-effort teardown: detaching can fail (synchronously or through
      // the returned future) when the connection is already gone. Either
      // way the forced client close below releases the socket.
      try {
        response.detachSocket().then(
          (socket) {
            socket.destroy();
          },
          onError: (Object _) {},
        );
      } catch (_) {
        // Nothing left to detach.
      }
    }
    client?.close(force: true);
  }

  /// Returns the index just past the last delimiter in [buffer], or 0 when
  /// there is none.
  ///
  /// Delimiters are commas, periods, question marks, exclamation marks and
  /// colons (ASCII and full-width) plus newlines. Currently not called from
  /// within this class.
  int _getCompleteTextEndIndex(String buffer) {
    final sentenceEnders = RegExp(r'[,.!?:，。！？：\n]');
    final matches = sentenceEnders.allMatches(buffer);
    return matches.isEmpty ? 0 : matches.last.end;
  }

  /// Forgets the cached user id and conversation id so that the next
  /// [request] starts a new conversation.
  void resetSession() {
    _cachedUserId = null;
    _cachedConversationId = null;
    print('SSE会话已重置');
  }

  /// Builds a random alphanumeric id of [length] characters.
  String _generateRandomUserId(int length) {
    const chars =
        'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
    final random = Random();
    return String.fromCharCodes(Iterable.generate(
        length, (_) => chars.codeUnitAt(random.nextInt(chars.length))));
  }
}