Python 中的 Kombu 类库

news2024/12/30 3:13:01

Kombu 是一个用于 Python 的消息队列库,提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一,但也可以单独使用。Kombu 支持多种消息代理(如 RabbitMQ、Redis、Amazon SQS 等),并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis

一.主要功能

1.消息队列

提供可靠的消息传递和队列机制,允许将消息从生产者发送到消费者。

2.消息代理支持

支持多种消息代理,如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。

3.异步任务

可以用来实现异步任务处理,配合 Celery 使用时,可以构建分布式任务队列。

4.消息格式

支持多种消息格式,包括 JSON、YAML、pickle 等。

5.路由和交换

提供了高级的消息路由和交换功能,可以实现复杂的消息分发逻辑。

二.基本使用

1. 创建消息生产者

生产者负责向消息队列发送消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建生产者
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'key': 'value'},
            exchange=exchange,
            routing_key='my_key',
            serializer='json'
        )
        print("Message sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建生产者
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'key': 'value'},
            exchange=exchange,
            routing_key='my_key',
            serializer='json'
        )
        print("Message sent.")

2. 创建消息消费者

消费者从消息队列中接收和处理消息。

(1)Redis 消息代理

from kombu import Connection, Exchange, Queue, Consumer

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

def callback(body, message):
    print(f"Received message: {body}")
    message.ack()  # 确认消息已处理

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建消费者
    with Consumer(conn, [queue], callback=callback) as consumer:
        print("Waiting for messages...")
        # 运行消费者,等待消息
        while True:
            conn.drain_events()

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Queue, Consumer

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

def callback(body, message):
    print(f"Received message: {body}")
    message.ack()  # 确认消息已处理

# 创建连接
with Connection(broker_url) as conn:
    # 创建交换机和队列
    exchange = Exchange('my_exchange', type='direct')
    queue = Queue('my_queue', exchange, routing_key='my_key')

    # 创建消费者
    with Consumer(conn, [queue], callback=callback) as consumer:
        print("Waiting for messages...")
        # 运行消费者,等待消息
        while True:
            conn.drain_events()

3. 高级用法:消息路由

Kombu 支持复杂的消息路由配置,以下示例展示了如何使用路由功能将消息发送到不同的队列。

(1)Redis 消息代理

from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL(Redis)
broker_url = 'redis://localhost:6379/0'

# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')

def route_message(message):
    if message['type'] == 'type1':
        return 'key1'
    return 'key2'

# 创建连接
with Connection(broker_url) as conn:
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'type': 'type1', 'data': 'value1'},
            exchange=exchange,
            routing_key=route_message({'type': 'type1'}),
            serializer='json'
        )
        print("Message routed and sent.")

(2)RabbitMQ 消息代理

from kombu import Connection, Exchange, Producer, Queue

# 设置消息代理的连接URL
broker_url = 'amqp://guest:guest@localhost//'

# 创建交换机和队列
exchange = Exchange('my_exchange', type='direct')
queue1 = Queue('queue1', exchange, routing_key='key1')
queue2 = Queue('queue2', exchange, routing_key='key2')

def route_message(message):
    if message['type'] == 'type1':
        return 'key1'
    return 'key2'

# 创建连接
with Connection(broker_url) as conn:
    with Producer(conn) as producer:
        # 发送消息
        producer.publish(
            {'type': 'type1', 'data': 'value1'},
            exchange=exchange,
            routing_key=route_message({'type': 'type1'}),
            serializer='json'
        )
        print("Message routed and sent.")

4. 结合 Celery 使用

Kombu 通常与 Celery 一起使用来处理异步任务。简单理解,Kombu 是 Celery 的依赖库,Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能,提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。

(1)Redis 消息代理

from celery import Celery

# 配置 Celery 使用 Redis 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

在 Dify 中默认消息代理使用 Redis,如下所示:

(2)RabbitMQ 消息代理

from celery import Celery

