Django框架-使用celery(一):django使用celery的通用配置,不受版本影响

news2025/2/10 22:13:44

目录

一、依赖包情况

二、项目目录结构

   2.1、怎么将django的应用创建到apps包

三、celery的配置

2.1、celery_task/celery.py

2.2、celery_task/async_task.py

2.3、celery_task/scheduler_task.py

2.4、utils/check_task.py

四、apps/user中配置相关处理视图

4.1、基本配置

4.2、user的models

4.3、user的视图函数

五、调用函数测试

5.1、启动项目

5.2、测试项目:Postman接口工具

六、报错


一、依赖包情况

python==3.9.0

django==3.2.0

celery==5.3.1

django-redis==5.3.0

eventlet==0.33.3  #windows系统需要使用到

注意:还需要在系统中安装好redis数据库,不然无法使用。

二、项目目录结构

                 

   2.1、怎么将django的应用创建到apps包

·1、创建user应用

cd apps
python ../manage.py startapp user

   2、修改user包下的apps.py模块

from django.apps import AppConfig


class UserConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    #原来是 name = 'user',改成下面的
    name = 'apps.user'

3、注册到settings.py文件中

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'apps.user.apps.UserConfig', #注册user应用,from apps.user.apps import UserConfig 
]

三、celery的配置

概述:celery应用程序和任务都放到celery_task包中,其中celery.py是创建Celery的实例对象的,async_task.py用来写异步任务,scheduler_task.py用来写定时任务的。utils/check_task.py用来检测任务id是否结束并获取任务的返回值的。

2.1、celery_task/celery.py

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0'  # 无密码

# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'

#包含任务的所有模块的导入路径:
task_module = [
    'celery_task.async_task', #写任务模块导入路径,该模块主要写异步任务的方法
    'celery_task.scheduler_task', #写任务模块导入路径,该模块主要写定时任务的方法
]

# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=task_module)

# 配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务配置
app.conf.beat_schedule = {
    # 名字随意命名
    'add-func-30-seconds': {
        # 执行add_task下的addy函数
        'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func
        # 每10秒执行一次
        'schedule': timedelta(seconds=30),
        # add函数传递的参数
        'args': (10, 21)
    },
    #名字随意起
    'add-func-5-minutes': {
        'task': 'celery_task.scheduler_task.add_func',# 任务函数的导入路径,from celery_task.scheduler_task import add_func
        # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
        'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
        'args': (19, 22)  #定时任务需要的参数
    },
    #缓存用户数据到cache中
    'cache-user-func':{
        'task':'celery_task.scheduler_task.cache_user_func',#导入任务函数:from celery_task.scheduler_task import cache_user_func
        'schedule':timedelta(minutes=1),#每1分钟执行一次,将用户消息缓存到cache中
    }
}


'''
配置:也可以使用下面这种方式:
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=False,
)
'''

2.2、celery_task/async_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()

# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task
'''

#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    '''
    :param email: 接收消息的邮箱,用户的邮箱
    :return:
    '''
    # 启用线程发送邮件,此处最好加线程池
    t = threading.Thread(
        target=send_mail,
        args=(
            "登录前获取的验证码",  # 邮件标题
            '点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址,可以写多个
         ),
         # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
         kwargs={
                'html_message': f"<p></p> <p>验证码:{code}</p>"
         }
        )
    t.start()
    return {'email':email,'code':code}

2.3、celery_task/scheduler_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()

from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict

#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):
    print('执行了加法函数')
    cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
    return a+b

#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():
    user = user_models.UserModel.objects.all()
    user_dict = {}
    for obj in user:
        user_dict[obj.account] = model_to_dict(obj)
    cache.set('all-user-data',user_dict,timeout=35*60)



2.4、utils/check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''

def check_task_status(task_id):
    '''
    任务的执行状态:
        PENDING :等待执行
        STARTED :开始执行
        RETRY   :重新尝试执行
        SUCCESS :执行成功
        FAILURE :执行失败
    :param task_id:
    :return:
    '''
    result = AsyncResult(id=task_id, app=app)
    dic = {
        'type':result.status,
        'msg':'',
        'data':'',
        'code':400
    }
    if result.status == 'PENDING':
       dic['msg'] = '任务等待中'
    elif result.status == 'STARTED':
        dic['msg'] = '任务开始执行'
    elif result.status == 'RETRY':
        dic['msg']='任务重新尝试执行'
    elif result.status =='FAILURE':
        dic['msg'] = '任务执行失败了'
    elif result.status == 'SUCCESS':
        result = result.get()
        dic['msg'] = '任务执行成功'
        dic['data'] = result
        dic['code'] = 200
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    return dic


四、apps/user中配置相关处理视图

4.1、基本配置

1、study_celery/settings.py

#cache缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        },
        'TIMEOUT':30*60 #缓存过期时间
    }
}

#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'qq邮箱的授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

2、study_celery/urls.py

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('user/',include('apps.user.urls'))
]

3、apps/user/urls.py

from django.urls import path
from . import views

urlpatterns = [

]

4.2、user的models

apps/user/models.py

from django.db import models
# Create your models here.

class UserModel(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    account = models.CharField(max_length=64)
    password = models.CharField(max_length=256)
    email = models.EmailField()

执行数据库迁移命令

python manage.py makemigrations

python manage.py migrate

4.3、user的视图函数

1、apps/user/views.py

from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.

class ResgiterView(View):
    #注册用户
    def post(self,request):
        name = request.POST.get('name')
        account = request.POST.get('account')
        password = request.POST.get('password')
        email = request.POST.get('email')
        obj = models.UserModel.objects.filter(account=account).first()
        if obj:
            return JsonResponse({'code':400,'msg':'账户已经存在了'})
        password = make_password(password)
        instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)
        return JsonResponse({'code':200,'msg':'注册用户成功'})

class LoginView(View):
    #用户登录
    def post(self,request):
        account = request.POST.get('account')
        password = request.POST.get('password')
        code = request.POST.get('code') #验证码
        email_code = cache.get(f'email_{account}')#发给邮箱的验证码.get(f'email_{account})
        print(code,email_code)
        if code and email_code:
            if code != email_code:
                return JsonResponse({'code':400,'msg':'验证码错误'})
        else:
            return JsonResponse({'code':400,'msg':'请先点击发送邮件获取验证码'})
        obj = models.UserModel.objects.filter(account=account).first()
        if not obj:
            return JsonResponse({'code':400,'msg':'当前用户不存在'})
        pwd_true = check_password(password,obj.password)
        response = JsonResponse({'code':200,'msg':'登录成功'})
        if pwd_true:
            return response
        else:
            return JsonResponse({'code':400,'msg':'用户名或密码错误'})

class LoginSendEmailView(View):
    #用户登录前,需要验证码,发送验证码给用户的邮箱
    def post(self,request):
        account = request.POST.get('account')
        email = request.POST.get('email')
        code = str(time.time())[-5:]
        cache.set(f'email_{account}',code)
        print(cache.get(f'email_{account}'))
        res = send_email_task.delay(email,code)
        task_id = res.id
        print('验证码是',code)
        return JsonResponse({'code':200,'msg':f'请查看{email}邮箱中是否收到邮件','task_id':task_id})

class AllUserDataView(View):
    #查询cache_user_func定时任务执行时存到cache中的用户数据
    def get(self,request):
        key = 'all-user-data'
        data = cache.get(key)
        if data:
            return JsonResponse({'code':200,'data':data})
        else:
            return JsonResponse({'code':400,'msg':'没有相关数据'})

class AddFuncDataView(View):
    #查询add_func 定时任务执行时存到cache中的数据
    def get(self, request):
        data = cache.get('add_ret')
        print(data, type(data))
        return JsonResponse({'code': 200, 'data': data})

class UserTaskIdGetDataView(View):
    def get(self,request):
        from utils.check_task import check_task_status
        #检查任务是否成功了,获取任务的返回值
        task_id = request.GET.get('task_id')
        if not task_id:
            return JsonResponse({'code':400,'msg':'没有携带任务id'})
        ret = check_task_status(task_id)
        return JsonResponse(ret)

2、apps/user/urls.py

from django.urls import path
from . import views

urlpatterns = [
    path('login/',views.LoginView.as_view(),name='user-login'),#登录
    path('register/', views.ResgiterView.as_view(), name='user-register'),#注册
    path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登录验证码
    path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#获取定时任务cache_user_func缓存到cache中的用户数据
    path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#获取定时任务add_func缓存到cache的计算结果
    path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通过任务的task-id获取到任务返回值
]

五、调用函数测试

5.1、启动项目

1、启动django项目

python manage.py runserver

2、启动celery异步

#windows系统

celery -A celery_task worker -l info  -P  eventlet

#linux系统

celery -A celery_task worker -l info 

3、启动celery定时(定时任务也是提交给异步的)

celery -A celery_task beat -l info

5.2、测试项目:Postman接口工具

1、注册:url= /user/register/

2、登录前点击获取验证码:返回任务id, url=/user/login/code/

 把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 复制下来,4中可以用到

3、登录:code是发送步骤2中发送给邮箱的验证码 ,url=/user/login/

4、通过任务id,获取到发送邮箱异步任务的返回值,url=/user/task-id/result/

复制2返回值中的task_id,获取该任务的返回值

5、获取定时任务中,add_func缓存在cache中的计算数据

6、获取定时任务中,cache_user_func缓存到cache中的用户数据

 总结:

1、异步任务,一般是保存文件、发送邮件、耗时操作等

2、定时任务,定时处理某些数据。

六、报错

1、先去celery.py中查看定时任务的配置,模块的路径是不是有问题,千万不要多了一个空格啥的

2、如果通过任务id获取任务的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉这个参数配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/864507.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Transformer(一)简述(注意力机制,NLP,CV通用模型)

目录 1.Encoder 1.1简单理解Attention 1.2.什么是self-attention 1.3.怎么计算self-attention 1.4.multi-headed 1.5.位置信息表达 2.decorder&#xff08;待补充&#xff09; 参考文献 1.Encoder 1.1简单理解Attention 比方说&#xff0c;下图中的热度图中我们希望专注于…

【Windows API】获取卷标、卷名

1、卷->卷标 使用FindFirstVolume()和FindNextVolume()函数体系&#xff0c;枚举系统所有卷&#xff08;Volume&#xff09;的例子&#xff0c;然后获取卷标、卷类型。这个方式可以枚举出没有驱动器号&#xff08;卷标&#xff09;的卷。 int TestMode1() {HANDLE hVolume…

DAY19

题目一 空间尝试模型 一个样本做行一个样本做列 范围尝试模型 以....做分隔 dp[i][j] 为以i为左界限 以j为右界限 求这个范围内的计算值(不对 是方法数) 这& | ^ 都是双目运算符 观察一下规律 整体字符数量一定为奇数(包括运算符和数字) 对应到数组中 数组的位一定是偶数…

CosmosAI欧盟数字超算新时代战略合作签约仪式在伦敦举行

据英国权威媒体获悉&#xff0c;由分布式超算网络服务商CosmosAI主办的欧盟数字超算新时代战略合作签约仪式将于8月14日英国伦敦历史悠久的莱福士OWO酒店隆重举办&#xff0c;该酒店曾作为爱德华七世国王加冕仪式以及丘吉尔二战办公室享誉盛名。 本次活动CosmosAI基金会联合创…

海思ss928部署手写数字识别模型

大致流程--------------------------------------------------------------------------------------------------------------------- 模型转换---------------------------------------------------------------------------------------------------- 1&#xff1a;准备MNI…

QPainter - 八卦时钟

QPainter - 八卦时钟 上一篇我们在画时钟的时候&#xff0c;已经把基本的钟表指针和刻度都绘制过了 想要完成八卦时钟&#xff0c;就要绘制这个里面的八卦了。 先上个图&#xff1a; 有人和我说八卦不能转 再来一张图&#xff1a; 背景的绘制 我们需要删除之前所绘制的白色…

攻防世界-web-getit

1. 题目描述 菜鸡发现这个程序偷偷摸摸在自己的机器上搞事情&#xff0c;它决定一探究竟。 获取到文件后&#xff0c;先查看文件信息 说明是一个可执行程序&#xff0c;没啥思路&#xff0c;先逆向 2. 思路分析 逆向后&#xff0c;找到main函数&#xff0c;查看逻辑 通过逆…

智安网络|网络安全:危机下的创新与合作

随着信息技术的迅猛发展和互联网的普及&#xff0c;我们进入了一个高度网络化的社会。网络在提供便利和连接的同时&#xff0c;也带来了许多安全隐患和挑战。 一、网络安全的危险 **1.数据泄露和隐私侵犯&#xff1a;**网络上的个人和机构数据存在遭受泄露和盗取的风险&#…

C#,入门教程(42)——各种括号“()[]{}<>“的用法总结

&#xff08;成对的&#xff09;括号是各种编程语言的核心要素。很多年前就想着写这样一篇专门关于各种括号的技术文章。一直未动笔&#xff0c;因为总想着偷懒&#xff0c;但凡有一个人写了&#xff0c;就无需我动手了。可惜的是&#xff0c;等了十多年&#xff0c;也没有出现…

集成接近和环境光传感器市场调查报告

集成接近和环境光传感器在单个传感器中集成接近和环境光感应功能。该传感器广泛应用于物联网 (IoT) 设备、消费电子产品和可穿戴设备&#xff0c;集成接近和环境光传感器可以自动调整屏幕亮度并根据接近情况打开/关闭屏幕&#xff0c;以降低设备功耗。集成接近和环境光传感器广…

【嵌入式学习笔记】嵌入式入门4——独立看门狗IWDG

1.IWDG简介 IWDG的全称&#xff1a;Independent watchdog&#xff0c;即独立看门狗&#xff0c;IWDG的本能&#xff1a;产生系统复位信号的计数器IWDG的特性&#xff1a;递减的计数器&#xff0c;时钟由独立的RC振荡器提供&#xff08;可在待机和停止模式下运行&#xff09; 看…

16bit、8 通道、500kSPS、 SAR 型 ADC——MS5188N

MS5188N 是 8 通道、 16bit 、电荷再分配逐次逼近型模数 转换器&#xff0c;采用单电源供电。 MS5188N 拥有多通道、低功耗数据采集系统所需的所有 组成部分&#xff0c;包括&#xff1a;无失码的真 16 位 SAR ADC &#xff1b;用于将输入配 置为单端输入&#xff0…

水力发电厂测量装置配置选型及厂用电管理系统

NB/T 10861-2021《水力发电厂测量装置配置设计规范》对水电厂的测量装置配置做了详细要求和指导。测量装置是水力发电厂运行监测的重要环节&#xff0c;水电厂的测量主要分为电气量测量和非电量测量。电气测量指使用电的方式对电气实时参数进行测量&#xff0c;包括电流、电压、…

linux下实现生产者和消费者 pv操作

线程同步与线程安全 生产者和消费者特点图示理解编程实现测试结果 生产者和消费者 特点 1.解耦:因为多了一个缓冲区&#xff0c;所以生产者和消费者并不直接相互调用&#xff0c;这样生产者和消费者的代码发生变化&#xff0c;都不会对对方产生影响。这样其实就是把生产者和消…

使用 Gradio 构建生成式 AI 应用程序(一): 图片内容读取app

今天我们来学习DeepLearning.AI的在线课程&#xff1a;Building Generative AI Applications with Gradio&#xff0c;该课程主要讲述利用gradio来部署机器学习算法应用程序, 今天我们来学习第一课&#xff1a;Image captioning app&#xff0c;该课程主要讲述如何从图片中读取…

优秀项目团队最突出的5项重要特征

一个优秀的开发团队&#xff0c;对于软件项目而言&#xff0c;其重要性不言而喻。否则项目团队一盘散沙&#xff0c;直接影响项目准时保质保量地交付。一般从大家的认可度来说&#xff0c;优秀团队最突出的特征&#xff0c;主要集中在以下几个方面&#xff1a; 1、目标明确 优秀…

oracle连表查询in后边跟另一张表中的字符串字段

今天在做通过in进行连表查询的时候发现以下问题记录下 我的需求是A,B两张表连接查询&#xff0c;A中有一个FOOD_TYPES字段 存的值类型为1&#xff0c;2&#xff0c;3 B表中的字段是FOOD_TYPE 存的是单个数字字符串 我需要where b.food_type in a.food_types 但是无论怎么写都…

Live800:客服常用的6大提问技巧,帮助客服服务更高效

作为企业最前线的客服&#xff0c;提升服务质量是必须要做到的&#xff0c;而如何提升服务质量呢&#xff1f;其中一项关键点就是提问技巧。在客户沟通中&#xff0c;提问的方式和技巧直接影响着客户对企业服务的满意度。下面&#xff0c;本文将介绍客服常用的6大提问技巧&…

go语言的database/sql结合squirrel工具sql生成器完成数据库操作

database/sql database/sql是go语言内置数据库引擎&#xff0c;使用sql查询数据库&#xff0c;配置datasource后使用其数据库操作方法对数据库操作&#xff0c;如下&#xff1a; package mainimport ("database/sql""fmt"_ "github.com/Masterminds…

Android简单封装Matrix工具类

Android简单封装Matrix工具类 1.简介&#xff1a; Matrix 是一款微信研发并日常使用的应用性能接入框架&#xff0c;支持iOS, macOS和Android。 Matrix 通过接入各种性能监控方案&#xff0c;对性能监控项的异常数据进行采集和分析&#xff0c;输出相应的问题分析、定位与优化…