Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置

news2024/12/24 18:33:32

一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):
    def put(self,id):
        #测试异步发邮件
        email = request.args.get('email')
        code = request.args.get('code')
        res = send_email_task.delay(email,code)
        print(res.id)
        return NewResponse(msg='put',data={'task_id':res.id})

    def patch(self,id):
        #测试异步操作flask的orm和cache
        p = request.args.get('p')
        if p=='set':
            res = cache_user_task.delay()
            print(res,type(res))
            return NewResponse(msg='patch',data={'task_id':res.id})
        else:
            from ext import cache
            data = cache.get('all-user-data')
            return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = [
    'celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法
    'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]

#celery的配置
config = {
    "broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码
    "result_backend" : 'redis://127.0.0.1:6379/1',
    "task_serializer" : 'json',
    "result_serializer" : 'json',
    "accept_content" : ['json'],
    "timezone" : 'Asia/Shanghai',
    "enable_utc" : False,
    "result_expires" : 1*60*60,
    "beat_schedule" : { #定时任务配置
            # 名字随意命名
            'add-func-30-seconds': {
                # 执行add_task下的addy函数
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func
                # 每10秒执行一次
                'schedule': timedelta(seconds=30),
                # add函数传递的参数
                'args': (10, 21)
            },
            # 名字随意起
            'add-func-5-minutes': {
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func
                # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
                'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行
                'args': (19, 22)  # 定时任务需要的参数
            },
            # 缓存用户数据到cache中
            'cache-user-func': {
                'task': 'celery_task.scheduler_task.cache_user_func',
                # 导入任务函数:from celery_task.scheduler_task import cache_user_func
                'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中
            }
        }
}

2、创建celery对象

celery.py

from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)