# 配置 Celery 使用 RabbitMQ 作为消息代理(通过 Kombu 处理)
app = Celery('tasks', broker='amqp://guest:guest@localhost//')

@app.task
def add(x, y):
    return x + y

Kombu 是一个强大的消息传递库,提供了多种消息代理的支持,并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能,如交换机、队列、路由等。基础用法 包括创建生产者和消费者,通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等,用于构建分布式系统和异步任务处理。

参考文献

[1] https://github.com/celery/kombu

[2] https://docs.celeryq.dev/projects/kombu/en/stable/

[3] 消息队列 Kombu 之 基本架构:https://www.cnblogs.com/rossiXYZ/p/14454761.html

[4] Kombu 库用法详解(连接、连接池、生产者、消费者):https://blog.csdn.net/weixin_44799217/article/details/128490325

NLP工程化(星球号)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2151271.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ByteTrack多目标跟踪流程图

ByteTrack多目标跟踪流程图 点个赞吧,谢谢。

用 Pygame 实现一个乒乓球游戏

用 Pygame 实现一个乒乓球游戏 伸手需要一瞬间,牵手却要很多年,无论你遇见谁,他都是你生命该出现的人,绝非偶然。若无相欠,怎会相见。 引言 在这篇文章中,我将带领大家使用 Pygame 库开发一个简单的乒乓球…

python文字转wav音频

借鉴博客 一.前期准备 1. pip install baidu-aip 2. pip install pydub 3. sudo apt-get install ffmpeg 二.代码 from aip import AipSpeech from pydub import AudioSegment import time#input your own APP_ID/API_KEY/SECRET_KEY APP_ID 14891501 API_KEY EIm2iXtvD…

【可变模板参数】

文章目录 可变参数模板的概念可变参数模板的定义方式参数包的展开方式递归展开参数包逗号表达式展开参数包 STL容器中的emplace相关接口函数 可变参数模板的概念 可变参数模板是C11新增的最强大的特性之一,它对参数高度泛化,能够让我们创建可以接受可变…

linux入门——“linux基本指令”下

1.mv指令 mv指令用于移动文件或者目录。语法是mv 源文件 目标文件。它的用法需要注意: 当目标文件不存在的时候,默认是将源文件进行重命名操作,名字就是目标文件的名字,当目标文件存在的时候才会把源文件移动到目标文件。 目标文…

centos 7.9安装k8s

前言 Kubernetes单词来自于希腊语,含义是领航员,生产环境级别的容器编排技术,可实现容器的自动部署扩容以及管理。Kubernetes也称为K8S,其中8代表中间8个字符,是Google在2014年的开源的一个容器编排引擎技术&#xff…

WebLogic 后台弱⼝令GetShell

漏洞描述 通过弱⼝令进⼊后台界⾯ , 上传部署war包 , getshell 影响范围 全版本(前提后台存在弱⼝令) 环境搭建 cd vulhub-master/weblogic/weak_password docker-compose up -d 漏洞复现 默认账号密码:weblogic/Oracle123 weblogic…

SQL编程题复习(24/9/19)

练习题 x25 10-145 查询S001学生选修而S003学生未选修的课程(MSSQL)10-146 检索出 sc表中至少选修了’C001’与’C002’课程的学生学号10-147 查询平均分高于60分的课程(MSSQL)10-148 检索C002号课程的成绩最高的二人学号&#xf…

34. 模型材质父类Material

学习到现在大家对threejs的材质都有简单的了解,本节课主要结合文档,从JavaScript语法角度,给大家总结一下材质API的语法。 材质父类Material 查询threejs文档,你可以看到基础网格材质MeshBasicMaterial、漫反射网格材质MeshLamb…

-bash: apt-get: command not found -bash: yum: command not found

