Django+Celery+Flower实现异步和定时任务及其监控告警

news2025/1/19 3:03:35

用Django框架进行web开发非常的快捷方便,但Django框架请求/响应是同步的。但我们在实际项目中经常会碰到一些耗时的不能立即返回请求结果任务如:数据爬取、发邮件等,如果常时间等待对用户体验不是很好,在这种情况下就需要实现异步实现,马上返回响应请求,但真正的耗时任务在后台异步执行。Django框架本身无法实现异步响应但可以通过Celery很快的实现异步和定时任务。本文将介绍如何通过Django+Celery+Flower实现异步和定时任务及其任务的监控告警。

常见的任务有两类,一类是异步任务,一类是定时任务(定时执行或按一定周期执行)。Celery都能很好的支持。

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

  • 异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等
  • 做一个定时任务,比如每天定时执行爬虫爬取指定内容

Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ、Redis或其他DB。

本文使用redis作为消息中间件和结果存储,在后面的通过数据库监控任务执行案例将介绍用到数据库作为结果存储。

一、在Django中引入Celary

1、安装库

pip install celery
pip install redis
pip install eventlet  #在windows环境下需要安装eventlet包

2、引入celary

在主项目目录下,新建celary.py文件,内容如下:

import os
import django
from celery import Celery
from django.conf import settings

# 设置系统环境变量,否则在启动celery时会报错
# taskproject 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproject.settings')
django.setup()

celery_app = Celery('taskproject')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在这里插入图片描述

在主目录的__init__.py中添加如下代码:

from .celery import celery_app

__all__ = ['celery_app']

在这里插入图片描述

3、在settings.py中设置celery的相关参数

###----Celery redis 配置-----###
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://:redispassword@127.0.0.1:6379/0'

# BACKEND配置,使用redis
CELERY_RESULT_BACKEND = 'redis://:redispassword@127.0.0.1:6379/1'


CELERY_ACCEPT_CONTENT=['json']
CELERY_TASK_SERIALIZER='json'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'

# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'

在这里插入图片描述

这时候Celery的基本配置完成了,可以实现并添加任务了。

二、实现异步任务

1、创建tasks.py

在子应用下建立各自对应的任务文件tasks.py(必须是tasks.py这个名字,不允许修改)

import datetime
from time import sleep
from celery import shared_task
import logging
logger = logging.getLogger(__name__)


@shared_task()
def task1(x):
    for i in range(int(x)):
        sleep(1)
        logger.info('this is task1 '+str(i))
    return x


@shared_task
def scheduletask1():
    now = datetime.datetime.now()
    logger.info('this is scheduletask '+now.strftime("%Y-%m-%d %H:%M:%S"))
    return None

在tasks.py中我们定义了两个任务,这两个任务要用@shared_task装饰起来,否则celery无法管理。
在这里插入图片描述

为了放便执行我们通过views把这两个任务通过函数方法调用起来,用URL进行发布。

views.py
from django.http import JsonResponse
from . import tasks
# Create your views here.


def runtask(request):
    x=request.GET.get('x')
    tasks.task1.delay(x)
    content= {'200': 'run task1 success!---'+str(x)}
    return JsonResponse(content)


def runscheduletask(request):
    tasks.scheduletask1.delay()
    content= {'200': 'success!'}
    return JsonResponse(content)

在这里插入图片描述

在urls中加入路由进行发布

from django.urls import path

from taskapp import views

urlpatterns = [
    path('task', views.runtask),
    path('runscheduletask', views.runscheduletask),
]

在这里插入图片描述

在项目的主urls中加入子项目的urls
在这里插入图片描述

2、启动celery

在启动celery之前,先要启动redis服务,因为celery在settings中配置要用到redis作为消息中间件和结果存储。
windows环境下启动redis的命令为redis-server.exe redis.windows.conf

在控制台启动celery的worker

celery -A taskproject worker -l debug -P eventlet

在这里插入图片描述

启动django访问url调用任务,看异步效果
在这里插入图片描述

3、查看任务

控制台查看异步任务执行的情况,可以看web的url很快返回响应结果,后台控制台一直在执行异步任务。
在这里插入图片描述

三、实现定时任务

Celery实现定时任务也很方便

1、定义调度器

在settings.py中加入定时任务的定义就可以实现定时任务

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径
        'task': 'taskapp.tasks.scheduletask1',
        # 每5秒执行一次
        'schedule': 5,
        'args': ()
    }
}

