from fastapi import FastAPI
import asyncio
app = FastAPI()
async def task1():
# 模拟执行任务1
print("开始执行任务1")
await asyncio.sleep(1)
print("结束执行任务1")
return "Result from Task 1"
async def task2():
# 模拟执行任务2
print("开始执行任务2")
await asyncio.sleep(1)
print("结束执行任务2")
return "Result from Task 2"
async def task3():
# 模拟执行任务3
print("开始执行任务3")
await asyncio.sleep(1)
print("结束执行任务3")
return "Result from Task 3"
async def execute_tasks_async():
# 并发执行三个任务
results = await asyncio.gather(task1(), task2(), task3())
return results
@app.get("/execute_tasks")
async def execute_tasks():
results = await execute_tasks_async()
return {"Task 1": results[0], "Task 2": results[1], "Task 3": results[2]}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5023)
执行结果
解释说明
https://fastapi.tiangolo.com/zh/async/#in-a-hurry