redis + celery

news2024/11/17 22:28:50

首先,部署Redis数据库:

先下载包:

wget http://download.redis.io/releases/redis-5.0.7.tar.gz

解压redis包:

tar -xvf redis-5.0.7.tar.gz

编译:

make 

sudo make install   (这样没有指定安装目录)

 

# 注意,redis默认安装路径:/usr/local/bin,这样其实挺好的,并不需要折腾,其实准确

# 的来说,当执行完两个make之后,就会在redis包下的src目录下生成所有必要文件,同

# 时,将一些可执行文件扔一份到 /usr/local/bin 当然,如果不想将这些可执行二进制文件

# 扔到 /usr/local/bin,可以自行指定位置,安下面命令执行即可

 

sudo make PREFIX=/usr/local/redis install  (指定redis的安装目录)

 安装完成后长这样:

c813fb02320940b59969d98bc26316d4.png

将redis配置文件复制到bin目录下(先新建文件夹然后再将redis配置文件coyp进去)

我们要将配置文件复制一份,我们以后就是用这个配置文件来启动。

cd /usr/local/bin

sudo mkdir redis_config

# 回到安装redis目录,因为redis.conf文件在这里

c160d40048b34cf4abdebf52dbae7cc2.png

sudo cp redis.conf /usr/local/bin/redis_config

 

接下来修改配置文件:

vi /usr/local/bin/redis_config/redis.conf

这里有几个地方需要注意:

第一个地方,绑定地址,允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0

0c773ee90a4e4414a8c7595978345929.png

 

第二个地方,设置守护,守护进程,修改为yes后即可后台运行

e879e96c7b3d4521888df5b8a355bb7e.png

 

第三个地方,设置密码,设置后访问Redis必须输入密码。这里注意,redis没有用户名一说,只有服务地址和密码,密码还可以不给,不像postgres等数据库,需要严格的身份验证。

57012d8d626849f59bff844220fa727b.png

其他的,基本不太动...

# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

接下来,启动redis:

cd /usr/local/bin

redis-server redis_config/redis.conf

1c4c0444649444f29996dd394d1afcf4.png

启动客户端:

cd /usr/local/bin

redis-cli -h 127.0.0.1 -p 6379

设置Redis开机自启动

首先,新建一个系统服务文件:

vi /etc/systemd/system/redis.service

文件内容为:

[Unit]
Description=redis-server
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/bin/redis_config/redis.conf
PrivateTmp=true

[Install]
WantedBy=multi-user.target

这里其他的没啥,注意这个参数 ExecStart,填对就行。

systemctl daemon-reload 

现在,我们可以用下面这组命令来操作redis了:

# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启:

systemctl enable redis

 

好了,接下来,开始扯 celery 

Celery 是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。

Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

8e6f2b706cac40efba4eb17a07be3efa.png

  • 提交任务给 Broker 队列
  • 如果是异步任务,Worker 会立即从队列中取出任务并执行,执行结果保存在 Backend 中
  • 如果是定时任务,任务由 Celery Beat 进程周期性地将任务发往 Broker 队列,Worker 实时监视消息队列获取队列中的任务执行

应用场景

  • 长时间任务的异步执行, 如上传大文件
  • 实时任务执行,支持集群部署,如支持高并发的机器学习推理
  • 定时任务执行,如定时发送邮件

节点总结:

到这里,我先记录一些理解。

首先,celery它是一个典型的生产者消费者模型。也就是说,这个模型里,可以没有生产者,但是必须得有消费者。

其次,这里先了解2个命令:

celery -A tasks worker --loglevel=info --pool=solo

celery -A proj.period_task beat -l info

这里面出现了一个 worker, 一个 beat。worker 就是消费者的意思,beat 是指周期任务。

2个命令很像,但是意思完全不一样。celery beat -A ...  是说,周期的向队列里放入任务。而celery worker -A ... 是说,一旦队列里有任务,就立刻去执行任务。所以,beat 就属于生产者,而 worker 属于消费者。如果没有 worker 那么任务只会堆积,没人处理。因此,使用celery 一定得启动 worker。

