"""新架构下的检索与 Milvus dense-only 约定测试。""" from __future__ import annotations from app.application.agent.services import AgentConversationService from app.application.knowledge.services import KnowledgeRetrievalService from app.domain.conversation.models import AnswerResult, AnswerSource, ConversationSession from app.domain.retrieval import RetrievalQuery, RetrievedChunk class FakeRetriever: def __init__(self) -> None: self.queries: list[RetrievalQuery] = [] def retrieve(self, query: RetrievalQuery) -> list[RetrievedChunk]: self.queries.append(query) return [ RetrievedChunk( chunk_id="chunk-1", doc_id="doc-1", doc_name="测试法规", content="法规正文", score=0.91, section_title="第一章", page_number=1, metadata={"section_title": "第一章"}, ) ] def search(self, query: str, top_k: int, filters: str | None = None) -> list[RetrievedChunk]: return self.retrieve(RetrievalQuery(query=query, top_k=top_k, filters=filters)) class FakeAnswerGenerator: def generate( self, *, query: str, retrieved_chunks: list[RetrievedChunk], history: list[dict[str, str]] | None = None, provider: str | None = None, model: str | None = None, prompt_template: str | None = None, ) -> AnswerResult: return AnswerResult( answer=f"回答: {query}", sources=[ AnswerSource( doc_id=item.doc_id, doc_name=item.doc_name, chunk_id=item.chunk_id, section_title=item.section_title, page_number=item.page_number, score=item.score, content=item.content, metadata=item.metadata, ) for item in retrieved_chunks ], model=model or "deepseek-v4-flash", latency_ms=12, retrieved_count=len(retrieved_chunks), context_tokens=128, ) def stream_generate(self, **kwargs): sources = [source.__dict__ for source in self.generate(**kwargs).sources] yield {"event": "sources", "data": sources} yield {"event": "content", "data": "流式回答"} yield {"event": "done", "data": {"retrieved_count": 1}} class FakeConversationStore: def __init__(self) -> None: self.sessions: dict[str, ConversationSession] = {} def create_session(self, metadata: dict | None = None) -> ConversationSession: session = ConversationSession(session_id="sess-1", created_at=1, updated_at=1, metadata=metadata or {}) self.sessions[session.session_id] = session return session def get_session(self, session_id: str) -> ConversationSession | None: return self.sessions.get(session_id) def save_message(self, session_id: str, *, role: str, content: str, sources: list[dict] | None = None): session = self.sessions.get(session_id) if session is None: return None session.messages.append(type("Msg", (), {"role": role, "content": content})()) return session def delete_session(self, session_id: str) -> bool: return self.sessions.pop(session_id, None) is not None def list_sessions(self) -> list[dict]: return [{"session_id": key, "message_count": len(value.messages), "created_at": value.created_at, "updated_at": value.updated_at} for key, value in self.sessions.items()] def test_knowledge_retrieval_service_builds_retrieval_query(): retriever = FakeRetriever() service = KnowledgeRetrievalService(retriever=retriever) results = service.retrieve(query="机动车安全", top_k=3, filters='doc_name == "测试法规"') assert len(results) == 1 assert retriever.queries[0].query == "机动车安全" assert retriever.queries[0].top_k == 3 assert retriever.queries[0].filters == 'doc_name == "测试法规"' def test_agent_conversation_service_reuses_shared_retrieval_service(): retriever = FakeRetriever() retrieval_service = KnowledgeRetrievalService(retriever=retriever) conversation_store = FakeConversationStore() service = AgentConversationService( retrieval_service=retrieval_service, answer_generator=FakeAnswerGenerator(), conversation_store=conversation_store, ) session_id, result = service.chat(query="问一个问题", top_k=2, model="qwen3.5-flash") assert session_id == "sess-1" assert result.answer == "回答: 问一个问题" assert result.retrieved_count == 1 assert retriever.queries[0].top_k == 2 assert len(conversation_store.sessions["sess-1"].messages) == 2