celery

news2025/1/15 12:56:14

一 介绍

官网:https://docs.celeryq.dev/en/latest/index.html

celery是一个简单、灵活、可靠的分布式系统,用于 处理大量消息,同时为操作提供 维护此类系统所需的工具。

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

在这里插入图片描述

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等。

延迟执行:解决延迟任务。

定时执行:解决周期(周期)任务,比如每天数据统计。

二 安装使用

安装

pip install celery
pip install eventlet

使用

tasks.py

from celery import Celery

# 任务提交保存的地方
broker = 'redis://127.0.0.1:6379/0'

# 任务执行完结果保存的地方
backend = 'redis://127.0.0.1:6379/1'

app = Celery(main=__name__, broker=broker, backend=backend)

# 创建任务
@app.task
def add(x, y):
    return x + y

提交任务

submit_task.py

from tasks import add

# 使用delay方法
res = add.delay(10, 10)

# 返回值是celery.result.AsyncResult类的对象,可以根据这个对象查看执行结果等。

# 也可以通过返回值直接查看任务的状态
print(res)  # 3dcef2f9-d266-4d70-8aab-73073ba9e691

# 这才是真正的id号
print(res.task_id)

执行后,会将任务保存到broker对应的redis缓存库中。
在这里插入图片描述

启动celery工作服务器

在工作路径下终端输入命令

celery -A tasks worker -l info -P eventlet 
或
celery -A tasks worker --loglevel=INFO -P eventlet

backend中查看任务执行结果

check_result.py

from tasks import app

from celery.result import AsyncResult

task_id = '3dcef2f9-d266-4d70-8aab-73073ba9e691'

if __name__ == '__main__':
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    # 等同上面代码
    # if res.state == 'SUCCESS':
    #     result = res.get()
    #     print(result)
    elif res.failed():
        print('任务失败')
    # elif res.state == 'FAILURE':
    #     print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

AsyncResult下的方法

def failed(self):
    """Return :const:`True` if the task failed."""
    return self.state == states.FAILURE

def successful(self):
    """Return :const:`True` if the task executed successfully."""
    return self.state == states.SUCCESS

三 自定义celery包

新建包:celery_tasks。

在包先新建一个 celery.py,初始化app。

from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
# backend='redis://:123456@127.0.0.1:6379/1' 加密码
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

在包里新建user_tasks.py 编写用户相关任务 。

# 用户相关任务
from .celery import app

在包里新建home_task.py 编写首页相关任务 。

# 首页相关任务
from .celery import app

其它程序,提交任务。

启动worker

celery -A celery_tasks worker -l info -P eventlet

四 celery异步任务,延迟任务,定时任务

异步任务

task.delay(*args, **kwargs)

延迟任务

task.apply_async(args=[参数,参数],eta=时间对象(utc时间))
from datetime import timedelta, datetime

res = add.apply_async(args=(1, 2), eta=(datetime.utcnow() + timedelta(seconds=20)))

print(res.task_id)  # c78505e2-614d-4bb2-930c-c73c325af519

在这里插入图片描述

定时任务

app的配置文件中配置

app.conf.beat_schedule = {
    'add': {
        'task': 'celery_tasks.home_tasks.add',
        'schedule': timedelta(seconds=5),  # 每隔五秒提交任务
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('100', '200'),
    },
}

启动worker

celery -A celery_tasks worker -l info -P eventlet

启动beat(真正干活的人)

celery -A celery_tasks beat -l info

在这里插入图片描述

五 django中使用celery

将包复制到django项目路径下

在包内的celery.py中添加代码

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django
django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

在任务中就可以用到django的ORM等。
在django的视图类中,导入任务,提交任务。
启动worker,beat。

六 秒杀接口

新建秒杀商品表

class Shop(models.Model):
    name = models.CharField(max_length=32)
    # 秒杀商品数量不能为负
    shop_num = models.PositiveIntegerField()

user_tasks.py

# 用户相关任务
from .celery import app


# 秒杀任务
@app.task
def seckill_task():
    from user.models import Shop
    try:
        from django.db.models import F
        import time
        Shop.objects.filter(name='秒杀商品').update(shop_num=F('shop_num') - 1)
        time.sleep(10)
        return True
    # 出错解释商品库存不足 不能秒杀
    except:
        return False