第三,selery跟我们django服务里面自定义的app一样,它本身也是一个app。

安装
本文使用 Redis 作为 Broker 即消息队列

pip install celery
pip install redis

需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。

 

Celery 的开发主要有四个步骤:

  1. 实例化 Celery
  2. 定义任务
  3. 启动任务 Worker
  4. 调用任务

先看一个简单的celery实现:

from celery import Celery

# broker 是用于存储任务的队列,backend 是用于存储任务执行结果的队列
test_celery = Celery('test', broker='redis://127.0.0.1:6379/0', 
                     backend='redis://127.0.0.1:6379/1')
# 也可以以这种方式,导入任务模块,当然,这里作为最简单的例子,是不需要的
app.conf.imports = ['tasks']

@test_celery.task
def my_add(a, b):
    return a + b

这样,一个最简单的最基本的 celery 就已经完成了。现在已经构建了一个 celery 的异步任务。但是光有任务是没有用的,首先得有消费者,就是我之前写的小结里记录的,celery 必须得有worker,然后,在搞一个生产者,将任务放到队列里,然后,自然会有worker去执行任务,代码也就会被执行了。

启动任务 Worker

celery -A my_celery worker -l info -c 4

这里千万注意,worker 的位置:

e388c599cf724c2b8e6f20806d1e9305.png

新版本,worker不让写前面了。

连接成功后,长这样子...

316260c61e4348c98e532c3c69f2af0c.png

到这里,celery 的消费者就搞定了,然后是生产者...其实生产者无非就2种,一次性的,循环性的。但是其本质又都是一样的,就是给 task 到任务队列完事。一次性的就是只触发一次将 task 压入队列,周期性的就是间隔的将 task 压入队列。

现在构建生产者,最简单的,就是这样,直接弄一个,然后,执行这个文件即可。

from my_celery import my_add

my_add.delay(1, 2)

看下结果:

81e1b6bea5174a0ea122a6df0df6ab82.png

这是刚起的 celery 的 worker 的执行结果...

6bff08848da64ef6b8e3fd9b70963651.png

换一种写法:

from my_celery import my_add

my_add.delay(1, 2)

# 使用签名模式,得到的是一个新的 task, 这种 task 可以跨越进程被调用
new_task = my_add.s(1, 2)
new_task.delay()

结果就成了:

40f31ad18b5d490ebe4a4719ac3c23b0.png

这是两种写法,先不做,讨论,一会在说。

就现在为止,基本上,我们就已经可以使用selery做事情了。尤其是在django服务中,完全可以搞一个 url 配合视图函数做任务触发,就可以利用celery做异步任务。

上面是个简单的实现,通常情况,都是写出配置文件来用的,会显得规范一些。目录结构通常是这样的。构建一个celery app的文件夹,让它和 manage.py 在同一级目录。

bbb4cedffc494485b4cc37f5e9e7ad4e.png

所有的 task 都可以放到 tasks.py 中,celery 的实例化对象可以放到 __init__.py 中,相关的配置可以放到 config.py 中。

__init__.py

from celery import Celery
from celery.schedules import crontab


test_celery = Celery('test')
# 加载配置文件
test_celery.config_from_object('my_celery.config')


# 添加周期任务,在没有调度的时候,周期任务是不会执行的,只有通过周期调度命令启动的时候
# 它们才会被执行
test_celery.conf.beat_schedule = {
    'test001': {
        'task': 'my_celery.tasks.my_add',
        # 每周一07:30执行my_add任务
        'schedule': crontab(minute='30', hour='7', day_of_week='1'),
        'args': (1, 3)
    },
    'test002': {
        'task': 'my_celery.tasks.my_add',
        # 每分钟执行一次 my_add 任务
        'schedule': crontab(minute='*/1'),
        'args': (1, 3)
    },
}

config.py 

