"""Implement application-layer logic for services.""" from __future__ import annotations from app.domain.retrieval import RetrievalQuery, Retriever, RetrievedChunk from app.domain.retrieval.ports import Reranker # Keep orchestration logic centralized so use-case flow stays easy to trace. def _reciprocal_rank_fusion( ranked_lists: list[list[RetrievedChunk]], k: int = 60 ) -> list[RetrievedChunk]: """Merge multiple ranked lists with Reciprocal Rank Fusion. Score for chunk c = sum over lists of 1 / (k + rank(c)). A chunk appearing in multiple lists gets a higher fused score. """ scores: dict[str, float] = {} chunk_map: dict[str, RetrievedChunk] = {} for ranked in ranked_lists: for rank, chunk in enumerate(ranked): key = chunk.chunk_id scores[key] = scores.get(key, 0.0) + 1.0 / (k + rank + 1) chunk_map[key] = chunk sorted_keys = sorted(scores, key=lambda ck: scores[ck], reverse=True) return [ RetrievedChunk( chunk_id=chunk_map[ck].chunk_id, doc_id=chunk_map[ck].doc_id, doc_title=chunk_map[ck].doc_title, text=chunk_map[ck].text, score=scores[ck], chunk_type=chunk_map[ck].chunk_type, section_title=chunk_map[ck].section_title, page_start=chunk_map[ck].page_start, page_end=chunk_map[ck].page_end, section_level=chunk_map[ck].section_level, chunk_index=chunk_map[ck].chunk_index, piece_index=chunk_map[ck].piece_index, metadata=chunk_map[ck].metadata, ) for ck in sorted_keys ] class KnowledgeRetrievalService: """Provide the Knowledge Retrieval Service service.""" def __init__( self, *, retriever: Retriever, bm25_retriever=None, reranker: Reranker | None = None, reranker_top_k: int = 5, ) -> None: """Initialize the Knowledge Retrieval Service instance.""" self.retriever = retriever self.bm25_retriever = bm25_retriever self.reranker = reranker self.reranker_top_k = reranker_top_k def retrieve(self, *, query: str, top_k: int, filters: str | None = None) -> list[RetrievedChunk]: """Retrieve and optionally rerank chunks for a query. When a BM25 retriever is available, combines dense + sparse results via Reciprocal Rank Fusion before optional reranking. """ use_hybrid = self.bm25_retriever is not None and getattr(self.bm25_retriever, "available", False) candidate_k = max(top_k * 4, 20) if (self.reranker is not None or use_hybrid) else top_k retrieval_query = RetrievalQuery(query=query, top_k=candidate_k, filters=filters) dense_results = self.retriever.retrieve(retrieval_query) if use_hybrid: bm25_results = self.bm25_retriever.retrieve(query, top_k=candidate_k, filters=filters) candidates = _reciprocal_rank_fusion([dense_results, bm25_results]) else: candidates = dense_results if self.reranker and candidates: return self.reranker.rerank(query, candidates, top_k=self.reranker_top_k) return candidates[:top_k]