'''
2、创建celery应用对象
  'task'可以任务是该celery对象名字,用于区分celery对象
  broker是指定消息中间件
  backend是指定任务结果存储位置
  include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)

'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
        from apps import create_app
        app = create_app()
        with app.app_context():
            return self.run(*args, **kwargs)
celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time


'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''


# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
    '''
    :param email: 接收消息的邮箱,用户的邮箱
    :return:
    '''
    # 模拟邮件发送验证码
    time.sleep(5)
    return {'result':'邮件已经发送',receiver_email:'2356'}

@celery.task
def cache_user_task():
    #orm查询数据,放到cache中
    from apps.user.models import UserModel
    user = UserModel.query.all()
    lis = []
    for u in user:
        id = u.id
        name = u.name
        dic = {'id':id,'name':name}
        lis.append(dic)
        print(dic)
    cache.set('all-user-data',lis)
    return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

from celery_task.celery import celery
import time


# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):
    print('执行了加法函数',a+b)
    return a + b


# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():
    print('all')


5、检测任务id获取任务状态和返回值

check_task.py:

from celery.result import AsyncResult
from celery_task.celery import celery

'''验证任务的执行状态的'''


def check_task_status(task_id):
    '''
    任务的执行状态:
        PENDING :等待执行
        STARTED :开始执行
        RETRY   :重新尝试执行
        SUCCESS :执行成功
        FAILURE :执行失败
    :param task_id:
    :return:
    '''
    result = AsyncResult(id=task_id, app=celery)
    dic = {
        'type': result.status,
        'msg': '',
        'data': None,
        'code': 400
    }
    if result.status == 'PENDING':
        dic['msg'] = '任务等待中'
    elif result.status == 'STARTED':
        dic['msg'] = '任务开始执行'
    elif result.status == 'RETRY':
        dic['msg'] = '任务重新尝试执行'
    elif result.status == 'FAILURE':
        dic['msg'] = '任务执行失败了'
    elif result.status == 'SUCCESS':
        result = result.get()
        dic['msg'] = '任务执行成功'
        dic['data'] = result
        dic['code'] = 200
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目:

        flask run --host 0.0.0.0 --port 5000

celery项目:

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1084546.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年【煤气】试题及解析及煤气复审模拟考试

题库来源:安全生产模拟考试一点通公众号小程序 煤气试题及解析考前必练!安全生产模拟考试一点通每个月更新煤气复审模拟考试题目及答案!多做几遍,其实通过煤气理论考试很简单。 1、【单选题】100mm以上的煤气管道着火,( )一下把煤…

vue3封装分页组件

1.新建Pagination文件以及该文件夹下新建index.vue 2.在index.vue文件中编写一下代码 <template><div :class"{ hidden: hidden }" class"pagination-container"><el-pagination:background"background"v-model:current-page&qu…

全国快递查询接口,快递,全球快递,配送,物流管理,物流数据,电子商务

一、接口介绍 支持国内外1500快递物流公司的物流跟踪服务&#xff0c;包括顺丰、圆通、申通、中通、韵达等主流快递公司。同时&#xff0c;支持单号识别快递物流公司、按次与按单计费、物流轨迹返回等功能&#xff0c;以满足企业对快递物流查询多维度的需求。 二、使用案例截…

IDEA中.gitignore配置不生效的解决方案

一、创建项目 二、执行以下Git命令 git rm -r --cached . git add . git commit -m "update .gitignore"

接口测试的几种方法

其实无论用那种测试方法&#xff0c;接口测试的原理是通过测试程序模拟客户端向服务器发送请求报文&#xff0c;服务器接收请求报文后对相应的报文做出处理然后再把应答报文发送给客户端&#xff0c;客户端接收应答报文这一个过程。 方法一、用LoadRunner实现接口测试 大家都知…

风光十几年的MIUI要无了,小米自研MIOS即将就位

今年8月小米新品发布会上&#xff0c;K60 至尊版亮相的同时带来了下一代系统&#xff08;或者说UI &#xff09;的名字&#xff1a;MIUI 15 。 好家伙不知不觉已经要更到 15 了&#xff0c;大家用过几个版本呢&#xff1f; 回想起 MIUI V1 内测发布、MIUI V5 全民刷机、MIUI 8…

显卡、SSD二合一,华硕这波「技术革命」给我看傻了

通常来讲&#xff0c;高端、低端主板除了芯片组、用料、供电规格等差异&#xff0c;在接口拓展性方面也存在着明显分级。 最常用的 USB 接口倒还好&#xff0c;即便目前最入门级主板基本也能保证至少 4 个&#xff1b; 加上机箱前面板拓展 USB 口&#xff0c;保证一般用户需求…

Android studio安装详细教程

Android studio安装详细教程 文章目录 Android studio安装详细教程一、下载Android studio二、安装Android Studio三、启动Android Studio 一、下载Android studio Android studio安装的前提是必须保证安装了jdk1.8版本以上 1、打开android studio的官网&#xff1a;Download…

3D模型格式转换工具HOOPS Exchange:模型数据自由导入和导出

HOOPS Exchange是一套高性能软件库&#xff0c;可以为软件开发人员提供导入和导出3D文件格式的能力。HOOPS Exchange导入3D数据后&#xff0c;会将3D数据转换为PRC格式存放到内存中&#xff0c;最后导出成为其他3D格式。&#xff08;点击申请HOOPS Exchange免费试用&#xff09…

互动设计:深入了解用户体验的关键

交互是人与计算机系统之间的互动过程。在计算机领域中&#xff0c;交互是人机交互技术的核心内容之一。交互设计是一种基于人类行为科学、心理学、人体工程学等领域的专业设计&#xff0c;目的是创造用户友好的、易于使用的计算机软件、网络、移动应用等。交互的本质在于用户的…

JUC并发编程:Monitor和对象结构

JUC并发编程&#xff1a;Monitor和对象结构 1. Monitor1.1 对象的结构1.1.1 MarkWord1.1.2 Klass Word1.1.3 数组长度1.1.4 &#x1f330; 1. Monitor Monitor官方文档 我们可以把Monitor理解为一个同步工具&#xff0c;也可以认为是一种同步机制。它通常被描述为一个对象&…

零基础Linux_15(基础IO_文件)软硬链接+动静态库详解

目录 1. 软硬链接 1.1 创建软链接 1.2 创建硬链接 1.3 硬链接数和unlink 2. 动静态库 2.1 制作静态库 2.2 查看和打包静态库 2.3 使用静态库 2.3.1 安装在默认搜索路径 2.3.2 告知路径库路径库名 2.4 制作动态库 2.5 使用动态库 2.5.1 安装在默认搜索路径 2.5.2 …

轻松虚拟gps定位 AnyGo中文 for mac

AnyGo是一款旨在帮助用户模拟和改变iPhone的GPS位置&#xff0c;以实现虚拟定位的功能。该软件适用于iOS设备&#xff0c;并提供了一系列强大的功能&#xff0c;让用户能够自由地模拟不同地理位置&#xff0c;以满足各种需求。 以下是AnyGo可能提供的一些主要功能和特点&#…

设置全局滚动条样式

需求&#xff1a;统一滚动条样式&#xff0c;不需要每次都要在页面一个一个的设置&#xff0c;整个项目只要内容超出了显示滚动条了就会是自己设置的那个滚动条样式显示 1.效果 2. 创建scroll.scss文件 滚动条样式如下&#xff0c;可以自行更改 /* // 滚动条样式重写-chrome …

新闻api接口,新闻资讯,社交媒体,体育赛事,全国热门带正文新闻查询API接口

一、接口介绍 解决同一类新闻在不同平台上的内容获取问题&#xff0c;在归档主流新闻平台的内容数据基础上&#xff0c;对外提供统一的调用方式来完成实时、最新的相关新闻的获取&#xff0c;极大方便各类企业在自有软件中集成新闻内容的功能。支持200余个新闻大站&#xff0c;…

官网安装Python包太慢?教你三种Pytorch的下载安装方式,保证你再也不用出现Error

这一期教大家如何在Anaconda中使用CUDA来进行加速、神经网络依赖cuDNN的下载安装&#xff0c;以及下载和安装Pytorch-GPU安装包的三种方式&#xff08;conda、pip、轮子&#xff09;。 还未下载安装 CUDA 和 Anaconda&#xff0c;请看往期文章&#xff0c;是全套系列的总结&am…

@Scope 注解失效了?咋回事

scope 属性&#xff0c;相信大家都知道&#xff0c;一共有六种&#xff1a; 取值含义生效条件singleton表示这个 Bean 是单例的&#xff0c;在 Spring 容器中&#xff0c;只会存在一个实例。prototype多例模式&#xff0c;每次从 Spring 容器中获取 Bean 的时候&#xff0c;才…

如何解决MidJourney错过付费后被暂停

问题 假定你已经成功订阅购买了 MidJourney 一段时间&#xff0c;下个月扣费周期到了。 如果你卡里余额不足&#xff0c;卡被封或失效了&#xff0c;或者你想着最近没啥用得上 MidJourney 的地方先省着不续费&#xff0c;等要用的时候就用不了。 如果想要去官网的续费页&…

软文撰写技巧:中小企业品牌形象塑造新方法

在激烈的市场竞争中&#xff0c;企业仅靠产品质量和服务态度很难站稳脚跟&#xff0c;在互联网时代下&#xff0c;品牌形象才是中小企业的取胜法宝&#xff0c;良好的品牌形象能够赢得消费者的信赖&#xff0c;还能让企业占据市场资源优势&#xff0c;软文推广就是帮助企业塑造…

机器人控制算法——两轮差速驱动运动模型

1.Introduction 本文主要介绍针对于两轮差速模型的逆运动学数学推导。因为在机器人控制领域&#xff0c;决策规划控制层给执行器输出的控制指令v(车辆前进速度)和w(角速度)&#xff0c;因此&#xff0c;我们比较关心&#xff0c;当底层两个驱动电机接收到此信息&#xff0c;如何…