Celery快速使用(定时任务、Django中使用Celery、秒杀逻辑、双写一致性)

news2025/1/12 11:57:10

文章标题

  • 一、Celery快速使用
  • 二、Celery包结构
  • 三、Celery异步任务 延时任务 定时任务
  • 四、Django中使用Celery
  • 五、秒杀逻辑
  • 六、双写一致性
    • 1)路飞项目接口加缓存
    • 2)Celery定时任务实现双写一致性

一、Celery快速使用

简单介绍Celery

  1. Celery官网:http://www.celeryproject.org/
  2. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(芹菜是一个资金很少的项目,所以我们不支持微软Windows。 请不要打开任何与该平台相关的问题 )
  3. Celery是独立的服务
    - 可以不依赖任何服务器,通过自身命令,启动服务
    - celery服务为为其他项目服务提供异步解决任务需求的
    - 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

安装Celery插件

pip install celery

使用步骤

  1. 新建文件 实例化得到app对象 写函数、任务、注册成Celery的任务
import time
from celery import Celery

backend = 'redis://@127.0.0.1:6379/1'
broker = 'redis://@127.0.0.1:6379/2'
app = Celery('test', backend=backend, broker=broker)

@app.task
def add(a, b):
    time.sleep(2)
    print(a + b)
    return a + b
  1. 新建第二个文件 提交任务>>>>>提交到broker中
from main import add

print('from s1')

res = add.delay(3, 7)
print(res)      # 6c3cd997-2726-4ce6-a994-23287c476889 唯一的uuid
  1. 启动worker 从broker中获取任务执行 执行完放到backend里面
命令:
	Windows
		celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
        celery -A main worker -l info -P eventlet  # 5.x及之后用这个
	Mac、Linux
		celery worker -A main -l info				# 4.x及之前用这个 
		celery -A main worker -l info				# 5.x及之后用这个
  1. 通过代码查看任务执行结果
from main import app
from celery.result import AsyncResult
id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'		# 任务id
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #10
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')
  1. 也可以通过图形化软件查看结果了
    在这里插入图片描述

二、Celery包结构

写一个celery的包以后再任意项目中想用把包copy进去导入使用即可

目录结构

	celery_task		# 包
		__init__.py
		celery.py
		user_task.py
		home_task.py
	add_task.py
	get_result.py

使用步骤

	新建包celery_task
	在包先新建一个 celery.py
	在里面写app的初始化
	在包里新建user_task.py 编写用户相关任务 
	在包里新建home_task.py 编写首页相关任务 
	其它程序,提交任务
	启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
	celery -A celery_task worker -l info			# # 注意名称错误就会报错
	查看任务执行的结果

celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'					# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])	

celery_task/home_task.py

from .celery import app
@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b

celery_task/user_task

import time
from .celery import app
@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print('短信发送成功:%s,验证吗是%s' % (mobile, code))
    return True

add_task.py

from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务	
res=send_sms.delay('18723345455','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker

# 查看任务执行的结果

get_resuly.py

# 查询执行完的结果
from celery_task.celery import app

from celery.result import AsyncResult

id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

在这里插入图片描述

三、Celery异步任务 延时任务 定时任务

定时任务配置

app.conf.beat_schedule = {
       'send_sms_task': {
           'task': 'celery_task.user_task.send_sms',
           'schedule': timedelta(seconds=5),
           # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
           'args': ('1897334444', '7777'),
       },
       'add_task': {
           'task': 'celery_task.home_task.add',
           'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周3十二点十分
           'args': (10, 20),
       }
   }

add_task.py

from datetime import datetime, timedelta
from celery_task.home_task import add

eta = datetime.utcnow() + timedelta(seconds=10)
res = add.apply_async(args=(200, 50), eta=eta)
print(res)
  1. 运行项目提价任务
  2. 启动worker执行任务
celery -A celery_task worker -l info
  1. 启动beat定时任务
celery -A celery_task beat -l info

在这里插入图片描述
在这里插入图片描述

四、Django中使用Celery

  1. 将我们写好的复制到项目路径下
  2. 在包内celery.py中添加代码
import os       # django中集成celery需要加入
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()
  1. 在django的视图类中,导入,提交任务
from celery_task.user_task import send_sms

def index(request):
    mobile = request.GET.get('mobile')
    res = send_sms.delay(mobile, '8888')
    print(res)
    return HttpResponse('以及发送了!!')

user_task.py

import time
from .celery import app
from lib.send_tx_sms import send_sms_by_phone
from user.models import UserInfo

@app.task
def send_sms(mobile, code):
    send_sms_by_phone(mobile, code)
    user = UserInfo.objects.all().filter(mobile=mobile).first()
    print('给%s发送短信,短息发送成功:%s, 验证码是%s' % (user.username, mobile, code))
    return True
  1. 启动worker beat (这个时候访问视图就会发送短信了 并发量高)

在这里插入图片描述

五、秒杀逻辑

  1. 前端编写秒杀按钮
  2. 事件:向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他
handleClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
        if (res.data.code == 100) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                res => {
                  if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 102) {
                    //什么事都不干
                  }
                }
            )
          }, 5000)
        }
      })
    }
  1. 后端秒杀接口 提交秒杀任务
 def seckill(request):
     # 提交秒杀任务
     res = seckill_task.delay()
     return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
  1. 查询是否秒杀成功的接口 根据用户传入的id,查询任务是否成功
