fix: 修复LogInterceptor 导致message不能流式吐字的问题

This commit is contained in:
2025-09-30 14:38:03 +08:00
parent c29a3f665a
commit 1729d0f266
15 changed files with 212 additions and 109 deletions

View File

@@ -4,6 +4,7 @@ import 'dart:async';
import 'dart:io';
// 负责录音相关功能
@Deprecated('AudioRecorderService Deprecated')
class AudioRecorderService {
final AudioRecorder _recorder = AudioRecorder();
bool _isRecording = false;

View File

@@ -7,6 +7,8 @@ import '../utils/tts_util.dart';
import '../utils/common_util.dart';
@Deprecated('ChatSseService Deprecated, Use ai_chat_core/ChatSseService instead')
/// SSE 服务类,处理与后端的流式通信
class ChatSseService {
// 缓存用户ID和会话ID
String? _cachedUserId;
@@ -112,14 +114,17 @@ class ChatSseService {
break;
}
if (line.startsWith('data:')) {
print('SSE line: $line');
final jsonStr = line.substring(5).trim();
if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) {
TtsUtil.complete();
onStreamResponse(messageId, responseText, true);
break;
}
print('SSE json: $jsonStr');
try {
final jsonData = json.decode(jsonStr);
print('SSE jsonData: $jsonData');
if (jsonData.containsKey('conversation_id') &&
_cachedConversationId == null) {
_cachedConversationId = jsonData['conversation_id'];

View File

@@ -1,27 +0,0 @@
import 'dart:convert';
import 'package:http/http.dart' as http;
class TextClassificationService {
Future<int> classifyText(String text) async {
try {
final uri = Uri.parse('http://143.64.185.20:18606/classify');
final response = await http.post(
uri,
headers: {'Content-Type': 'application/json'},
body: json.encode({'text': text}),
);
if (response.statusCode == 200) {
return json.decode(response.body)['category'];
} else {
print(
'Classification failed: ${response.statusCode}, ${response.body}');
return -1;
}
} catch (e) {
print('Error during text classification: $e');
return -1;
}
}
}

View File

@@ -9,7 +9,6 @@ import 'package:flutter/services.dart';
import 'package:permission_handler/permission_handler.dart';
import 'package:uuid/uuid.dart';
import '../services/chat_sse_service.dart' as Service;
import '../services/classification_service.dart';
import '../services/control_recognition_service.dart';
// import '../services/audio_recorder_service.dart';
// import '../services/voice_recognition_service.dart';
@@ -86,8 +85,8 @@ class MessageService extends ChangeNotifier {
}
}
// final ChatSseService _chatSseService = ChatSseService(PlatformTtsService(aliSdkChannelName));
final Service.ChatSseService _chatSseService = Service.ChatSseService();
final ChatSseService _chatSseService = ChatSseService(PlatformTtsService(aliSdkChannelName));
// final Service.ChatSseService _chatSseService = Service.ChatSseService();
// final LocalTtsService _ttsService = LocalTtsService();
// final AudioRecorderService _audioService = AudioRecorderService();

View File

@@ -80,11 +80,15 @@ export 'src/extensions/color_extension.dart';
// utils
export 'src/utils/markdown_cleaner.dart';
export 'src/utils/logger.dart';
// services
export 'src/services/chat_sse_service.dart';
export 'src/services/tts_service.dart';
export 'src/nlp/text_classification_service.dart';
// http client
export 'src/network/api_config.dart';
export 'src/network/http_client.dart';

View File

@@ -1,9 +1,11 @@
// assistant_api.dart
class AssistantAPI {
static const String baseUrl = 'https://api.example.com';
static const String chatEndpoint = '/v1/chat';
static const String ttsEndpoint = '/v1/tts';
static const String imageEndpoint = '/v1/image';
static const String videoEndpoint = '/v1/video';
static const String baseURL = 'http://143.64.185.20:18606';
// chat
static const String chatEndpoint = '/chat';
// text classification
static const String classifyEndpoint = '/classify';
// tts
static const String ttsEndpoint = '/tts';
}

View File

@@ -1,8 +1,9 @@
import '../apis/assistant_api.dart';
// API配置文件
class ApiConfig {
// API基础URL
static const String baseURL = 'http://143.64.185.20:18606';
static const String baseURL = AssistantAPI.baseURL;
/// 请求超时时间
static const Duration timeout = Duration(seconds: 30);
/// 默认请求头

View File

@@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:http/http.dart' as http;
import '../utils/logger.dart';
import 'api_config.dart';
import 'header_interceptor.dart';
import 'http_exception.dart';
@@ -14,7 +15,6 @@ class HttpClient extends http.BaseClient {
static List<Interceptor> defaultInterceptors = [
HeaderInterceptor(),
LogInterceptor(),
];
HttpClient({
@@ -25,6 +25,53 @@ class HttpClient extends http.BaseClient {
@override
Future<http.StreamedResponse> send(http.BaseRequest request) async {
// http.BaseRequest modifiedRequest = request;
// final requestUrl = request.url;
// // 检查 URL 是否为相对路径 (没有 scheme 和 host)
// if (requestUrl.scheme.isEmpty && requestUrl.host.isEmpty) {
// // 使用 ApiConfig.baseURL 来构建完整的 URL
// final fullUri = Uri.parse(ApiConfig.baseURL).resolveUri(requestUrl);
// // 根据原始请求类型,创建一个带有新 URL 的新请求对象
// if (request is http.Request) {
// final newRequest = http.Request(request.method, fullUri)
// ..headers.addAll(request.headers)
// ..bodyBytes = request.bodyBytes
// ..encoding = request.encoding
// ..followRedirects = request.followRedirects
// ..maxRedirects = request.maxRedirects
// ..persistentConnection = request.persistentConnection;
// modifiedRequest = newRequest;
// } else if (request is http.StreamedRequest) {
// // 对于流式请求,我们需要重新创建一个 StreamedRequest
// final newRequest = http.StreamedRequest(request.method, fullUri)
// ..headers.addAll(request.headers)
// ..contentLength = request.contentLength
// ..followRedirects = request.followRedirects
// ..maxRedirects = request.maxRedirects
// ..persistentConnection = request.persistentConnection;
// request.finalize().listen(
// newRequest.sink.add,
// onError: newRequest.sink.addError,
// onDone: newRequest.sink.close,
// );
// modifiedRequest = newRequest;
// } else if (request is http.MultipartRequest) {
// // 对于 MultipartRequest我们也需要重新创建一个实例
// final newRequest = http.MultipartRequest(request.method, fullUri)
// ..headers.addAll(request.headers)
// ..fields.addAll(request.fields)
// ..files.addAll(request.files)
// ..followRedirects = request.followRedirects
// ..maxRedirects = request.maxRedirects
// ..persistentConnection = request.persistentConnection;
// modifiedRequest = newRequest;
// } else {
// // 如果是其他类型的请求,可以根据需要进行处理
// }
// }
int retries = 0;
while (true) {
try {
@@ -56,7 +103,7 @@ class HttpClient extends http.BaseClient {
// 如果是可重试的错误 (例如网络问题),并且未达到最大次数
if (e is SocketException && retries < ApiConfig.maxRetries) {
retries++;
print('⚠️ Network error, retrying... ($retries/${ApiConfig.maxRetries})');
Logger.w('⚠️ Network error, retrying... ($retries/${ApiConfig.maxRetries})');
await Future.delayed(const Duration(milliseconds: ApiConfig.retryDelayMs));
continue; // 继续下一次循环
}

View File

@@ -1,37 +1,45 @@
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import '../utils/logger.dart';
import 'interceptor.dart';
class LogInterceptor extends Interceptor {
@override
Future<http.StreamedResponse> onRequest(InterceptorChain chain) async {
final request = chain.request;
print('➡️ [${request.method}] ${request.url}');
print('Headers: ${request.headers}');
// 注意:对于流式请求,我们无法在这里直接打印 body
Logger.i('➡️ [${request.method}] ${request.url}');
Logger.i('Headers: ${request.headers}');
return await chain.next(request);
}
@override
Future<http.StreamedResponse> onResponse(http.StreamedResponse response) async {
// 为了打印 body我们需要读取流然后重新创建一个
final bodyBytes = await response.stream.toBytes();
final bodyString = utf8.decode(bodyBytes, allowMalformed: true);
print('⬅️ [${response.statusCode}] ${response.request?.url}');
print('Response Body: $bodyString');
Logger.i('⬅️ [${response.statusCode}] ${response.request?.url}');
final streamTransformer = StreamTransformer<List<int>, List<int>>.fromHandlers(
handleData: (data, sink) {
// 打印流经的数据
Logger.i('📦 SSE Chunk: ${utf8.decode(data, allowMalformed: true)}');
// 将数据原封不动地传递给下一个监听者
sink.add(data);
},
);
// 重新创建流并返回新的响应
return http.StreamedResponse(
Stream.value(bodyBytes),
response.stream.transform(streamTransformer), // 应用转换器
response.statusCode,
headers: response.headers,
request: response.request,
reasonPhrase: response.reasonPhrase,
contentLength: response.contentLength,
isRedirect: response.isRedirect,
persistentConnection: response.persistentConnection,
);
}
@override
Future<void> onError(Object error) async {
print(' http Error: $error');
Logger.e('http Error: $error');
}
}

View File

@@ -0,0 +1,37 @@
import 'dart:convert';
import '../utils/logger.dart';
import '../network/http_client.dart';
import '../network/api_config.dart';
/// Text Classification Service
class TextClassificationService {
Future<int> classifyText(String text) async {
try {
final uri = Uri.parse('${ApiConfig.baseURL}/classify');
HttpClient client = HttpClient();
final response = await client.post(
uri,
body: json.encode({'text': text}),
);
if (response.statusCode == 200) {
var map = json.decode(response.body);
if (map != null && map['class'] != null) {
return map['category'] ?? -1;
} else {
Logger.i('Invalid response format: ${response.body}');
return -1;
}
} else {
Logger.i('Classification failed: ${response.statusCode}, ${response.body}');
return -1;
}
} catch (e) {
Logger.e('Error during text classification: $e');
return -1;
}
}
}

View File

@@ -0,0 +1,14 @@
class MessageDAO {
// 保存消息
Future<void> saveMessage(String message) async {
}
// 获取消息
Future<List<String>> getMessages() async {
return [];
}
// 删除消息
Future<void> deleteMessage(String messageId) async {
}
}

View File

@@ -1,44 +0,0 @@
import 'package:flutter/services.dart';
/// 一个通用的服务定位器,用于管理和提供所有类型的平台通道。
class ChannelProvider {
ChannelProvider._();
static final ChannelProvider instance = ChannelProvider._();
// 【修改】使用 Object 作为 value 类型,使其可以存储任何类型的 Channel
final Map<String, Object> _channels = {};
/// 注册一个平台通道。
/// 可以是 MethodChannel, EventChannel, 或 BasicMessageChannel。
void register(String name, Object channel) {
if (channel is! MethodChannel &&
channel is! EventChannel &&
channel is! BasicMessageChannel) {
throw ArgumentError('The provided channel must be a MethodChannel, EventChannel, or BasicMessageChannel.');
}
_channels[name] = channel;
}
/// 【修改】使用泛型方法获取一个已注册的平台通道,并进行类型检查。
///
/// 示例:
/// final methodChannel = ChannelProvider.instance.get<MethodChannel>('my_method_channel');
/// final eventChannel = ChannelProvider.instance.get<EventChannel>('my_event_channel');
T? get<T extends Object>(String name) {
final channel = _channels[name];
if (channel is T) {
return channel;
}
// 如果找到了通道但类型不匹配,可以打印一个警告
if (channel != null) {
print('Warning: Channel with name "$name" was found, but its type (${channel.runtimeType}) does not match the requested type ($T).');
}
return null;
}
/// 移除一个通道(可选功能)
void unregister(String name) {
_channels.remove(name);
}
}

View File

@@ -2,9 +2,12 @@ import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'package:http/http.dart' as http;
import '../apis/assistant_api.dart';
import '../network/api_config.dart';
import '../network/http_client.dart';
import 'tts_service.dart';
import '../utils/markdown_cleaner.dart';
import '../utils/logger.dart';
class ChatSseService {
// 依赖注入 TtsService
@@ -35,11 +38,11 @@ class ChatSseService {
required bool isChinese,
required Function(String, String, bool) onStreamResponse,
}) async {
print("----------------------SSE Start");
Logger.i("----------------------SSE Start");
_isAborted = false;
if (_cachedUserId == null) {
_cachedUserId = _generateRandomUserId(6);
print('初始化用户ID: $_cachedUserId');
Logger.i('初始化用户ID: $_cachedUserId');
}
String responseText = '';
StringBuffer buffer = StringBuffer();
@@ -81,10 +84,9 @@ class ChatSseService {
}
// 只在达到完整句子时调用 TtsUtil.send
for (final s in sentences) {
// 【已迁移】使用 MarkdownCleaner 替代 CommonUtil
// 使用 MarkdownCleaner 替代 CommonUtil
String ttsStr = MarkdownCleaner.cleanText(s, true);
// print("发送数据到TTS: $ttsStr");
// 【已迁移】调用注入的 _ttsService
Logger.i("发送数据到TTS: $ttsStr");
_ttsService.send(ttsStr);
}
// 缓存剩余不完整部分
@@ -97,7 +99,7 @@ class ChatSseService {
_currentClient = HttpClient();
try {
final chatUri = Uri.parse('http://143.64.185.20:18606/chat');
final chatUri = Uri.parse('${ApiConfig.baseURL}${AssistantAPI.chatEndpoint}');
final request = http.Request('POST', chatUri)
..headers['Content-Type'] = 'application/json'
..headers['Accept'] = 'text/event-stream';
@@ -119,17 +121,16 @@ class ChatSseService {
(line) {
if (_isAborted) return;
if (line.startsWith('data:')) {
Logger.i('SSE line: $line');
final jsonStr = line.substring(5).trim();
if (jsonStr == '[DONE]' || jsonStr.contains('message_end')) {
// 【已迁移】调用注入的 _ttsService
_ttsService.complete();
onStreamResponse(messageId, responseText, true);
// 这里不需要 breakonDone 会被调用
return;
}
try {
// 【逻辑保留】这里是原来 await for 循环中的核心逻辑,已完整迁移
final jsonData = json.decode(jsonStr);
Logger.i('SSE jsonData: $jsonData');
if (jsonData.containsKey('conversation_id') &&
_cachedConversationId == null) {
_cachedConversationId = jsonData['conversation_id'];
@@ -146,12 +147,12 @@ class ChatSseService {
onStreamResponse(messageId, responseText, false);
}
} catch (e) {
print('解析 SSE 数据出错: $e, 原始数据: $jsonStr');
Logger.e('解析 SSE 数据出错: $e, 原始数据: $jsonStr');
}
}
},
onDone: () {
print('SSE stream closed.');
Logger.i('SSE stream closed.');
if (!_isAborted) {
// 【已迁移】调用注入的 _ttsService
_ttsService.complete();
@@ -160,26 +161,26 @@ class ChatSseService {
resetRequest();
},
onError: (error) {
print('SSE stream error: $error');
Logger.e('SSE stream error: $error');
// onError 意味着流异常结束,需要重置资源
resetRequest();
},
cancelOnError: true,
);
} else {
print('SSE 连接失败,状态码: ${_currentResponse!.statusCode}');
Logger.e('SSE 连接失败,状态码: ${_currentResponse!.statusCode}');
// 【逻辑保留】连接失败也需要重置资源
resetRequest();
}
} catch (e) {
print('SSE request failed: $e');
Logger.e('SSE request failed: $e');
// 【逻辑保留】捕获到任何其他异常时,也需要重置资源
resetRequest();
}
}
Future<void> startTts(bool isChinese) async {
print("----------------------TTS Start");
Logger.i("----------------------TTS Start");
if (!_isTtsStarted) {
// 【已迁移】调用注入的 _ttsService
if (await _ttsService.start(isChinese) == true) {
@@ -220,7 +221,7 @@ class ChatSseService {
void resetSession() {
_cachedUserId = null;
_cachedConversationId = null;
print('SSE会话已重置');
Logger.i('SSE会话已重置');
}
// 【逻辑保留】_generateRandomUserId 方法保持不变

View File

@@ -0,0 +1,8 @@
// 常量定义
class Constants {
// 最大消息长度
static const int maxMessageLength = 1000;
// 默认分页大小
static const int defaultPageSize = 20;
}

View File

@@ -0,0 +1,47 @@
import 'package:flutter/foundation.dart';
/// 一个简单的日志记录器,仅在非 Release 模式下打印日志。
class Logger {
// 私有构造函数,防止外部实例化
Logger._();
static String _timestamp() {
return DateTime.now().toIso8601String();
}
/// 记录调试信息 (Debug)
static void d(String message, {String? tag}) {
if (!kReleaseMode) {
debugPrint('${_timestamp()} [${tag ?? 'DEBUG'}] $message');
}
}
/// 记录一般信息 (Info)
static void i(String message, {String? tag}) {
if (!kReleaseMode) {
debugPrint('${_timestamp()} [${tag ?? 'INFO'}] $message');
}
}
/// 记录警告信息 (Warning)
static void w(String message, {String? tag, Object? error}) {
if (!kReleaseMode) {
debugPrint('${_timestamp()} [${tag ?? 'WARN'}] $message ${error ?? ''}');
}
}
/// 记录错误信息 (Error)
static void e(String message, {String? tag, Object? error, StackTrace? stackTrace}) {
if (!kReleaseMode) {
debugPrint('==================== ERROR ====================');
debugPrint('${_timestamp()} [${tag ?? 'ERROR'}] $message');
if (error != null) {
debugPrint('Error: $error');
}
if (stackTrace != null) {
debugPrint('StackTrace: $stackTrace');
}
debugPrint('=============================================');
}
}
}