Files

89 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""异步任务处理模块
TODO: 后续替换为 RabbitMQ 消息队列
"""
import asyncio
import uuid
from datetime import datetime
from typing import Callable, Awaitable
from app.utils.logger import logger
# 任务状态存储(后续替换为 Redis
_task_store: Dict[str, Dict] = {}
def generate_task_id() -> str:
"""生成任务ID"""
return f"task-{uuid.uuid4().hex[:12]}"
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(self):
self._running_tasks: dict[str, asyncio.Task] = {}
def create_task(
self,
task_id: str,
task_func: Callable[[str], Awaitable[None]],
) -> asyncio.Task:
"""
创建异步任务
Args:
task_id: 任务ID
task_func: 任务执行函数
Returns:
asyncio.Task
"""
task = asyncio.create_task(self._run_task(task_id, task_func))
self._running_tasks[task_id] = task
return task
async def _run_task(
self,
task_id: str,
task_func: Callable[[str], Awaitable[None]],
):
"""运行任务并处理状态"""
try:
await task_func(task_id)
except Exception as e:
logger.error(f"Task {task_id} failed: {e}")
_task_store[task_id] = {
"status": "failed",
"error": str(e),
"completed_at": datetime.now(),
}
finally:
if task_id in self._running_tasks:
del self._running_tasks[task_id]
def get_task_status(self, task_id: str) -> dict | None:
"""获取任务状态"""
return _task_store.get(task_id)
def cancel_task(self, task_id: str) -> bool:
"""取消任务"""
if task_id in self._running_tasks:
self._running_tasks[task_id].cancel()
return True
return False
# 单例
task_manager = AsyncTaskManager()
def get_task_status(task_id: str) -> dict | None:
"""获取任务状态"""
return _task_store.get(task_id)
def set_task_status(task_id: str, status: dict):
"""设置任务状态"""
_task_store[task_id] = status