以下是一个使用后台任务的 FastAPI 示例:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("logs.txt", mode="a") as log_file:
log_file.write(message)
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent in the background"}
在这个例子中:
1. 定义了一个 write_log 函数,它将消息写入一个日志文件(logs.txt)。
2. 创建一个 /send-notification/{email} 路由,当该路由被请求时,会异步执行后台任务。
3. 在路径操作函数中,使用 background_tasks.add_task 来添加后台任务。在这个例子中,添加了一个写入日志文件的后台任务。
当客户端访问 /send-notification/{email} 路由时,路径操作函数会立即返回响应,而后台任务会在后台异步执行,不会阻塞主请求的处理。
请注意,后台任务是在应用程序的进程内执行的,因此如果主应用程序关闭,后台任务可能会被终止。如果需要更高级的异步任务处理,可以考虑使用 Celery 等异步任务队列。
转载请注明出处:http://www.zyzy.cn/article/detail/7398/FastAPI