def get_result(request):
            task_id = request.GET.get('id')
            res = AsyncResult(id=task_id, app=app)
            if res.successful():
                result = res.get()  # 7
                return JsonResponse({'code': 100, 'msg': str(result)})
            elif res.failed():
                print('任务失败')
                return JsonResponse({'code': 101, 'msg': '秒杀失败'})
            elif res.status == 'PENDING':
                print('任务等待中被执行')
                return JsonResponse({'code': 102, 'msg': '还在排队'})

六、双写一致性

1)路飞项目接口加缓存

轮播图接口添加缓存 提高响应速度 提高并发量(第一次走数据库查询 第二次之以后都走缓存)

from django.core.cache import cache
from utils.response import APIResponse

class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.common_settings.BANNER_NUM]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:
            print('走了缓存 速度很快!!')
            return APIResponse(result=result)
        else:
            print('走了数据库查询 很慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')
            cache.set('banner_list', result)
            return res

在这里插入图片描述

加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致
解决方法

  1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
  2. 修改数据库,修改缓存 【缓存的修改是在后】
  3. 定时更新缓存 —》针对于实时性不是很高的接口适合定时更新

2)Celery定时任务实现双写一致性

celery.py

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=5),
        'args': (),
    }
}

home_task.py

@app.task
def update_banner():
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:common_settings.BANNER_NUM]
    ser = BannerSerializer(instance=queryset, many=True)
    print(ser.data)
    for item in ser.data:
        item['image'] = dev.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

整体流程

  1. 启动Django
  2. 启动worker
  3. 启动beat
    在这里插入图片描述

技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10938.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

busybox的实现原理分析(C语言实现简易版的busybox)

1、linux中实现命令的两种方式 1.1、命令都是单独的可执行程序 aston:~$ ls -l /bin/ls -rwxr-xr-x 1 root root 138208 2鏈 8 2022 /bin/ls aston:~$ aston:~$ ls -l /bin/mkdir -rwxr-xr-x 1 root root 68096 2鏈 8 2022 /bin/mkdir aston:~$ aston:~$ file /bin/ls…

元数据性能大比拼:HDFS vs S3 vs JuiceFS

元数据是存储系统的核心大脑,元数据性能对整个大数据平台的性能和扩展能力至关重要。尤其在处理海量文件的时候。在平台任务创建、运行和结束提交阶段,会存在大量的元数据 create,open,rename 和 delete 操作。因此,在…

Sass 使用说明

CSS 样式表越来越大、 越来越复杂、越来越难以维护。这就是预处理可以提供帮助的地方。 Sass 为你提供了 CSS 中还不存在的特性,例如变量、 嵌套、混合、继承和其它实用的功能,让编写 CSS 代码变得有意思。 最直接的方式就是在命令行中调用 sass 命令。安…

java和vue的狱警管理系统监狱系统狱务管理系统

简介 狱警管理系统监狱系统狱务管理系统,主要是管理罪犯教育改造、劳动改造、案件管理,罪犯信息管理等 演示视频 https://www.bilibili.com/video/BV1VG411P7YL/?zw&vd_sourcefa4ffd66538a5ca679a754398a6fdb5f 技术:springbootvueel…

git的下载与安装

1. 下载地址 根据自己的电脑配置信息,选用合适的版本进行下载即可,我的电脑上64位win11,所以我选择了64位的widnows版本,下面其他内容也以此版本展开。 windows:Git - Downloading Package macOS:Git - …

Linux-进程管理

基本介绍 在Linux中,每个执行的程序都称为一个进程,每一个进程都分配一个ID号(pid) 程序运行起来就产生了进程 ps ps命令用来查看在目前系统中,有哪些正在执行的进程,以及他们执行的状况,可以不加任何参…

K_A05_004 基于 STM32等单片机驱动2X2块(8X8)点阵模块(MAX7219)显示0-9与中文

目录 一、资源说明 二、基本参数 1、参数 2、引脚说明 三、通信协议说明 工作时序 对应程序: 四、部分代码说明 1、接线说明 1.1、STC89C52RC2X2块(8X8)点阵模块(MAX7219) 1.2、STM32F103C8T62X2块(8X8)点阵模块(MAX7219) 2、亮…

年产10000吨餐厨垃圾制备氨基酸有机肥工厂设计

