Django中使用celery实现异步任务、延时任务、周期定时任务

news2024/11/15 19:44:54

配置celery

1. 安装以下环境

pip install celery
pip install redis
pip install eventlet # celery 4.0+版本以后不支持在windows运行,还需额外安装eventlet库

本文环境为:python3.9.4+Django4.2.11+celery5.3.6+redis5.0.3

2. 配置setting.py文件
在setting.py文件中加入以下代码

# 设置redis消息队列
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/10'

# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 时间格式化为中国时间
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用UTC时间
CELERY_ENABLE_UTC = False

# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = 60 * 60 * 24

# 任务限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '2/s'}}

# Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 2

3. 在setting.py的同级目录新建一个celery_work.py文件,加入以下代码

import os
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

# 实例化,需要改成自己的项目名称
app = Celery('project_name')

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()

4.在setting.py所在目录中的__init__文件中加入以下代码

from .celery_work import app as celery_app

__all__ = ('celery_app',)

实现异步任务

1. 创建任务函数
(1)新建一个tasks模块
在这里插入图片描述
(2)在tasks模块下创建tasks.py文件,并编写任务函数代码:

from celery import shared_task
from time import sleep

@shared_task
def test_celery_task1():
    print("test_celery_task1")

@shared_task
def test_celery_task2(x, y):
    sleep(10)
    print(f"test_celery_task2---->{x}, {y}")

(3)新建一个app,添加url, 在view.py文件中调用任务函数

from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from tasks.tasks import test_celery_task1, test_celery_task2

class TestView(APIView):
    def get(self, request):
        test_celery_task2.apply_async(args=[111, 222])
        test_celery_task1.apply_async(args=[])
        return Response({'message': 'test success'}, status=status.HTTP_200_OK)

现在可在终端输入以下命令启动worker:

celery -A celery_study worker -l info -P eventlet # celery_study为项目名称

出现以下内容就是配置成功
在这里插入图片描述
运行项目后可在postman测试,得到以下结果:
在这里插入图片描述

实现延时任务

1.通过直接设置执行时间
eta参数为指定执行时间

class TestView(APIView):
    def get(self, request):
        # test_celery_task2.apply_async(args=[111, 222])
        ctime = datetime.now()
        # 默认使用utc时间
        utc_time = datetime.utcfromtimestamp(ctime.timestamp())
        task_delay = timedelta(seconds=10) # 定义时间间隔
        task_time = utc_time + task_delay
        print(f"任务时间:{task_time}")
        test_time_task.apply_async(args=[ctime], eta=task_time) # 10秒后执行

        # test_celery_task1.apply_async(args=[])
        return Response({'message': 'test success'}, status=status.HTTP_200_OK)

2.通过设置延时时间
countdown参数为延时时间:

class TestView(APIView):
    def get(self, request):
        # test_celery_task2.apply_async(args=[111, 222])
        ctime = datetime.now()
        test_time_task.apply_async(args=[ctime], countdown=10) # 10秒后执行
        # test_celery_task1.apply_async(args=[])
        return Response({'message': 'test success'}, status=status.HTTP_200_OK)

测试结果如下,执行时间延迟了10秒
在这里插入图片描述

实现周期定时任务

1.定义任务函数

@shared_task
def test_scheduled_task(x, y):
    print(f'10秒执行一次---参数:{x},{y}')

2. 在celery_work.py文件中加入以下代码

# 导入库
from datetime import timedelta
# 设置定时任务
app.conf.beat_schedule = {
    'scheduled_task': {
        'task': 'celery_app.tasks.scheduled_task', # 任务函数
        'schedule': timedelta(seconds=10), # 每10秒钟执行一次
        'args': () # 任务函数的参数
    },
}

还可使用crontab定义周期:

