Files
2026-05-21 23:20:39 +08:00

39 lines
1.2 KiB
Python

"""Async utility helpers for bridging sync generators to async FastAPI routes."""
from __future__ import annotations
import asyncio
import threading
from typing import AsyncGenerator, Generator, TypeVar
T = TypeVar("T")

# Unique end-of-stream marker. It is compared by identity, so it cannot be
# confused with any value produced by the wrapped generator.
_SENTINEL = object()


async def iter_in_thread(sync_gen: Generator[T, None, None]) -> AsyncGenerator[T, None]:
    """Yield items from a synchronous generator without blocking the event loop.

    Runs the generator in a daemon thread, forwarding items via a bounded
    asyncio.Queue (at most 32 items are buffered, so a slow consumer applies
    backpressure to the producer thread).
    Use this to wrap blocking LLM streaming generators inside async FastAPI routes.

    Args:
        sync_gen: The blocking generator to consume. It is iterated only from
            the worker thread.

    Yields:
        Each item produced by ``sync_gen``, in order.

    Raises:
        Exception: Any exception raised while iterating ``sync_gen`` is
            re-raised here after the items produced before it were yielded.

    If the consumer stops early (``aclose()``, task cancellation, ``break``
    followed by finalization), the worker is told to stop and ``sync_gen`` is
    closed so its cleanup code runs. The event loop must keep running briefly
    for the worker to notice.
    """
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue(maxsize=32)
    # Set by the consumer when it no longer wants items.
    cancelled = threading.Event()
    # Holds at most one exception raised by the worker; written before the
    # sentinel is enqueued, read only after the sentinel is received.
    errors: list[BaseException] = []

    def _drain() -> None:
        """Worker-thread body: pump ``sync_gen`` into the queue."""
        try:
            for item in sync_gen:
                if cancelled.is_set():
                    break
                # Blocks this thread (not the loop) until the queue has room.
                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
                if cancelled.is_set():
                    break
        except Exception as exc:  # thread boundary: hand the error to the consumer
            errors.append(exc)
        finally:
            if cancelled.is_set():
                # Nobody is listening: skip the sentinel and release whatever
                # resources the generator holds.
                close = getattr(sync_gen, "close", None)
                if close is not None:
                    close()
            else:
                asyncio.run_coroutine_threadsafe(queue.put(_SENTINEL), loop).result()

    thread = threading.Thread(target=_drain, daemon=True)
    thread.start()

    try:
        while True:
            item = await queue.get()
            if item is _SENTINEL:
                break
            yield item  # type: ignore[misc]
    finally:
        # Runs on normal completion, on aclose()/GeneratorExit and on
        # cancellation. Emptying the queue wakes a producer that is blocked
        # on a full queue so it can observe the flag and exit.
        cancelled.set()
        while not queue.empty():
            queue.get_nowait()

    if errors:
        raise errors[0]