目录 摘 要 I Abstract II 第1章 餐厨垃圾概况 1 1.1餐厨垃圾性质 1 1.2餐厨垃圾无害化处理的必要性 1 1.3餐厨垃圾资源化处理工艺 1 1.3.1加工有机肥 2 1.3.2好氧堆肥 3 1.3.3厌氧消化 3 第2章 项目概述 4 2.1氨基酸有机肥的介绍 4 2…2 氨基酸有机肥的性质 4 2.3 氨基酸有机肥…

重温Python基础,都是最基础的知识点

前言 最近有很多朋友刚接触python学的还是有点模糊 还有的朋友就是想重温一下基础内容,毕竟基础不牢地动山摇 行吧,就总结了以下的一些知识点,可以都看看哈 一、开发环境搭建 更多学习资料.点击领取即可 1.1 Python解释器的安装 Python解…

三个最常见OSPF故障的实操检测步骤

大家好,我是小咖老师。 OSPF排错咱们已经讲过几期了,有同学反馈说看不懂,内容太多也不好记,今天咱就挑最常见的三个,给大家分析讲解一下。 1、OSPF邻居建立不成功 2、OSPF不能发现其他区域的路由 3、CPU过高问题 O…

-1- threejs 场景常见的方法和属性

场景常见的方法和属性场景的作用场景的坐标系常用的属性常用的方法场景的作用 场景(THREE.Scene)用于存储物体、光源、摄像机及其渲染所需要的其他的对象集合。THREE.Scene 对象又是被称为场景图,它不仅仅是一个对象数组,还包含了整个场景图树形结构中的…

Android API—序列化与反序列化学习+案例

概述 序列化是指将对象的状态信息转换为可以存储或传输形式的过程.在序列化期间,对象将其当前状态写入到临时或持久性存储区.以后可以通过从存储区中读取或者反序列化对象的状态,重新创建该对象. 序列化:利用ObjectOutputStream,把对象的信息,按照固定的格式转成一串字节值输…

论文笔记: 数据驱动的地震波形反演--健壮性与泛化性研究

摘要: 分享对论文的理解, 原文见 Zhongping Zhang and Youzuo Lin, Data-driven seismic waveform inversion: A study on the robustness and generalization. 1. 论文贡献 提供实时预测的 VelocityGAN与其他基于编码器-解码器的数据驱动地震波形反演方法不同, VelocityGAN …

c++ - 第11节 - stack和queue类

1.标准库中的stack类 1.1.stack类 stack类的文档介绍:https://cplusplus.com/reference/stack/stack/?kwstack 注: 1. stack是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的插入与提…

深度学习项目:男女性别识别【附完整源码】

性别分类对于人机交互应用和计算机辅助生理或心理分析等商业领域的许多应用至关重要,因为它包含有关男女特征差异的广泛信息。 本次案例收集了接近二十万的男女数据集图片。 文章目录性别分类简介使用 Python 进行性别分类的机器学习项目导入相关库和数据模型搭建…

Chapter3 Pytorch与机器学习有关函数(一)

3.1 Tensor中统计学有关的函数 3.1.1 平均值、总和 、累积 1.测试结果1 import torcha torch.rand(2, 2)print(a) print(torch.mean(a,)) print(torch.sum(a)) print(torch.prod(a)) 2.测试结果2:数组对第1维操作 import torcha torch.tensor([[1.0,2.0,3.0],[4.…

【毕业设计】酒店评价情感倾向分析系统 - python 深度学习

文章目录0 前言1 概述2 项目所需模块3 数据3.1 数据说明3.1.1 字段说明3.2 数据处理3.2.1 分词处理3.2.3 停用词处理3.2.4 样本均衡3.2.5 建立多层感知机分类模型3.2.6 训练模型3.2.7 网络检测率以及检测结果4 最后0 前言 🔥 Hi,大家好,这里…

回归模型介绍

Datawhale开源学习,机器学习课程,项目地址:https://github.com/datawhalechina/leeml-notes 首先讲机器学习中的:回归,回归Regression可以做哪些东西呢? 股票预测 输入为以往股票走势,预测未来…

HTML标签(下)

一、表格标签 1. 表格的主要作用 表格主要用于显示、展示数据。可以让数据规整、有可读性、有条理。 2. 表格的基本语法 <table><tr><td>单元格内的文字</td>...</tr>... </table><table> </table>是用于定义表格的标签 …

nodejs+vue+elementui零食食品o2o商城系统

目 录 摘 要 1 Abstract 1 1 系统概述 4 1.1 概述 4 1.2课题意义 4 1.3 主要内容 4 2 系统开发环境 5 3 需求分析 7 3.1技术可行性&#xff1a;技术背景 7 3.2经济可行性 7 3.3操作可行性&#xff1a; 8 3.4系统设计规则 8 3.5…