app.conf.beat_schedule = {
    'scheduled_task': {
        'task': 'celery_app.tasks.scheduled_task',
        'schedule': timedelta(seconds=10), # 每10秒钟执行一次
        'args': ()
    },
    'scheduled_task2': {
            'task': 'tasks.tasks.test_celery_task',
            'schedule': crontab(minute='*/1'), # 每1分钟执行一次
            'args': (111, 222) # 参数
        },

}

开两个终端,分别执行以下两条命令即可:

# celery_study为项目名称
celery -A celery_study worker -l info -P eventlet # 启动worker命令
celery -A celery_study beat # 启动定时调度器命令

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1527439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

汽车制造产生的污废水如何处理排放

汽车制造业是一个重要的工业领域,然而,伴随着汽车制造过程中的各种化学反应和材料加工,大量污废水也随之产生。为了保护环境和社会的可持续发展,汽车制造产生的污废水需要得到妥善处理和排放。 首先,针对汽车制造中涉及…

前端vue实现甘特图

1 什么是甘特图 甘特图(Gantt chart)又称为横道图、条状图(Bar chart)。以提出者亨利L甘特先生的名字命名,是项目管理、生产排程、节点管理中非常常见的一个功能。 甘特图内在思想简单,即以图示的方式通过活动列表和时间刻度形象地表示出任何特定项目的…

01.Linked-List-Basic

1. 链表简介 1.1 链表定义 链表(Linked List):一种线性表数据结构。它使用一组任意的存储单元(可以是连续的,也可以是不连续的),来存储一组具有相同类型的数据。 简单来说,「链表」…

web渗透测试漏洞复现:Elasticsearch未授权漏洞复现

web渗透测试漏洞复现 Elasticsearch未授权漏洞复现Elasticsearch简介Elasticsearch复现Elasticsearch漏洞修复和加固措施 Elasticsearch未授权漏洞复现 Elasticsearch简介 Elasticsearch 是一款 Java 编写的企业级搜索服务,它以分布式多用户能力和全文搜索引擎为特…

功能齐全的免费 IDE Visual Studio 2022 社区版

面向学生、开放源代码和单个开发人员的功能齐全的免费 IDE 下载地址 Visual Studio 2022 社区版 - 下载最新的免费版本 Visual Studio 2022 Community Edition – Download Latest Free Version 准备安装 选择需要安装的程序 安装进行中 使用C学习程序设计相关知识并培养编程…

AI基础知识(3)--神经网络,支持向量机,贝叶斯分类器

