Files

87 lines
3.2 KiB
Python
Raw Permalink Normal View History

"""Implement application-layer logic for services."""
from __future__ import annotations
from app.domain.retrieval import RetrievalQuery, Retriever, RetrievedChunk
from app.domain.retrieval.ports import Reranker
# Keep orchestration logic centralized so use-case flow stays easy to trace.
2026-05-21 23:20:39 +08:00
def _reciprocal_rank_fusion(
ranked_lists: list[list[RetrievedChunk]], k: int = 60
) -> list[RetrievedChunk]:
"""Merge multiple ranked lists with Reciprocal Rank Fusion.
Score for chunk c = sum over lists of 1 / (k + rank(c)).
A chunk appearing in multiple lists gets a higher fused score.
"""
scores: dict[str, float] = {}
chunk_map: dict[str, RetrievedChunk] = {}
for ranked in ranked_lists:
for rank, chunk in enumerate(ranked):
key = chunk.chunk_id
scores[key] = scores.get(key, 0.0) + 1.0 / (k + rank + 1)
chunk_map[key] = chunk
sorted_keys = sorted(scores, key=lambda ck: scores[ck], reverse=True)
return [
RetrievedChunk(
chunk_id=chunk_map[ck].chunk_id,
doc_id=chunk_map[ck].doc_id,
doc_title=chunk_map[ck].doc_title,
text=chunk_map[ck].text,
2026-05-21 23:20:39 +08:00
score=scores[ck],
chunk_type=chunk_map[ck].chunk_type,
2026-05-21 23:20:39 +08:00
section_title=chunk_map[ck].section_title,
page_start=chunk_map[ck].page_start,
page_end=chunk_map[ck].page_end,
section_level=chunk_map[ck].section_level,
chunk_index=chunk_map[ck].chunk_index,
piece_index=chunk_map[ck].piece_index,
2026-05-21 23:20:39 +08:00
metadata=chunk_map[ck].metadata,
)
for ck in sorted_keys
]
class KnowledgeRetrievalService:
"""Provide the Knowledge Retrieval Service service."""
2026-05-21 23:20:39 +08:00
def __init__(
self,
*,
retriever: Retriever,
bm25_retriever=None,
reranker: Reranker | None = None,
reranker_top_k: int = 5,
) -> None:
"""Initialize the Knowledge Retrieval Service instance."""
self.retriever = retriever
2026-05-21 23:20:39 +08:00
self.bm25_retriever = bm25_retriever
self.reranker = reranker
self.reranker_top_k = reranker_top_k
def retrieve(self, *, query: str, top_k: int, filters: str | None = None) -> list[RetrievedChunk]:
2026-05-21 23:20:39 +08:00
"""Retrieve and optionally rerank chunks for a query.
When a BM25 retriever is available, combines dense + sparse results
via Reciprocal Rank Fusion before optional reranking.
"""
use_hybrid = self.bm25_retriever is not None and getattr(self.bm25_retriever, "available", False)
candidate_k = max(top_k * 4, 20) if (self.reranker is not None or use_hybrid) else top_k
retrieval_query = RetrievalQuery(query=query, top_k=candidate_k, filters=filters)
2026-05-21 23:20:39 +08:00
dense_results = self.retriever.retrieve(retrieval_query)
if use_hybrid:
bm25_results = self.bm25_retriever.retrieve(query, top_k=candidate_k, filters=filters)
candidates = _reciprocal_rank_fusion([dense_results, bm25_results])
else:
candidates = dense_results
if self.reranker and candidates:
return self.reranker.rerank(query, candidates, top_k=self.reranker_top_k)
2026-05-21 23:20:39 +08:00
return candidates[:top_k]