保定ai问答主体项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

31 lines
894 B

from starlette.background import BackgroundTasks
from .ctx import CTX_BG_TASKS
class BgTasks:
"""后台任务统一管理"""
@classmethod
async def init_bg_tasks_obj(cls):
"""实例化后台任务,并设置到上下文"""
bg_tasks = BackgroundTasks()
CTX_BG_TASKS.set(bg_tasks)
@classmethod
async def get_bg_tasks_obj(cls):
"""从上下文中获取后台任务实例"""
return CTX_BG_TASKS.get()
@classmethod
async def add_task(cls, func, *args, **kwargs):
"""添加后台任务"""
bg_tasks = await cls.get_bg_tasks_obj()
bg_tasks.add_task(func, *args, **kwargs)
@classmethod
async def execute_tasks(cls):
"""执行后台任务,一般是请求结果返回之后执行"""
bg_tasks = await cls.get_bg_tasks_obj()
if bg_tasks.tasks:
await bg_tasks()