89 lines
2.1 KiB
Python
89 lines
2.1 KiB
Python
|
|
"""异步任务处理模块
|
|||
|
|
|
|||
|
|
TODO: 后续替换为 RabbitMQ 消息队列
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import asyncio
|
|||
|
|
import uuid
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Callable, Awaitable
|
|||
|
|
from app.utils.logger import logger
|
|||
|
|
|
|||
|
|
|
|||
|
|
# In-memory task status store, keyed by task ID (to be replaced with Redis later).
# Uses builtin generics: ``Dict`` is not imported from ``typing`` in this module,
# so the previous ``Dict[str, Dict]`` annotation raised NameError at import time
# on interpreters that evaluate module-level annotations eagerly.
_task_store: dict[str, dict] = {}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def generate_task_id() -> str:
    """Return a new task ID: ``task-`` followed by the first 12 hex digits of a random UUID4."""
    hex_digits = uuid.uuid4().hex
    return "task-" + hex_digits[:12]
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AsyncTaskManager:
|
|||
|
|
"""异步任务管理器"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self._running_tasks: dict[str, asyncio.Task] = {}
|
|||
|
|
|
|||
|
|
def create_task(
|
|||
|
|
self,
|
|||
|
|
task_id: str,
|
|||
|
|
task_func: Callable[[str], Awaitable[None]],
|
|||
|
|
) -> asyncio.Task:
|
|||
|
|
"""
|
|||
|
|
创建异步任务
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
task_id: 任务ID
|
|||
|
|
task_func: 任务执行函数
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
asyncio.Task
|
|||
|
|
"""
|
|||
|
|
task = asyncio.create_task(self._run_task(task_id, task_func))
|
|||
|
|
self._running_tasks[task_id] = task
|
|||
|
|
return task
|
|||
|
|
|
|||
|
|
async def _run_task(
|
|||
|
|
self,
|
|||
|
|
task_id: str,
|
|||
|
|
task_func: Callable[[str], Awaitable[None]],
|
|||
|
|
):
|
|||
|
|
"""运行任务并处理状态"""
|
|||
|
|
try:
|
|||
|
|
await task_func(task_id)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Task {task_id} failed: {e}")
|
|||
|
|
_task_store[task_id] = {
|
|||
|
|
"status": "failed",
|
|||
|
|
"error": str(e),
|
|||
|
|
"completed_at": datetime.now(),
|
|||
|
|
}
|
|||
|
|
finally:
|
|||
|
|
if task_id in self._running_tasks:
|
|||
|
|
del self._running_tasks[task_id]
|
|||
|
|
|
|||
|
|
def get_task_status(self, task_id: str) -> dict | None:
|
|||
|
|
"""获取任务状态"""
|
|||
|
|
return _task_store.get(task_id)
|
|||
|
|
|
|||
|
|
def cancel_task(self, task_id: str) -> bool:
|
|||
|
|
"""取消任务"""
|
|||
|
|
if task_id in self._running_tasks:
|
|||
|
|
self._running_tasks[task_id].cancel()
|
|||
|
|
return True
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Singleton: module-level AsyncTaskManager instance
|
|||
|
|
task_manager = AsyncTaskManager()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_task_status(task_id: str) -> dict | None:
|
|||
|
|
"""获取任务状态"""
|
|||
|
|
return _task_store.get(task_id)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def set_task_status(task_id: str, status: dict):
    """Store ``status`` as the current status for ``task_id``, replacing any previous value."""
    _task_store.update({task_id: status})
|