Django框架集成Celery异步-【2】:django集成celery,拿来即用,可用操作django的orm等功能

news2025/1/12 23:44:21

一、项目结构和依赖

study_celery

          | --user

                |-- models.py

                |--views.py

                |--urls.py

          |--celery_task

                  |--__init__.py

                  |--async_task.py

                  |-- celery.py

                  | --check_task.py

                  | --config.py

                  | --scheduler_task.py

        | --study_celery

                | --settings.py

        | --manage.py

依赖:redis数据库
 

redis==4.6.0
Django==3.2
django-redis==5.3.0
celery==5.3.1

二、celery框架配置详情

1、配置文件

config.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = [
    'celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法
    'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]

#celery的配置
config = {
    "broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    "result_backend" : 'redis://127.0.0.1:6379/1',
    "task_serializer" : 'json',
    "result_serializer" : 'json',
    "accept_content" : ['json'],
    "timezone" : 'Asia/Shanghai',
    "enable_utc" : False,
    "result_expires" : 1*60*60,
    "beat_schedule" : { #定时任务配置
            # 名字随意命名
            'add-func-30-seconds': {
                # 执行add_task下的addy函数
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func
                # 每10秒执行一次
                'schedule': timedelta(seconds=30),
                # add函数传递的参数
                'args': (10, 21)
            },
            # 名字随意起
            'add-func-5-minutes': {
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func
                # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
                'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
                'args': (19, 22)  # 定时任务需要的参数
            },
            # 缓存用户数据到cache中
            'cache-user-func': {
                'task': 'celery_task.scheduler_task.cache_user_func',
                # 导入任务函数:from celery_task.scheduler_task import cache_user_func
                'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中
            }
        }
}

2、celery对象创建

celery.py

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from .config import config,task_module

# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
app.conf.update(**config)

3、异步任务模块

async_task.py

'1、因为需要用到django中的内容,所以需要配置django环境'
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
import django
django.setup()

# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
from apps.user.models import UserModel
'''
2、异步任务
不保存函数返回值的,@app.task(ignore_result=True)
保存函数返回值的任务,@app.task
'''

#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    '''
    :param email: 接收消息的邮箱,用户的邮箱
    :return:
    '''
    # 启用线程发送邮件,此处最好加线程池
    t = threading.Thread(
        target=send_mail,
        args=(
            "登录前获取的验证码",  # 邮件标题
            '点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址,可以写多个
         ),
         # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
         kwargs={
                'html_message': f"<p></p> <p>验证码:{code}</p>"
         }
        )
    t.start()
    return {'email':email,'code':code}

@app.task
def search_user_task():
    users = UserModel.objects.all()
    lis = []
    for user in users:
        dic = {'id':user.id,'name':user.name}
        lis.append(dic)
    return {'users':lis}

4、定时任务模块

scheduler_task.py

'1、因为需要用到django中的内容,所以需要配置django环境 '
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
import django
django.setup()

from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict

'2、定时任务'
#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):
    print('执行了加法函数')
    cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
    return a+b

#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():
    user = user_models.UserModel.objects.all()
    user_dict = {}
    for obj in user:
        user_dict[obj.account] = model_to_dict(obj)
    cache.set('all-user-data',user_dict,timeout=35*60)



5、检测任务完成状态

check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''

def check_task_status(task_id):
    '''
    任务的执行状态:
        PENDING :等待执行
        STARTED :开始执行
        RETRY   :重新尝试执行
        SUCCESS :执行成功
        FAILURE :执行失败
    :param task_id:
    :return:
    '''
    result = AsyncResult(id=task_id, app=app)
    dic = {
        'type':result.status,
        'msg':'',
        'data':'',
        'code':400
    }
    if result.status == 'PENDING':
       dic['msg'] = '任务等待中'
    elif result.status == 'STARTED':
        dic['msg'] = '任务开始执行'
    elif result.status == 'RETRY':
        dic['msg']='任务重新尝试执行'
    elif result.status =='FAILURE':
        dic['msg'] = '任务执行失败了'
    elif result.status == 'SUCCESS':
        result = result.get()
        dic['msg'] = '任务执行成功'
        dic['data'] = result
        dic['code'] = 200
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    return dic


三、django项目的配置

1、settings.py

模块注册

#app注册
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'user.apps.UserConfig',
]

#cache缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/2",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 1000}
            # "PASSWORD": "123",
        },
        'TIMEOUT':30*60 #缓存过期时间
    }
}

#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = 'xxxx@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'xxx'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '2333<'1234567890@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

2、关于视图函数的方法

1、获取cache对象,操作cache

from django.core.cache import cache

cache.set(key,value)

cache.get(key)

2、执行异步任务

from celery_task.async_task import send_email_task

res = send_email_task.delay('xxx@qq.com','23456')

task_id = res.id #获取异步任务的id,通过该id可用获取任务的运行状态

from celery_task.async_task import search_user_task
res = search_user_task.delay()

四、启动项目:项目根目录下执行

django项目

python manage.py runserver

celery框架

#windows系统

celery -A celery_task.celery worker -l info  -P  eventlet

#linux系统

celery -A celery_task.celery worker -l info 

定时任务启动

celery -A celery_task beat -l info

五、码云地址

django配置celery: django配置使用celery,django使用celery,django+celeryicon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-configuration-celery

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1090523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

竞赛 深度学习+opencv+python实现昆虫识别 -图像识别 昆虫识别

文章目录 0 前言1 课题背景2 具体实现3 数据收集和处理3 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数&#xff1a;2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 4 MobileNetV2网络5 损失函数softmax 交叉熵5.1 softmax函数5.2 交叉熵损失函数 6 优化器SGD7 学…

HTTP Basic 认证

HTTP Basic 认证 难度等级&#xff1a;【初级】 由RFC7617定义的HTTP Basic认证是一种非常基础而简单的认证模式&#xff0c;因此叫他Basic认证。他本质上就是浏览器提供的一个接口&#xff0c;能够根据HTTP返回值&#xff0c;自动弹出一个登录框&#xff0c;让用户输入ID和密码…

利达卓越:以数字金融,追梦新未来

秉持初心、勇敢前行,便能如火炬照彻黑暗,在平凡的生活中不断创新、保持优势,一步步走向梦想的远方。在金融投资领域,利达卓越广招贤才,坚持创新的原则,以数字技术为金融赋能,与多方市场参与建立长期合作关系,为推动全球经济和社会发展贡献力量,以团队金融优势续写时代华美篇章,…

用Golang手写一个Container

本文作者系360奇舞团前端开发工程师 前言 Docker 作为一种流行的容器化技术&#xff0c;对于每一个程序开发者而言都具有重要性和必要性。因为容器化相关技术的普及大大简化了开发环境配置、更好的隔离性和更高的安全性&#xff0c;对于部署项目和团队协作而言也更加方便。本文…

【git的使用方法】——上传文件到gitlab仓库

先进入到你克隆下来的仓库的目录里面 比如&#xff1a;我的仓库名字为zhuox 然后将需要上传推送的文件拷贝到你的克隆仓库下 这里的话我需要拷贝的项目是t3 输入命令ls&#xff0c;就可以查看该文件目录下的所有文件信息 然后输入git add 文件名 我这边输入的是 &#x…

LLMs的终局是通用人工智能AGI总结 生成式AI和大语言模型 Generative AI LLMs

终于学完了 生成式AI和大语言模型 Generative AI & LLMs. LLMs 解决了如下问题&#xff1a; 对NLP的不能够理解长句子&#xff0c;解决方案 自注意力机制Transformers architecture Attention is all you need大模型算力不够&#xff0c;解决方案 LLMs 缩放法则和计算最…

服务器使用u盘安装麒麟系统报错“dracut-initqueue timeout”,/dev/root does not exist

最近使用u盘安装麒麟系统&#xff0c;发现找不到u盘引导程序&#xff0c;提示dracut-initqueue timeout或者/dev/root does not exist 解决方法&#xff0c;先确定启动u盘所在盘符&#xff0c;使用 blkid 命令&#xff0c;我这边显示启动u盘所在盘符是 /dev/sdd4 blkid重启服…

基于Linux安装Hive

Hive安装包下载地址 Index of /dist/hive 上传解压 [rootmaster opt]# cd /usr/local/ [rootmaster local]# tar -zxvf /opt/apache-hive-3.1.2-bin.tar.gz重命名及更改权限 mv apache-hive-3.1.2-bin hivechown -R hadoop:hadoop hive配置环境变量 #编辑配置 vi /etc/pro…

LLMs AWS Sagemaker JumpStart

现在您已经探讨了使用LLM构建应用程序的基础知识&#xff0c;我想向您展示一项名为Amazon Sagemaker JumpStart的AWS服务&#xff0c;它可以帮助您快速进入生产并进行大规模操作。 以下是您在先前视频中探讨的应用程序堆栈。正如您所看到的&#xff0c;构建一个LLM驱动的应用程…

Macos数字音乐库:Elsten Software Bliss for Mac

Elsten Software Bliss for Mac是一款优秀的音乐管理软件&#xff0c;它可以帮助用户自动化整理和标记数字音乐库&#xff0c;同时可以自动识别音乐信息并添加标签和元数据。 此外&#xff0c;Bliss还可以修复音乐库中的问题&#xff0c;例如重复的音乐文件和缺失的专辑封面等…

深耕全面预算管理 拥抱企业数字未来

随着世界数字未来的不断发展&#xff0c;我国也正经历着一场更大范围、更深层次的科技变革。企业面对构建内部生态平衡体系的艰巨任务&#xff0c;对于其信息化部署也提出了更高的要求。增强预算编制的全面性&#xff0c;启动预算管理一体化改革成为了我国企业提高数字化水平的…

Rocket Typist pro for mac 「Macos文本快速输入工具」

Rocket Typist Pro是一款在Mac上使用的文本快速输入工具&#xff0c;它可以帮助用户更快速、更准确地输入文本。 这款软件的设计非常简单、高效&#xff0c;它通过使用短语或宏&#xff0c;可以快速插入文本&#xff0c;减少重复性工作&#xff0c;提高工作效率。 Rocket Typ…

华为校招机试题- 机器人活动区域-2023年

题目描述: 现有一个机器人,可放置于 M N的网格中任意位置,每个网格包含一个非负整数编号。当相邻网格的数字编号差值的绝对值小于等于 1 时,机器人可在网格间移动 问题:求机器人可活动的最大范围对应的网格点数目。 说明: 1)网格左上角坐标为 (0, 0),右下角坐标为 (m-…

Vue 的响应式数据 ref的使用

ref 是 vue 提供给我们用于创建响应式数据的方法。 ref 常用于创建基本数据&#xff0c;例如&#xff1a;string、number、boolean 等。 ref 还是通过 Object.defineProperty 的 get 与 set 方法&#xff0c;实现的响应式数据。 ref 创建基本数据&#xff1a; <template…

springboot 通过url下载文件并上传到OSS

DEMO流程 传入一个需要下载并上传的url地址下载文件上传文件并返回OSS的url地址 springboot pom文件依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w…

【【萌新的SOC学习之基于BRAM的PS和PL数据交互实验】】

萌新的SOC学习之基于BRAM的PS和PL数据交互实验 基于BRAM的PS和PL的数据交互实验 先介绍 AXI BRAM IP核控制器的简介 AXI BRAM ip核 是xilinx提供的一个软核 这个ip核被设计成 AXI的一个从机接口 用于AXI互联的集成 系统的主设备和本地的RAM进行通信 &#xff08;我们可以通过这…

大数据分析/开发项目实战班

大数据分析/开发项目实战班采用新型教学模式&#xff0c;让学生“学有所用&#xff0c;学能所用”&#xff0c;角色演练开展项目式教学&#xff0c;将产业项目与教学知识结合&#xff0c;突出学生的主体性&#xff0c;打破传统教学壁垒。 大数据分析/开发项目实战班介绍&#x…

ubuntu下yolov6 tensorrt模型部署

文章目录 ubuntu下yolov6 tensorrt模型部署一、Ubuntu18.04环境配置1.1 安装工具链和opencv1.2 安装Nvidia相关库1.2.1 安装Nvidia显卡驱动1.2.2 安装 cuda11.31.2.3 安装 cudnn8.21.2.4 下载 tensorrt8.4.2.41.2.5 下载仓库TensorRT-Alpha并设置 二、从yolov6源码中导出onnx文…

Linux高性能服务器编程 学习笔记 第十三章 多线程编程

早期Linux不支持线程&#xff0c;直到1996年&#xff0c;Xavier Leroy等人开发出第一个基本符合POSIX标准的线程库LinuxThreads&#xff0c;但LinuxThreads效率低且问题多&#xff0c;自内核2.6开始&#xff0c;Linux才开始提供内核级的线程支持&#xff0c;并有两个组织致力于…

【灵动 Mini-G0001开发板】+Keil5开发环境搭建+ST-Link/V2程序下载和仿真+4颗LED100ms闪烁。

我们拿到手里的是【灵动 Mini-G0001开发板】 如下图 我们去官网下载开发板对应资料MM32G0001官网 我们需要下载Mini—G0001开发板的库函数与例程&#xff08;第一手学习资料&#xff09;Keil支持包&#xff0c; PCB文件有需要的&#xff0c;可以自行下载。用户指南需要下载&a…