注意:这里一定得记着导入模块,因为如果这里没写导入任务模块,那么就会导致任务模块里的任务统统没被注册,那就无法使用。

BROKER_URL = 'redis://127.0.0.1:6379/0'  # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'  # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区配置
CELERY_IMPORTS = (  # 导入的任务模块
    'my_celery.tasks',
)

tasks.py

from my_celery import test_celery


@test_celery.task
def my_add(a, b):
    return a + b

test.py

from my_celery.tasks import my_add

my_add.delay(1, 2)

# 定时任务,延时3秒执行
my_add.apply_async((3, 4), countdown=3)

new_task = my_add.s(5, 6)
new_task.delay()

普通的调用和之前的简单模式一样,这次看下周期调用:

celery -A my_celery beat -l info

这里有第二种写法,其意义在于,周期模式是需要记录时间的,因此,可以指定一个地方让其记录时间。 

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

结果:

每隔1分钟将task压入队列

1fd5234c5e0c4fa09c1bfa7c9d4a5cf9.png

每隔1分钟,worker就能获取到task,并执行它 

b059de3b3dd84afa94d229ac44538a1b.png

到这里,基本的celery,就搞定了,可以使用了....

另外的一些花哨的用法,记录下:

调用任务

常规任务

  • delay():直接调用任务,是 apply_async() 的封装
  • apply_async():通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数
  • signature():创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数
  • s():创建签名的快捷方式
from my_celery.tasks import my_add

result = my_add.delay(1, 2)  # 直接调用
print(result.get())

result = my_add.apply_async((1, 2), countdown=2)  # 2s后执行
print(result.get())

t1 = my_add.signature((1, 2), countdown=2)  # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())

t1 = my_add.s(1, 2).set(countdown=2)  # 创建签名的快捷方式
result = t1.delay()
print(result.get())

组合任务

  • group():组合,接受一个可并行调用的任务列表
  • chain():串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)
  • chord():和弦,类似 group() 但包含回调,在所有任务执行完后再调用任务
  • map():将参数列表应用于该任务
  • starmap():将复合参数列表应用于该任务
  • chunks():将一个很长的参数列表分块成若干部分

任务状态跟踪

这种情况,需要对 __init__.py 做出一定的修改,添加一些内容即可

from celery import Celery, Task
from celery.schedules import crontab
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)  # 日志


test_celery = Celery('test')
test_celery.config_from_object('my_celery.config')


test_celery.conf.beat_schedule = {
    'test001': {
        'task': 'my_celery.tasks.my_add',
        'schedule': crontab(minute='30', hour='7', day_of_week='1'),
        'args': (1, 3)
    },
    'test002': {
        'task': 'my_celery.tasks.my_add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 3)
    },
}


class TaskMonitor(Task):
    def on_success(self, retval, task_id, args, kwargs):
        """success时回调"""
        logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """retry时回调"""
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """failure时回调"""
        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))





然后,再修改下 tasks.py

from my_celery import test_celery, TaskMonitor


@test_celery.task
def my_add(a, b):
    return a + b


@test_celery.task(base=TaskMonitor)
def my_add_1(a, b):
    return a + b

命令行参数:

关于 celery 命令行的启动等参数,都在这了

参数含义全称
-A指定模块 
-l日志level–loglevel
-c进程数–concurrency
-Q指定队列–queue
-B周期性任务–beat
-P池的实现–pool

 

搭建redis:

Redis基础——1、Linux下安装Redis(超详细)_linux安装redis_原首的博客-CSDN博客

使用redis:

https://www.cnblogs.com/fuminer/p/17254164.html

celery的使用:

Python定时任务库Celery——分布式任务队列_python celery_XerCis的博客-CSDN博客

 

其他参考,总之,看到的帖子,都有错误之处,往往不能让我通达,故写此贴:

https://www.cnblogs.com/clark1990/p/17174251.html

Periodic Tasks — Celery 5.3.5 documentation

https://docs.celeryq.dev/en/stable/reference/cli.html

