From 1ff4a3943a5b4155100626c89246aa0b1a68d606 Mon Sep 17 00:00:00 2001 From: wangwei Date: Mon, 15 Jun 2026 23:06:33 +0800 Subject: [PATCH] feat(dataset-builder): add retry logic and ASCII-safe logging for Siemens PDF pipeline - question_generator.py: add max_retries=3/retry_delay=5s loop with linear backoff (wait = retry_delay * attempt) on any exception raised during generation, including LLM timeouts, server errors and response-parsing failures; encode filenames with ascii/replace before printing to avoid UnicodeEncodeError on Windows cp1252 consoles - runner.py: encode PDF filenames ASCII-safe for progress messages; catch generation failures per-document and skip (or re-raise) based on failure_mode, preventing one bad doc from aborting the whole build Co-Authored-By: Claude Opus 4 --- .../generator/question_generator.py | 46 +++++++++++++------ rag_eval/dataset_builder/runner.py | 38 +++++++++++---- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/rag_eval/dataset_builder/generator/question_generator.py b/rag_eval/dataset_builder/generator/question_generator.py index bba736f..5c4eb6d 100644 --- a/rag_eval/dataset_builder/generator/question_generator.py +++ b/rag_eval/dataset_builder/generator/question_generator.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import time from abc import ABC, abstractmethod from typing import Any @@ -150,24 +151,39 @@ class OpenAIQuestionGenerator(QuestionGenerator): max_questions: int, max_chunks_per_question: int, job_name: str, + max_retries: int = 3, + retry_delay: float = 5.0, ) -> list[DraftQuestionSample]: - """Generate draft questions for one parsed document.""" + """Generate draft questions for one parsed document, with retry on timeout/server errors.""" prompt = self._build_prompt( document, max_questions=max_questions, max_chunks_per_question=max_chunks_per_question, ) - response = self.client.chat.completions.create( - model=self.model, - messages=[ - {"role": "system", "content": "You generate structured draft question banks from source documents."}, - {"role": "user", "content": 
prompt}, - ], - response_format={"type": "json_object"}, - ) - content = response.choices[0].message.content or "{}" - payload = self._parse_response_payload(content) - return [ - self._build_sample(document=document, payload=item, index=index, job_name=job_name) - for index, item in enumerate(payload[:max_questions], start=1) - ] + last_exc: Exception | None = None + for attempt in range(1, max_retries + 1): + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": "You generate structured draft question banks from source documents."}, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + ) + content = response.choices[0].message.content or "{}" + payload = self._parse_response_payload(content) + return [ + self._build_sample(document=document, payload=item, index=index, job_name=job_name) + for index, item in enumerate(payload[:max_questions], start=1) + ] + except Exception as exc: + last_exc = exc + if attempt < max_retries: + wait = retry_delay * attempt + doc_name_safe = document.doc_name.encode("ascii", "replace").decode("ascii") + print(f" [warn] generate attempt {attempt}/{max_retries} failed for {doc_name_safe!r}: {exc}. 
Retrying in {wait:.0f}s...") + time.sleep(wait) + raise RuntimeError( + f"Question generation failed for {document.doc_name!r} after {max_retries} attempts" + ) from last_exc diff --git a/rag_eval/dataset_builder/runner.py b/rag_eval/dataset_builder/runner.py index 4e7e494..6b3be15 100644 --- a/rag_eval/dataset_builder/runner.py +++ b/rag_eval/dataset_builder/runner.py @@ -111,12 +111,32 @@ def run_dataset_build( continue documents.append(document) - generated = generator.generate( - document, - max_questions=job.max_questions_per_document, - max_chunks_per_question=job.max_source_chunks_per_question, - job_name=job.job_name, - ) + doc_name_safe = pdf_path.name.encode("ascii", "replace").decode("ascii") + print(f" [info] generating questions for: {doc_name_safe}") + try: + generated = generator.generate( + document, + max_questions=job.max_questions_per_document, + max_chunks_per_question=job.max_source_chunks_per_question, + job_name=job.job_name, + ) + except Exception as exc: + gen_failure = ParseFailure(file_path=pdf_path.as_posix(), error=f"generation failed: {exc}") + failures.append(gen_failure) + print(f" [warn] skipping {doc_name_safe} after generation failure: {exc}") + if job.failure_mode == "fail": + result = DatasetBuildResult( + job=job, + run_id=run_id, + artifact_paths=artifact_paths, + documents=documents, + draft_samples=draft_samples, + parse_failures=failures, + ) + write_dataset_build_artifacts(result) + raise + continue + valid_generated = [] for sample in generated: errors = validate_draft_sample( @@ -126,9 +146,9 @@ def run_dataset_build( ) if not errors: valid_generated.append(sample) - draft_samples.extend( - dedupe_samples(valid_generated)[: job.max_questions_per_document] - ) + new_samples = dedupe_samples(valid_generated)[: job.max_questions_per_document] + draft_samples.extend(new_samples) + print(f" [info] {doc_name_safe}: {len(new_samples)} questions generated (total so far: {len(draft_samples)})") result = DatasetBuildResult( 
job=job,