1. 现象: 1.1. 容器内使用apt-get, yum 提示命令未找到 1.2. dockerfile制作镜像时候,使用apt-get, yum同样报此错误。 2.原因: 2.1. linux 分为: 1. RedHat系列: Redhat、Centos、Fedora等 2. Debian系列&#xff1a…

4G 网络下资源加载失败?一次运营商封禁 IP 的案例分享

在工作中,网络问题是不可避免的挑战之一。最近,我们在项目中遇到了一起网络资源加载异常的问题:某同事在使用 4G 网络连接公司 VPN 时,云服务的前端资源居然无法加载!通过一系列的排查和分析,我们发现问题的…

alias 后门从入门到应急响应

目录 1. alias 后门介绍 2. alias 后门注入方式 2.1 方式一(以函数的方式执行) 2.2 方式二(执行python脚本) 3.应急响应 3.1 查看所有连接 3.2 通过PID查看异常连接的进程,以及该进程正在执行的命令行命令 3.3 查看别名 3.4 其他情况 3.5 那么检查这些…

Arthas jvm(查看当前JVM的信息)

文章目录 二、命令列表2.1 jvm相关命令2.1.3 jvm(查看当前JVM的信息) 二、命令列表 2.1 jvm相关命令 2.1.3 jvm(查看当前JVM的信息) 基础语法: jvm [arthas18139]$ jvmRUNTIME …

C++_21_模板

模板 简介&#xff1a; 一种用于实现通用编程的机制。 通过使用模板我们可以编写可复用的代码&#xff0c;可以适用于多种数据类型。 C模板的语法使用角括号 < > 来表示泛型类型&#xff0c;并使用关键字 template 来定义和声明模板 概念&#xff1a; c范式编程 特点&…

OSPFv3协议几类LSA介绍

OSPFv3协议介绍 与OSPFv2相比&#xff0c;OSPFv3在工作机制上与OSPFv2基本相同&#xff1b;但为了支持IPv6地址格式&#xff0c;OSPFv3对OSPFv2做了一些改动。OSPFv3基于OSPFv2基本原理增强&#xff0c;是一个独立的路由协议&#xff08;v3不兼容v2&#xff09;协议号仍然是89…

java -versionbash:/usr/lib/jvm/jdk1.8.0_162/bin/java:无法执行二进制文件:可执行文件格式错误

实验环境&#xff1a;Apple M1在VMwareFusion使用Utubun Jdk文件错误 &#xfffc; 尝试&#xff1a; 1、重新在网盘下载java1.8 2、在终端通过命令下载 3、确保 JDK 正确安装在系统中&#xff0c;可以通过 echo $JAVA_HOME 检查 JAVA_HOME 环境变量是否设置正确。 &#xfff…

前端JavaScript导出excel,并用excel分析数据,使用SheetJS导出excel

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f…

C语言的文件基础知识

一、文件存在的意义 ① 文件的定义是什么&#xff1f; 文件是以单个名称在计算机上存储的信息集合。文件可以是文本文档、图片、程序等等。文件通常具有三个字母的文件扩展名&#xff0c;用于指示文件类型&#xff08;例如&#xff0c;图片文件常常以 JPEG 格式保存并且文件扩…

【技术编辑与写作】优秀文章整理——如何做个好编辑

【技术编辑与写作】优秀文章整理——如何做个好编辑 本文适用人群&#xff1a; 本文不是严谨教程贴&#xff0c;更多的是好文搜集、经验分享和闲聊&#xff0c;适用于感兴趣想简单了解运营、编辑等行业的人&#xff0c;如果是追求高精尖和专业的&#xff0c;已有N年经验人士请…

【HTTP】方法(method)以及 GET 和 POST 的区别

文章目录 方法&#xff08;method&#xff09;登录上传GET 和 POST 有什么区别&#xff08;面试&#xff09;区别不准确的说法 方法&#xff08;method&#xff09; 首行中的第一部分。首行是由方法、URL 和版本号组成 方法描述了这次请求想干什么&#xff0c;最主要的是&…