views.py

# 提交秒杀
 @action(methods=['GET'], detail=False, url_path='submit_seckill')
 def submit_seckill(self, request):
     from celery_tasks.user_tasks import seckill_task
     res = seckill_task.delay()
     return APIResponse(task_id=res.task_id)

 # 查看秒杀结果
 @action(methods=['GET'], detail=False, url_path='check_seckill')
 def check_seckill(self, request):
     from celery.result import AsyncResult
     from celery_tasks.celery import app
     task_id = request.query_params.get('task_id')
     res = AsyncResult(id=task_id, app=app)
     if res.successful():
         is_true = res.get()
         if is_true:
             return APIResponse(code=100, msg='秒杀成功')
         return APIResponse(code=101, msg='手慢了没秒到')
     elif res.status == 'PENDING':
         return APIResponse(code=102, msg='任务等待中被执行')
     elif res.status == 'RETRY':
         return APIResponse(code=103, msg='任务异常后正在重试')
     elif res.status == 'STARTED':
         return APIResponse(code=104, msg='任务已经开始被执行')

前端

<template>
    <div>
        <img src="https://img1.baidu.com/it/u=3467439571,3022033088&fm=253&app=138&size=w931&n=0&f=JPEG&fmt=auto?sec=1668704400&t=5ff8a17feab5b05d5e27c41ad2776bc9" alt="" width="300px" height="300px">
        <br>
        <el-button type="danger" round @click.once="submit">秒杀按钮</el-button>
    </div>
</template>

<script>
export default {
    name: 'Seckill',
    methods: {
        submit() {
            this.$axios.get(this.$settings.BASE_URL + 'user/submit_seckill/').then(res => {
                console.log(res)
                if (res.data.code === 100) {
                    let task_id = res.data.task_id
                    console.log(task_id)
                    let t = setInterval(() => {
                        this.$axios.get(this.$settings.BASE_URL + 'user/check_seckill/?task_id=' + task_id).then(re => {
                            console.log(re)
                            if (re.data.code === 100 || re.data.code === 101) {
                                this.$message({
                                    message: re.data.msg,
                                    type: 'success'
                                });
                                alert(re.data.msg)
                                clearInterval(t)
                            }
                        })
                    }, 1000)
                }
            })
        }
    }

}
</script>

在这里插入图片描述

七 双写一致性

7.1 轮播图接口加缓存

提交了接口的响应速度
提高并发量

class SlideShowView(GenericViewSet, ListMixinView):
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.SLIDE_SHOW_COUNT]
    serializer_class = SlideShowSer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:
            print('走了缓存')
            return APIResponse(code=1001, result=result)
        res = super().list(request, *args, **kwargs)
        result = res.data.get('result')
        cache.set('banner_list', result)
        print('走了数据库')
        return res

7.2 celery定时任务实现双写一致性

加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致。

双写一致性问题:

  1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
  2. 修改数据库,修改缓存 【缓存的修改是在后】
  3. 定时更新缓存,针对于实时性不是很高的接口适合定时更新。

home_tasks.py

# 首页相关任务
import time

from .celery import app
from home.models import SlideShow
from django.conf import settings
from home.serializer import SlideShowSer
from django.core.cache import cache


@app.task
def update_banner():
    # 更新缓存
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]
    ser = SlideShowSer(instance=queryset, many=True)
    # print(ser.data)
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery.py

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django

django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

app.conf.beat_schedule = {
    # 定时任务
    'update_banner': {
        'task': 'celery_tasks.home_tasks.update_banner',
        'schedule': timedelta(minutes=30),
        # 'schedule': crontab(hour=8, day_of_week=1),
        'args': (),
    },
}

启动django,worker,beat。

每隔30分钟查询数据库中的轮播图,放进缓存中,请求来之后,缓存中有先从缓存中拿,没有才去数据库拿。

mysql数据修改后,前端拿到的数据可能不一致,但是最多30分钟缓存中的数据就会更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10542.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纸牌游戏新版小猫钓鱼设计制作