这里这个scheduletask1是前面tasks.py中定义的任务,当然也可以定义多个定时任务,如加一个task1,task1是有参数的,可以在’args’: ()中传入参数

CELERYBEAT_SCHEDULE = {
    'every_5_seconds': {
        # 任务路径
        'task': 'taskapp.tasks.scheduletask1',
        # 每5秒执行一次
        'schedule': 5,
        'args': ()
    },
    'every_10_seconds': {
        # 任务路径
        'task': 'taskapp.tasks.task1',
        # 每10秒执行一次,task1的参数是5
        'schedule': 10,
        'args': ([5])
    }
}

在这里插入图片描述

这里定义了task1是10秒执行一次,传入的参数是5。

2、启动beat

需要保持worker进程,另外开一个控制台启动beat

celery -A taskproject beat -l debug

3、查看任务

启动任务后看控制台打印的日志task1和scheduletask1都按计划定时执行了。
在这里插入图片描述

三、通过数据库配置定时任务

虽然通过settings.py的配置可以实现定时任务的配置,做为实际项目中可能还是不够实用,更加工程化的做法是将定时任务的配置放到数据库里通过界面来配置。同样Celery对此也提供了很好的支持,这需要安装django-celery-beat插件。以下将介绍使用过程。

1、安装djiango-celery-beat

pip install django-celery-beat

2、在APP中注册djiango-celery-beat

INSTALLED_APPS = [
....
'django_celery_beat',
]

3、在settings.py中设置调度器及时区

在settings.py中屏蔽到原来的调度器,加入

CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler' 

在这里插入图片描述

在setings.py中设置好语言、时区等

LANGUAGE_CODE = 'zh-hans'

TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

USE_TZ = False

4、进行数据库迁移

python manage.py migrate django_celery_beat

5、分别启动woker和beta

在两个控制台分别启动woker和beta

celery -A taskproject worker -l debug -P eventlet
celery -A taskproject beat -l debug

6、启动django服务,访问admin的web管理端

访问 http://localhost:8000/admin/ 可以看到周期任务的管理菜单,管理定时任务非常方便。
在这里插入图片描述

7、配置定时任务

点击“间隔”
在这里插入图片描述

点击“增加间隔”来增加定时任务的配置,增加一个5秒执行一次的定时器。
在这里插入图片描述

看到有个每5秒的定时器
在这里插入图片描述

这时可以用这个定时器去新建调度任务了。选择周期性任务,点击“增加周期性任务”
在这里插入图片描述

填入任务名,选择需要定时执行的任务
在这里插入图片描述

因为task1需要参数,在后面参数设置中进行参数的设置。
在这里插入图片描述

保存后可以看到新加了一条“每5秒执行一次task1”的调度任务。
在这里插入图片描述

8、查看调度效果

在woker和beta的控制台都可以看到有定时任务执行的信息,说明任务被成功调度执行了。
在这里插入图片描述

四、通过django的web界面监控任务执行情况

在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等。这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。这里要用到django-celery-results插件。通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。

1、安装django-celery-results

pip install django-celery-results

2、配置settings.py,注册app

INSTALLED_APPS = (
...,
'django_celery_results',
)

3、修改backend配置,将Redis改为django-db

# BACKEND配置,使用redis
#CELERY_RESULT_BACKEND = 'redis://:12345678@127.0.0.1:6379/1'
# 使用使用django orm 作为结果存储
CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储

4、迁移数据库

python manage.py migrate django_celery_results

可以看到创建了django_celery_results相关的表
在这里插入图片描述

5、查看任务

启动django服务后,执行异步和定时任务,就可以在管理界面看到任务的执行情况,执行了哪些任务,哪些任务执行失败了等。
在这里插入图片描述

五、通过Flower监控任务执行情况

如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。FLower的界面更加丰富,可以监控的信息更全。以下介绍通过Flower来进行任务监控。

1、安装flower

pip install flower

2、启动flower

celery -A taskproject flower --port-5566

在这里插入图片描述

3、使用flower进行任务监控

在这里插入图片描述

点击失败的我们可以看到执行失败的详情,这里是故意给task1的参数传了个‘a’字符,导致它执行报错了。可以看到任务执行的报错信息也展示出来了。
在这里插入图片描述

六、实现任务异常自动邮件告警

