"""Build the Siemens offline smoke dataset from a completed dataset_build run. Must be run AFTER `python main.py --dataset-build-config scenarios/siemens_build/siemens-pdf-build.yaml` has completed successfully. It uses the stable `latest/` alias so you don't need to know the run_id. Usage: python scripts/build_siemens_offline_smoke.py Output: datasets/normalized/siemens_pdf_offline_smoke.csv (referenced by scenarios/offline/siemens-pdf-offline-smoke.yaml) """ from __future__ import annotations from pathlib import Path # --------------------------------------------------------------------------- # Paths — all relative to the siemens_ragas/ repository root # --------------------------------------------------------------------------- REPO_ROOT = Path(__file__).resolve().parents[1] DRAFT_DATASET_PATH = ( REPO_ROOT / "outputs" / "dataset-builds" / "siemens-pdf-question-bank" / "latest" / "dataset_draft.csv" ) SOURCE_CHUNKS_PATH = ( REPO_ROOT / "outputs" / "dataset-builds" / "siemens-pdf-question-bank" / "latest" / "source_chunks.jsonl" ) OUTPUT_PATH = REPO_ROOT / "datasets" / "normalized" / "siemens_pdf_offline_smoke.csv" def main() -> None: """Convert the Siemens build artefacts into an offline-evaluable dataset.""" if not DRAFT_DATASET_PATH.exists(): raise FileNotFoundError( f"Draft dataset not found: {DRAFT_DATASET_PATH}\n" "Run the dataset build first:\n" " python main.py --dataset-build-config " "scenarios/siemens_build/siemens-pdf-build.yaml" ) if not SOURCE_CHUNKS_PATH.exists(): raise FileNotFoundError( f"Source chunks not found: {SOURCE_CHUNKS_PATH}\n" "Run the dataset build first." ) # Import here so the script is importable even before rag_eval is fully set up. from rag_eval.dataset_builder.offline_converter import build_offline_smoke_dataset output = build_offline_smoke_dataset( draft_dataset_path=DRAFT_DATASET_PATH, source_chunks_path=SOURCE_CHUNKS_PATH, output_path=OUTPUT_PATH, ) import pandas as pd frame = pd.read_csv(output) print(f"Offline smoke dataset written to: {output}") print(f"Total rows: {len(frame)}") if len(frame) > 0: lang_counts = frame["language"].value_counts().to_dict() if "language" in frame.columns else {} diff_counts = frame["difficulty"].value_counts().to_dict() if "difficulty" in frame.columns else {} print(f"Language distribution: {lang_counts}") print(f"Difficulty distribution: {diff_counts}") if __name__ == "__main__": main()