注册装饰器
@ app . task和 @ shared_task是Celery中用于定义任务的两种不同装饰器 , 它们之间存在明显的区别 .
from celery import Celery
app = Celery( 'my_app' , broker= 'amqp://guest@localhost//' )
@app. task
def my_task ( ) :
pass
from celery import shared_task
@shared_task
def add ( x, y) :
return x + y
以下是这两者的详细比较 :
* 1. 定义与用途 .
- @ app . task : 这是Celery库提供的装饰器 , 用于在具体的Celery应用程序中定义任务 .
当创建一个Celery应用程序对象 ( 通常命名为 app ) , 并使用 @ app . task装饰器来定义任务函数时 ,
这些任务函数仅在该特定的Celery应用程序中可用 .
- @ shared_task : 这是Celery提供的另一个装饰器 , 用于定义共享任务 ( shared task ) .
共享任务是指可以在多个Celery应用程序之间共享的任务 .
通过使用 @ shared_task装饰器 , 可以在一个Celery应用程序中定义任务 , 并将其标记为共享任务 ,
以便其他使用相同配置的Celery应用程序可以直接导入和使用该任务 .
* 2. 依赖性与可移植性 .
- @ app . task : 任务依赖于特定的Celery应用程序实例 .
因此 , 如果在不同的Celery应用程序中想要使用相同的任务 , 需要在每个应用程序中都重新定义它 .
这在单个项目或单个Celery应用程序的上下文中很有用 , 但不利于跨项目的代码复用 .
- @ shared_task : 不依赖于特定的Celery应用程序实例 .
它加载到内存后会自动添加到Celery对象中 , 因此可以在多个Celery应用程序之间共享 .
这使得任务的可移植性和可重用性更强 , 特别是在多个项目或组件需要共享通用任务时 .
* 3. 使用场景 .
- 如果任务仅在特定的Celery应用程序中使用 , 那么使用 @ app . task就足够了 .
- 如果任务需要在多个Celery应用程序之间共享 , 或者想要提高任务的代码复用性和可维护性 , 那么使用 @ shared_task会更合适 .
* 4. 注意事项 .
- 当使用 @ shared_task时 , 请确保所有相关的Celery应用程序都已正确配置 ,
并且它们使用相同的消息代理 ( 如 : RabbitMQ , Redis等 ) 和结果后端 ( 如果需要的话 ) .
- 无论是 @ app . task还是 @ shared_task , 定义的任务都可以异步执行 , 并可以返回结果 .
可以使用delay ( ) 方法或apply_async ( ) 方法来异步调用这些任务 .
通过一个具体的例子来说明任务在多个Celery应用程序之间共享 .
假设你有两个Python项目 , 它们都需要执行一些通用的后台任务 , 比如发送电子邮件 , 处理图片上传等 .
这些任务在逻辑上是相似的 , 因此希望避免在两个项目中重复编写相同的代码 .
使用 @ app . task的情况需要会在每个Django项目的Celery配置中分别定义这些任务 .
例如 , 在第一个项目中 :
from celery import Celery
app = Celery( 'project1' , broker= 'amqp://guest@localhost//' )
@app. task
def send_email ( recipient, subject, body) :
pass
然后在第二个项目中 , 需要再次定义相同的任务 :
from celery import Celery
app = Celery( 'project2' , broker= 'amqp://guest@localhost//' )
@app. task
def send_email ( recipient, subject, body) :
pass
这种方法的问题在于 , 如果发送电子邮件的逻辑需要更新 , 需要在两个项目中都进行更改 , 这增加了维护的复杂性和出错的风险 .
使用 @ shared_task , 可以在一个独立的Python模块或包中定义这些共享任务 , 并在需要时从两个项目中导入它们 .
注意 : 使用 @ shared_task , 多个Celery应用程序必须连接到同一个消息代理 ( Redis等中间件 ) , 或者至少能够互相通信的消息代理 .
只要它们都能连接到同一个消息代理实例或兼容的实例集 , 任务就可以被正确发送和接收 .
先说明一下 @ shared_task的绑定任务特性 :
from celery import Celery, shared_task
@shared_task
def add ( x, y) :
return x + y
print ( add. app)
app1 = Celery( broker= 'amqp://' )
print ( add. app is app1)
print ( add. app)
app2 = Celery( broker= 'redis://' )
print ( add. app is app2)
print ( add. app)
调用 @ shared_task饰后的add任务 , 它会尝试找到一个已经存在的Celery应用实例 .
( 通常是通过查找当前模块或父模块中的celery应用实例 , '如果有多个则使用离调用最近的' ) .
如果没有找到 , 它会创建一个新的默认Celery应用实例 . 这个新创建的实例将绑定到被装饰的任务上 , 并通过task . app属性访问 .
代码中 , 首先定义了一个add任务 , 然后创建了app1和app2两个Celery应用实例 .
add任务在app1和app2创建之前就已经被定义 , 并且由于此时还没有显式创建Celery应用实例 ,
@ shared_task装饰器会创建一个默认的Celery应用实例并将其绑定到add任务上 .
在定义Celery实例之后 , 调用饰后的add任务 , 那么它会自动将任务重新绑定到最近Celery实例上 .
了解情况后可以继续进行实验了 .
首先 , 创建一个包含共享任务的模块 ( 例如 , shared_tasks . py ) :
from celery import shared_task
@shared_task
def add ( x, y) :
return x + y
然后 , 在第一个Python项目中导入共享任务 :
from celery import Celery
from shared_task import add
app = Celery( 'project1' ,
broker= 'redis://localhost:6379/0' )
print ( add. name in app. tasks)
启动Celery Worker进程 : celery -A p1 worker --loglevel = info -P eventlet
在第二个Django目中导入这个共享任务 :
from celery import Celery
from shared_task import add
app = Celery( 'project2' ,
broker= 'redis://localhost:6379/1' )
print ( add. name in app. tasks)
并启动Celery Worker进程 : celery -A p2 worker --loglevel = info -P eventlet
定义一个脚本使用提交任务 , 使用标识符为project1的Celery实例作为配置 :
from p1 import add
result = add. delay( 4 , 4 )
print ( f'任务ID:: { result. id } ' )
再定义一个脚本使用提交任务 , 使用标识符为project1的Celery实例作为配置 :
from p2 import add
result = add. delay( 5 , 5 )
print ( f'使用结果ID触发的任务: { result. id } ' )
分别运行脚本p3 , p4两次 , 查看消息日志 :
这样 , 如果发送电子邮件的逻辑需要更新 , 只需在shared_tasks . py中进行更改 ,
然后两个项目都会自动使用更新后的逻辑 , 而无需在每个项目中分别进行更改 .
这大大提高了代码的复用性和可维护性 .
如果将两个Celery实例配置为使用同一个消息中间件 ,
这些实例可以共享任务队列 , worker就是随机的了 ( 一个累死累活 , 另外一个不干活 ) .
app = Celery( 'project1' ,
BROKER_CONNECTION_RETRY_ON_STARTUP= True ,
broker= 'redis://localhost:6379/0' )
app = Celery( 'project2' ,
BROKER_CONNECTION_RETRY_ON_STARTUP= True ,
broker= 'redis://localhost:6379/0' )