虽然可以通过界面来监控了,但是我们想要得更多,人不可能天天盯着界面看吧,如果能实现任务执行失败就自动发邮件告警就好了。这个Celery当然也是没有问题的。
通过钩子程序在异常的时候触发邮件通知。

1、加入钩子程序

对tasks.py的改造如下:

import datetime
from time import sleep
from celery import shared_task
from celery import Task
from django.core.mail import send_mail
import logging
logger = logging.getLogger(__name__)

class MyHookTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        info=f'任务成功-- 0task id:{task_id} , arg:{args} , successful !'
        logger.info(info)
        send_mail('celery任务监控', info, 'sendmail@qq.com', ['tomail@qq.com'])

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        info=f'任务失败-- task id:{task_id} , arg:{args} , failed ! erros: {exc}'
        logger.info(info)
        send_mail('celery任务监控异常', info, 'sendmail@qq.com', ['tomail@qq.com'])

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')


@shared_task(base=MyHookTask, bind=True)
def task1(self,x):
    for i in range(int(x)):
        sleep(1)
        logger.info('this is task1 '+str(i))
    return x


@shared_task
def scheduletask1():
    now = datetime.datetime.now()
    logger.info('this is scheduletask '+now.strftime("%Y-%m-%d %H:%M:%S"))
    return None

在这里插入图片描述

2、重启服务

将work和beta服务关掉,在两个控制台分别重新启动woker和beta

celery -A taskproject worker -l debug -P eventlet
celery -A taskproject beat -l debug

3、验证效果

在任务成功或失败的时候发邮件通知。
在这里插入图片描述

任务执行成功通知
在这里插入图片描述

任务执行异常告警通知
在这里插入图片描述

Django如何发送邮件见 https://blog.csdn.net/fullbug/article/details/128495415

至此,本文通过几个简单的应用介绍了Django+Celery+Flower实现异步和定时任务及其监控告警。


博客地址:http://xiejava.ishareread.com/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/130176.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SOFA Weekly|2023 我们一起加油、本周 Contributor QA

SOFA WEEKLY | 每周精选 筛选每周精华问答,同步开源进展欢迎留言互动~SOFAStack(Scalable Open Financial Architecture Stack)是蚂蚁集团自主研发的金融级云原生架构,包含了构建金融级云原生架构所需的各个组件&#…

RocketMQ 搭建

目录 1、什么是MQ?为什么要用MQ? 2、MQ的优缺点 3、几大MQ产品特点比较 4.RocketMQ在Windows的启动 1.下载RocketMQ 4.7.1版本 2.解压到本地磁盘并配置好JAVA_HOME和ROCKETMQ_HOME 3.修改runserver.cmd 4.启动server 5.修改runbroker.cmd 6.启动…

ROS2 基础概念 服务

ROS2 基础概念 服务1. Services2. 服务类型3. 查找服务4. 服务请求1. Services 服务基于 请求-应答 模型,而不是话题的 发布-订阅 模型 虽然话题允许节点订阅数据流并获得持续更新,但服务 仅在客户端专门调用时提供数据 还是启动海龟及其遥控节点为例&…

[标准库]STM32F103R8T6 点灯以及按键扫描

刚开始学32的时候,选择了基于HAL库进行开发,原因是HAL比较容易上手,像点灯、输出PWM、按键输入这种操作都很快捷。但是到ADCDMA这部分的时候发现,HAL库有一些地方我认为不是很合理和方便。比如DMA中断这部分,ST官方给出…

音视频开发系列--H264编解码总结

一、概述 H264,通常也被称之为H264/AVC(或者H.264/MPEG-4 AVC或MPEG-4/H.264 AVC) 对摄像头采集的每一帧视频需要进行编码,由于视频中存在空间和时间的冗余,需要用算法来去除这些冗余。H264是专门去除这些冗余的算法…

王者荣耀崩溃解决记录

王者荣耀竟然崩溃了 上周玩王者荣耀,突然就进不去了,点击开始游戏后应用直接就崩溃退出了。 第一反应,肯定是反馈给游戏客服。但是果然腾讯的游戏是找不到真客服的,全部都是机器人处理的,给了我一个毫无用处的官方回…

springboot中配置文件优先级以及分类,这你都可以不会吗?不会赶紧进来学( ̄(∞) ̄)

各位小伙伴大家好呀┗( ▔, ▔ )┛,马上过年了,但是感觉没啥期待的哈哈哈哈哈,现在的年说实话真的挺没劲的呜呜。 言归正传,我们大家在使用springboot时难免会写各种各样的配置信息,比如port,jdbc啊这些&am…