Python定时任务库Celery——分布式任务队列_python 使用分布式消息系统celery实现定时任务 自动执行python 脚本_XerCis的博客-CSDN博客

Python-Celery定时任务、延时任务、周期任务、crontab表达式及清除任务的基本使用与踩坑 - 知乎

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1222344.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux入门(三)

Linux grep 命令 1: 作用 ​ grep是一种文本搜索工具,它能使用特定的搜索模式,包括[正则表达式]搜索文本,并默认输出匹配行。 ​ windows类似的命令是findstr. 2:语法 grep -options(参数)…

eclipse启动无法找到类(自定义监听器)

一.报错 二.排查 1.首先检查代码是否有问题 本人报错是找不到监听器,故检查监听器的代码和web.xml文件是否有问题 public class DoorListener implements ServletContextListener 监听器是否继承并实现ServletContextListener中的方法。 web.xml中: &…

el-table固定表头(设置height)出现内容过多时不能滚动问题

主要原因是el-table没有div包裹 解决&#xff1a;加一个div并设置其高度和overflow 我自己的主要代码 <div class"contentTable"><el-tableref"table":data"tableData"striperow-dblclick"onRowDblclick"height"100%&q…

前置语音群呼与语音机器人群呼哪个更好

最近通过观察自己接到的营销电话&#xff0c;通过语音机器人外呼的量应该有所下降。同时和客户交流获取到的信息&#xff0c;也是和这个情况类似&#xff0c;很多AI机器人群呼的量转向了OKCC前置语音群呼。询问原因&#xff0c;说是前置语音群呼转化更快&#xff0c;AI机器人群…

ChatGPT 从零到一打造私人智能英语学习助手

近几年&#xff0c;随着智能化技术的发展和人工智能的兴起&#xff0c;越来越多的应用程序开始涌现出来。在这些应用中&#xff0c;语音识别、自然语言处理以及机器翻译等技术都得到了广泛的应用。其中&#xff0c;聊天机器人成为了最受欢迎的人工智能应用之一&#xff0c;它们…

Go 字符串处理:fmt.Sprintf与string.Builder的比较

在Go语言中&#xff0c;我们通常会遇到两种主要的方式来处理和操作字符串&#xff1a;使用fmt.Sprintf函数和string.Builder类型。尽管两者都可以实现字符串的格式化和连接&#xff0c;但它们在性能和用法上有一些关键区别。 1. fmt.Sprintf fmt.Sprintf是一个函数&#xff0c…

23111707[含文档+PPT+源码等]计算机毕业设计基于javawebmysql的旅游网址前后台-全新项目

文章目录 **软件开发环境及开发工具&#xff1a;****功能介绍&#xff1a;****论文截图&#xff1a;****实现&#xff1a;****代码:** 编程技术交流、源码分享、模板分享、网课教程 &#x1f427;裙&#xff1a;776871563 软件开发环境及开发工具&#xff1a; 前端使用技术&a…

Bulk RNA-seq上下游分析

Bulk-RNA-seq上下游分析还是相对简单的&#xff0c;这次我以mouse为例&#xff0c;进行Bulk-RNA-seq上下游分析&#xff0c;并进行对应的图片绘制。 上游分析 1.软件准备 #安装所需软件 sudo apt install fastqc sudo apt install hisat2 sudo apt install cutadapt sudo ap…

机器学习第5天:多项式回归与学习曲线

文章目录 多项式回归介绍 方法与代码 方法描述 分离多项式 学习曲线的作用 场景 学习曲线介绍 欠拟合曲线 示例 结论 过拟合曲线 示例 ​结论 多项式回归介绍 当数据不是线性时我们该如何处理呢&#xff0c;考虑如下数据 import matplotlib.pyplot as plt impo…

皮具生产ERP都有哪些功能?企业应当如选型

箱包皮具型号多、款式多、营销渠道广泛&#xff0c;各个销售平台的管理模式各不相同&#xff0c;而皮具生产企业经营策略和管理模式的差异&#xff0c;也导致企业在市场竞争力等方面参差不齐。 此外&#xff0c;不同的客户有不同的需求&#xff0c;再加上皮具行业竞争的加剧&a…

