一、架构设计背景
1.1 需求场景分析
在Web应用中,当遇到以下场景时需要异步任务处理方案:
- 高延迟操作(大文件解析/邮件发送/复杂计算)
- 请求响应解耦(客户端快速响应)
- 任务队列管理(任务优先级/失败重试)
- 分布式任务调度(多Worker节点)
1.2 技术选型说明
组件 | 作用 | 版本要求 |
---|---|---|
FastAPI | 构建高性能API接口 | >=0.68 |
Redis | 消息中间件+结果存储 | >=5.0 |
Celery | 分布式任务队列 | >=5.2 |
Supervisord | 进程监控与管理 | >=4.2 |
二、核心实现逻辑
2.1 异步任务处理流程
- 客户端上传文件到FastAPI
- API生成唯一任务ID并持久化任务信息
- 任务进入Redis队列
- Celery Worker消费队列任务
- 状态更新与结果存储
- 客户端轮询获取任务状态
2.2 代码实现优化
2.2.1 增强型FastAPI服务
# 文件校验中间件
def validate_file(file: UploadFile):
if not file.filename.lower().endswith(('.csv', '.xlsx')):
raise HTTPException(400, "仅支持CSV/XLSX格式")
if file.size > 1024*1024*100: # 100MB限制
raise HTTPException(413, "文件超过大小限制")
return file
# 上传接口
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
validated_file = validate_file(file)
task_id = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
# 异步存储文件
await file.seek(0)
content = await file.read()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, save_upload_file, content, task_id)
task_data = {
"task_id": task_id,
"file_path": file_path,
}
r.lpush("task_queue", json.dumps(task_data))
r.hset(name="task_status",
key=task_id,
value="pending")
return JSONResponse({
"code": 200,
"data": {"task_id": task_id},
"msg": "任务创建成功"
})
2.2.2 健壮型Celery Worker
@app.task(
bind=True,
max_retries=3,
soft_time_limit=300,
autoretry_for=(Exception,),
retry_backoff=True
)
def process_file_task(self, task_data):
try:
logger.info(f"Processing {task_data['task_id']}")
# 实际业务逻辑
time.sleep(10)
r.hset("task_status", task_data["task_id"], "completed")
except Exception as exc:
self.retry(exc=exc, countdown=2 ** self.request.retries)
2.2.3 轮询任务队列
r = redis.Redis(host="localhost", port=6379, db=0,password="123456")
def main():
logger.info("任务轮询启动,正在轮询 Redis...")
while True:
task_data = r.lpop("task_queue")
if task_data:
data = json.loads(task_data)
logger.info("轮询到任务ID:"+data["task_id"])
r.hset("task_status", data["task_id"], "processing")
save_file_to_disk.delay(data)
logger.info("已在后台执行,继续轮询")
time.sleep(3)
if __name__ == "__main__":
main()
三、生产级Supervisord配置
3.1 配置文件
[supervisord]
logfile=/var/log/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=info
pidfile=/tmp/supervisord.pid
nodaemon=false
[program:fastapi]
command=uvicorn main:app --host 0.0.0.0 --port 8000
directory=/opt/app
autostart=true
autorestart=unexpected
startsecs=5
stopwaitsecs=30
user=www-data
environment=PYTHONPATH="/opt/app"
[program:celery_worker]
command=celery -A worker.celery_app worker --concurrency=4 -O fair
directory=/opt/app
autostart=true
autorestart=true
stdout_logfile=/var/log/celery_worker.log
redirect_stderr=true
killasgroup=true
stopasgroup=true
3.2 关键配置说明
- 进程分组管理:killasgroup/stopasgroup确保子进程被正确回收
- 日志轮转:logfile_maxbytes和logfile_backups防止日志膨胀
- 资源限制:通过concurrency参数控制Worker并发数
- 环境隔离:指定运行用户和Python路径
一、总结
在Web应用开发中,为了应对诸如处理大文件上传、发送邮件、执行复杂计算等耗时操作,以及实现请求响应解耦和分布式任务调度的需求,我们通常需要采用异步任务处理方案。本文介绍了一种基于FastAPI、Redis、Celery和Supervisord构建的高效异步任务处理架构。