2022这一年:阳了、变轨和逆风

又到年末了,2022这一年应该会让人记忆深刻,于我而言这一年的感受有明显的分界线,在此之前的世界温暖一些,提供着能量,让人心生探索它的纷繁多彩;今年世界变得寒冷了,展示着它的严酷与无情。阳了…

再学C语言20:循环控制语句——for循环

在while循环中,建立一个重复执行固定次数的循环涉及到3个动作: 1)初始化一个计数器 2)计数器与某个有限的值比较 3)每次执行循环,要在循环体中让计数器的值递增 其中,计数器的初始化在循环之…

【pandas】教程:6-如何计算摘要统计

Pandas 计算摘要统计 本节使用的数据为 data/titanic.csv,链接为 pandas案例和教程所使用的数据-机器学习文档类资源-CSDN文库 加载数据 import pandas as pdtitanic pd.read_csv("data/titanic.csv") titanic.head()PassengerId Survived Pclass \…

#Z0424. 树上的旅行

题目 Description 给出一棵有N个结点的树,给出Q个询问,求结点xj过结点K到节点yj的最短距离 Format Input 第一行一个数n 接下来共有n-1行,三个数u,v,len表示u和v之间存在一条边长为len 再给你Q,K。代表有Q个询问&#xff0…

视频 | bedtools使用介绍1

点击阅读原文跳转完整教案。基因组中的趣事(二)- 最长的基因2.7 million,最短的基因只有8 nt却能编码基因组中的趣事(一):这个基因编码98种转录本1 Linux初探,打开新世界的大门1.1 Linux系统简介…

10000+条数据的内容滚动功能如何实现?

遇到脑子有问题的产品经理该怎么办?如果有这么一个需求要你在一个可视区范围内不间断循环滚动几千上万条数据你会怎么去实现? 且不说提这个需求的人是不是脑子有问题,这个需求能不能实现?肯定是可以的,把数据请求回来渲…

2022蓝桥杯省赛C++A组初尝试

前言 耗时三个半小时,看看自己不懂的有多少,以便明确后续备赛2023方向 耗时3个半小时,只拿了18分,没学过,时间再多也做不出来,有奥数那感觉了 据说蓝桥杯省3得做对 2填空 2大题(30分&#x…

PMP®项目管理|不同场景使用不同沟通方式

不同沟通方式的确有适用场景和不适用场景。无效沟通的重要原因之一就是错误选择沟通方式。 我们会在工作中用到很多沟通方式,每种沟通方式都有适用的场合,也有不适用的场合,错误选择将使沟通变得低效甚至无效。 沟通方式主要有三种&#xf…

一百种语言的LOVE

2023年快要到来啦,很高兴这次我们又能一起度过~ 目录 一、前言 二、详细介绍 三、效果展示 四、代码编写 index.html script.js style.css 五、获取代码 需要源码,可以私信我(⊙o⊙)?关注我? 一、前言 时光荏苒&#xf…

vue element-ui 手机号校验 验证码校验 获取验证码倒数60秒无样式实现

这段时间被迫搞前端搞裂开了,记录一下手机号验证码校验登录的极简无样式前端实现 巨丑!希望大佬们不介意 下面是先演示效果 点击登陆后显示校验信息 输入手机号点击获取验证码 输入符合校验的内容后点击登录提示成功 无后端交互!&#…

从档案信息管理到档案知识管理

今年6月份的时候,笔者发过一篇文章《DIKW模型在档案信息资源开发中的应用》,简要阐述了知识管理领域非常著名的DIKW模型,即从数据(Data)→信息(Information)→知识(Knowledge&#x…

基于SpringBoot和微信小程序的餐馆点餐系统的设计和实现

作者主页:Designer 小郑 作者简介:Java全栈软件工程师一枚,来自浙江宁波,负责开发管理公司OA项目,专注软件前后端开发(Vue、SpringBoot和微信小程序)、系统定制、远程技术指导。CSDN学院、蓝桥云…

Android 学习笔记

目录一.Android入门1.Android 概述2.Android Studio3.创建模拟器4.使用外部模拟器5.第一个app二.app开发基础1.开发语言2.app工程目录结构3.文本控件TextView(1)设置文本内容(2)设置文本大小(3)设置文本颜色(4)设置背景颜色(5)设置视图宽高(6)设置视图间距(7)设置视图对齐方式4…