新版纸牌游戏《小猫钓鱼》设计制作 此游戏设计是我新创制的简单的卡牌游戏。属于儿童益智类游戏&#xff0c;适用于儿童的认知教育。 游戏规则很简单&#xff1a;找配对的牌消去。 游戏设置2个玩家对玩&#xff0c;鱼池置牌21张&#xff0c;玩家每人5张牌&#xff0c;二人轮转…

從turtle海龜動畫 學習 Python - 高中彈性課程系列 6.1 內嵌正多邊形 類似禪繞圖

Goal: 藉由有趣的「海龜動畫繪圖」學會基礎的 Python 程式設計 本篇介紹基礎的 Python 海龜動畫繪圖, 確實可以只以簡單的指令畫出極為複雜有趣或美麗的圖案: 內嵌正多邊形之圖案, 禪繞圖等 “Talk is cheap. Show me the code.” ― Linus Torvalds 老子第41章 上德若谷 大白…

Redis--1.CentOS8安装redis服务器

一、登录root账号 设置root密码&#xff1a; sudo passwd root切换到root账号&#xff1a; su root二、下载解压安装包 切换到根目录: cd / 1、创建存放路径: mkdir -p /usr/local/redis cd /usr/local/redis2、下载redis安装包&#xff1a;去官网找到redis连接地址如&…

基于jsp+mysql+ssm进销存管理系统-计算机毕业设计

本java进销存系统主要完成对超市的管理&#xff0c;包括会员管理、厂家管理、商品管理、退货管理&#xff0c;销售管理、进货管理、员工管理、系统管理等几个方面。系统可以完成对各类信息的浏览、查询、添加、删除、修改等功能。 系统采用了jsp的mvc框架,SSM(springMvcspringM…

告别手机自带浏览器,分享2022年好用的手机浏览器

对于喜欢使用手机上网冲浪的人来说&#xff0c;最喜欢用的一般都是小巧、强大、干净简洁的APP。作为上网常用的软件&#xff0c;好用的浏览器能够提高工作效率。而手机自带的浏览器往往占用资源大&#xff0c;而且广告很多&#xff0c;并夹带新闻、小说等无用功能&#xff0c;不…

自制操作系统日记(7):字符串显示

代码仓库地址&#xff1a;https://github.com/freedom-xiao007/operating-system 简介 上篇中我们在屏幕上画出了界面的大致轮廓&#xff0c;系统有了点模样&#xff0c;本篇继续跟着书籍&#xff0c;让程序中的字符串显示在屏幕上 效果展示 先放最终的效果&#xff0c;可以…

Linux磁盘分区,挂载介绍

分区的方式: mbr分区: 1.最多支持四个主分区 ⒉系统只能安装在主分区 3.扩展分区要占一个主分区 4.MBR最大只支持2TB&#xff0c;但拥有最好的兼容性 gtp分区: 1.支持无限多个主分区&#xff08;但操作系统可能限制&#xff0c;比如windows下最多128个分区) ⒉.最大支持18E…

仿大众点评——秒杀系统部分02

秒杀系统优化 接口限流和安全措施 令牌桶限流单用户访问频率限流抢购接口隐藏 接口限流&#xff1a; 在面临高并发的请购请求时&#xff0c;我们如果不对接口进行限流&#xff0c;可能会对后台系统造成极大的压力。尤其是对于下单的接口&#xff0c;过多的请求打到数据库会对…

MVCC 底层实现原理

文章目录概述事务并发出现的问题脏读不可重复读幻读事务隔离级别MVCC 底层实现原理隐式字段undo 日志Read View总结概述 MVCC(Multi-Version Concurrency Control) &#xff0c;叫做基于多版本的并发控制协议。 MVCC 是乐观锁的一种实现方式&#xff0c;它在很多情况下&#…

多线程增量下载K线数据

准备一份股票列表的CSV文件&#xff0c;文件格式如下 codenameclosecmvdate_ipo300434金石亚药12.89427982959020150424300380安硕信息19.31241993416320140128688123聚辰股份132.821114087266620191223300586美联新材20.34790882138120170104300534陇神戎发12.96389465063120…