1.什么是误差逆传播算法(error BackPropagation,简称BP)? 是一种神经网络学习算法。BP是一个迭代学习算法,在迭代的每一轮使用广义的感知机学习规则对参数进行更新估计。基于梯度下降(gradient descent&am…

安卓RecyclerView简单用法

废话不多说上代码 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.android.com/apk/res-auto"xmlns:tools"http://schem…

LeetCode---388周赛

题目列表 3074. 重新分装苹果 3075. 幸福值最大化的选择方案 3076. 数组中的最短非公共子字符串 3077. K 个不相交子数组的最大能量值 一、重新分装苹果 注意题目中说同一个包裹中的苹果可以分装&#xff0c;那么我们只要关心苹果的总量即可&#xff0c;在根据贪心&#x…

华为汽车业务迎关键节点,长安深蓝加入HI模式,车BU预计今年扭亏

‍编辑 |HiEV 一年之前&#xff0c;同样是在电动汽车百人会的论坛上&#xff0c;余承东在外界对于华为和AITO的质疑声中&#xff0c;第一次公开阐释了华为选择走智选车模式的逻辑。 一年之后&#xff0c;伴随问界M7改款、问界M9上市&#xff0c;华为智选车模式的面貌已经发生了…

让图片适应标签的CSS object-fit属性

在实际的项目运行过程中&#xff0c;可能出现运营人员上传的文件与预期的图片尺寸不同的情况&#xff0c;为了解决这一问题可以使用 object-fit 属性&#xff0c;对嵌入的图像&#xff08;以及其他替代元素&#xff0c;如视频&#xff09;做相应的变化&#xff0c;更加精确地控…

数据结构 二叉树 力扣例题AC——代码以及思路记录

LCR 175. 计算二叉树的深 某公司架构以二叉树形式记录&#xff0c;请返回该公司的层级数。 AC int calculateDepth(struct TreeNode* root) {if (root NULL){return 0;}else{return 1 fmax(calculateDepth(root->left), calculateDepth(root->right));} } 代码思路 …

WPF连接MySqldemo

界面总要管理数据嘛,于是便学习了一下WPF与MySql的基本连接. 运行结果: 环境配置 需要下载安装Mysql,网上教程很多,不详说,创建的工程需要下载或者引入相关的包(MySql.Data) 连接的部分直接看具体的代码即可 xaml代码(只放置了一个按钮和文本框) <Grid><Button x:Name…

Android下的匀速贝塞尔

画世界pro里的画笔功能很炫酷 其画笔配置可以调节流量&#xff0c;密度&#xff0c;色相&#xff0c;饱和度&#xff0c;亮度等。 他的大部分画笔应该是通过一个笔头图片在触摸轨迹上匀速绘制的原理。 这里提供一个匀速贝塞尔的kotlin实现&#xff1a; class EvenBezier {p…

hadoop分布式环境搭建

准备三台centos虚拟机 。&#xff08;master&#xff0c;slave1&#xff0c;slave2&#xff09; (hadoop、jdk文件链接&#xff1a;https://pan.baidu.com/s/1wal1CSF1oO2h4dkSbceODg 提取码&#xff1a;4zra) 前四步可参考hadoop伪分布式环境搭建详解-CSDN博客 1.修改主机名…

pycharm里test connection连接成功,但是无法同步服务器文件,deployment变灰

如果服务器test connection连接成功&#xff0c;但是无法同步文件。 可以尝试以下方式&#xff1a; 点击tools-deployment-browse remonte host&#xff0c;选择要连接的服务器的文件夹 如果能正常显示服务器文件夹&#xff0c;再点击tools-deployment&#xff0c;注意要把要…

B002-springcloud alibaba 微服务环境搭建

目录 创建父工程创建基础模块创建用户微服务创建商品微服务创建订单微服务微服务调用 创建父工程 新建项目springcloud-alibaba&#xff0c;本工程不需要写代码&#xff0c;删除src 导包 <parent><groupId>org.springframework.boot</groupId><artifact…

redis设计与实现(二)——持久化

1. 前言&#xff1a; redis是一个基于内存是键值对数据库&#xff0c;但是并非把数据存入内存就高枕无忧了。为了应对可能出现的进程中止&#xff0c;断电等意外情况&#xff0c;redis提供了持久化功能把数据持久化到硬盘。 2. RDB持久化 2.1. rdb文件的创建 rdb通过创建二…

智能合约 - 部署ERC20

Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面&#xff0c;使得开发者可以在浏览器中直接进行智能合约的开发&#xff0c;而无需安装任何额外的…

借助Aspose.html控件,在 C# 中更改 HTML 边框颜色

在这篇博文中&#xff0c;我们将学习如何在 C# 中更改 HTML 边框颜色。本指南将为您提供使用 C# 以编程方式有效更改 HTML 文件中的边框颜色、CSS 边框颜色、 HTML表格边框颜色等所需的知识和技能。 Aspose.Html 是一种高级的HTML操作API&#xff0c;可让您直接在.NET应用程序…

Linux TCP参数——tcp_adv_win_scale

文章目录 tcp_adv_win_scaleip-sysctl.txt解释buffering overhead内核缓存和应用缓存示例计算深入理解从2到1(tcp_adv_win_scale的值)总结 tcp_adv_win_scale adv-advise&#xff1b;win-window; 用于指示TCP中接收缓存比例的值。 static inline int tcp_win_from_space(int …