【鸿蒙应用ArkTS开发系列】- 云开发入门简介

目录 概述开发流程工程概览工程模板工程结构 工程创建与配置 概述 HarmonyOS云开发是DevEco Studio新推出的功能&#xff0c;可以让您在一个项目工程中&#xff0c;使用一种语言完成端侧和云侧功能的开发。 基于AppGallery Connect Serverless构建的云侧能力&#xff0c;开发…

直线插补-逐点比较法

直线插补-逐点比较法 逐点比较法四个节拍的工作流程如图所示举例1 逐点比较法 逐点比较法逐点比较法是通过逐点比较刀具与所需插补曲线之间的相对位置&#xff0c;确定刀具的进给方向&#xff0c;进而加工出工件轮廓的插补方法。刀具从加工起点开始&#xff0c;按照“靠近曲线…

ssrf学习笔记总结

SSRF概述 ​ 服务器会根据用户提交的URL 发送一个HTTP 请求。使用用户指定的URL&#xff0c;Web 应用可以获取图片或者文件资源等。典型的例子是百度识图功能。 ​ 如果没有对用户提交URL 和远端服务器所返回的信息做合适的验证或过滤&#xff0c;就有可能存在“请求伪造”的…

mount /dev/mapper/centos-root on sysroot failed处理

今天发现centos7重启开不进去系统 通过查看日志主要告警如下 修复挂载目录 xfs_repair /dev/mapper/centos-root不行加-L参数 xfs_repair -L /dev/mapper/centos-root重启 reboot

软件测试基础-01

目标&#xff1a; 1.软件测试的定义&#xff1b; 2.7种测试分类的区别&#xff1b; 3.质量模型的重点5项&#xff1b; 4.测试流程的6个步骤&#xff1b; 5.测试模板8要素&#xff1b; 就业方向&#xff1a; 1.功能测试接口测试 2.功能测试性能测试 3.功能测试web自动化…

DAY57 739. 每日温度 + 496.下一个更大元素 I

739. 每日温度 题目要求&#xff1a; 请根据每日 气温 列表&#xff0c;重新生成一个列表。对应位置的输出为&#xff1a;要想观测到更高的气温&#xff0c;至少需要等待的天数。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 例如&#xff0c;给定一个列…

ChatGPT暂时停止开通puls,可能迎来封号高峰期

前言: 前两日,chat gpt的创始人 San Altman在网上发表了,由于注册的使用量超过了他们的承受能力,为了确保每个人的良好使用体验,chat gpt将暂时停止开通gpt plus。 情况: 前段时间好像出现了官网崩溃的情况,就连api key都受到了影响,所以现在就开始了暂时停止puls的注…

OpenAI的Whisper蒸馏:蒸馏后的Distil-Whisper速度提升6倍

1 Distil-Whisper诞生 Whisper 是 OpenAI 研发并开源的一个自动语音识别&#xff08;ASR&#xff0c;Automatic Speech Recognition&#xff09;模型&#xff0c;他们通过从网络上收集了 68 万小时的多语言&#xff08;98 种语言&#xff09;和多任务&#xff08;multitask&am…

零代码编程:用ChatGPT自动合并多个Word文件

一个文件夹中有多个docx格式的word文档&#xff1a; 想要把它们都合并成一个文件&#xff0c;然后打印&#xff0c;可以在ChatGPT中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个处理word内容的任务&#xff0c;具体步骤如下&#xff1a; 打开文件夹…

lvgl 画圆弧时进入 HardFault

目录 一、现象描述 lvgl 版本 二、问题分析 lvgl 需要的资源新建mcu 工程时默认分配的资源问题解决 一、现象描述 移植完lvgl 之后&#xff0c;能正常显示label&#xff0c;但是button arc 等复杂的控件都不能正常显示。调用官方的画圆弧demo 时&#xff0c;在多次调用 _lv…