Arduino与Proteus仿真实例-密码输入、验证与更新仿真

密码输入、验证与更新仿真 本次实例将通过4X4矩阵键盘、LCD1602、EEPROM实现一个密码输入匹配、储存、更新。 1、仿真电路原理图 在仿真电路原理图中,4X4矩阵键盘通过PCF8574 IO扩展器驱动,请参考前面文章: Arduino与Proteus仿真实例-PCF8574驱动4x4矩阵键盘仿真Arduino与…

大数据:Flume安装部署和配置

文章目录Flume 简介一&#xff0c;Flume下载和安装1&#xff09;登录[Flume官网](https://flume.apache.org/)&#xff0c;下载 apache-flume-1.9.0-bin.tar.gz2&#xff09;解压文件到 /opt 目录下3&#xff09;改名为 flume二&#xff0c;Flume配置1&#xff09;修改 /conf/ …

.Net开发——EFCore

1 EFCore是一个ORM框架 EFCore(EntityFramworkCore)是对底层ADO.NET重新封装的一个数据操作框架&#xff0c;因此ADO.NET支持的多种数据库都可以被EFCore支持。 EF Core 可用作对象关系映射程序 (O/RM)&#xff0c;这可以实现以下两点&#xff1a; 使 .NET 开发人员能够使用…

Python医学数据分析入门,推荐你学习这本书

医学生学习Python的难点通常在于 没有系统的编程教育&#xff0c;很难短时间内使用编程语言描述问题及其解答过程相关教程、案例少&#xff0c;想要练习缺少素材 所以这里就给大家推荐一本比较新比较前沿的教医学生学习Python的书《Python医学数据分析入门》 这本书的切入点为…

代码随想录——分割回文串 II

题目 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是回文。 返回符合要求的 最少分割次数 。 示例 1&#xff1a; 输入&#xff1a;s “aab” 输出&#xff1a;1 解释&#xff1a;只需一次分割就可将 s 分割成 [“aa”,“b”] 这样两个回文子…

Oracle技术分享 创建外键报错ORA-00906

问题描述&#xff1a;给表创建外键报错ORA-00906&#xff0c;具体过程如下&#xff1a; 数据库&#xff1a;oracle 11.2.0.464位 scottORCL2022-10-18 19:10:40> selectindex_name,table_name,tablespace_name,status,last_analyzed from user_indexes; INDEX_NAME TABLE…

事务【mysql】

1、事务的概念 事务指逻辑上的一组操作&#xff0c;组成这组操作的各个单元&#xff0c;要么全部成功&#xff0c;要么全部失败。 在不同的环境中&#xff0c;都可以有事务。对应在数据库中&#xff0c;就是数据库事务。 把多个操作打包成一个整体&#xff0c;要么全部都执行完…

【#HDC2022】HarmonyOS体验官活动正式开启,赶快投稿赢限量奖品吧!

1. 【活动简介】 HDC 2022 于11月4日线上线下正式开启。历时一年&#xff0c;在无数开发者的共同努力下&#xff0c;我们汇聚了HarmonyOS生态的新成果、新体验、新开放能力&#xff0c;邀你参与到HarmonyOS的每一步成长和构建中。 本次HarmonyOS体验官活动&#xff0c;将以文章…

MySQL自连接和内连接和外连接_左外连接+右外连接

自连接&#xff1a;将一张表看作两张表 练习&#xff1a;查询员工id&#xff0c;员工姓名及其管理者的id和姓名select emp.employee_id,emp.last_name,mgr.employee_id,mgr.last_name from employees emp,employees mgr where emp.manager_id mgr.employee_id;内连接 只是把左…

多肽Caerulein (desulfated)、pGlu-QDYTGWMDF-NH2、20994-83-6

Caerulein, desulfated 是脱硫后的Caerulein。Caerulein 是一种十肽&#xff0c;与胃泌素和胆囊收缩素 (CCK) 具有相同的五个羧基末端氨基酸。 Caerulein, desulfated is the desulfurated form of Caerulein. Caerulein is a decapeptide having the same five carboxyl-termi…