【Python】部署Rabbitmq消息队列实现生产和消费

news2024/9/28 7:20:46

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、基础的生产者消费者
  • 二、ACK消息确认机制
  • 三、类的写法
    • 1.新建MyRabbitMQ.py文件
    • 2.基础RabiitMQ
    • 3.发布订阅
    • 4.多次执行RabiitMQ_生产
  • 四、多消息队列
  • 五、异常处理
    • 1. 死信队列


前言

Rabbitmq消息队列,Windows安装RabbitMQ教程


一、基础的生产者消费者

Rabbitmq和kafka消息队列都是一样的,使用时需要生产者和消费者。

# !/usr/bin/env python
import pika

# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

# !/usr/bin/env python
import pika

# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()


# channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 取一个就关掉的方法
    channel.stop_consuming()


# 去hello队列里拿数据,一但有数据,就执行callback
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

二、ACK消息确认机制

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。

生产者

# !/usr/bin/env python
import pika

# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

# !/usr/bin/env python
import pika

# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()


# channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 取值做确认工作
    ch.basic_ack(delivery_tag=method.deliver_tag)


# 去hello队列里拿数据,一但有数据,就执行callback,
# no_ack=Flask必须在取值时做确认工作,否则值不会被取出
channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

三、类的写法

这个类使用 pika 库进行与 RabbitMQ 的通信。当你使用 send_message() 或 receive_message() 、consume_messages方法时,Channel 对象必须是打开的。如果没有连接或者通道没有打开,这些方法将引发 ValueError 异常。

1.新建MyRabbitMQ.py文件

文件包含rabbitmq的类,类中包含连接到RabbitMQ,并在连接对象上创建一个管道,然后就可以使用send_message()receive_message()方法、consume_messages发送和接收消息,接收消息会调用回调方法。

下面是一个带有消费回调的完整 RabbitMQ 类

import pika
import time

class RabbitMQ:
    def __init__(self, host, port, username, password):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.connection = None
        self.channel = None

    def connect(self, timeout=10):
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(host=self.host,
                                               port=self.port,
                                               credentials=credentials)
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                return True
            except pika.exceptions.AMQPConnectionError:
                time.sleep(1)
        return False

    def send_message(self, exchange, routing_key, message):
        try:
            self.channel.basic_publish(exchange=exchange,
                                       routing_key=routing_key,
                                       body=message,
                                       properties=pika.BasicProperties(delivery_mode=2))
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before send_message().")

    def receive_message(self, queue, auto_ack=False):
        try:
            method_frame, properties, body = self.channel.basic_get(queue=queue, auto_ack=auto_ack)
            if method_frame:
                return body.decode('utf-8')
            else:
                return None
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before receive_message().")

    def consume_messages(self, queue, callback):
        try:
            self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
            self.channel.start_consuming()
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before consume_messages().")
    
    def create_queue(self, name):
        try:
            self.channel.queue_declare(queue=name, durable=True)
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before create_queue().")

    def bind_queue(self, queue, exchange, routing_key):
        try:
            self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before bind_queue().")

    def close(self):
        try:
            self.connection.close()
        except AttributeError:
            raise ValueError("Connection is not open. Call connect() before close().")

2.基础RabiitMQ

基于队列_生产

创建RabiitMQ_生产.py文件,内容如下:

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ生产')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'

    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        rabbitmq.send_message('', my_queue, message='开始了')
    else:
        print("Failed to connect to RabbitMQ.")

基于队列_消费

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    def callback(channel, method, properties, body):
        print("Received message: %s" % body.decode('utf-8'))
        channel.basic_ack(delivery_tag=method.delivery_tag)


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        rabbitmq.consume_messages(my_queue, callback)
    else:
        print("Failed to connect to RabbitMQ.")

在此例中,当一个新的消息从名为 my_queue 的队列中接收时,回调函数 callback 将被调用并打印消息内容。

注意:如果你的回调函数需要执行较复杂的操作(例如长时间运行或使用多线程),则你应该确保它是线程安全的,并且在操作完成后调用 ch.basic_ack,这样 RabbitMQ 就知道消息已经被处理并可以将其从队列中删除。

3.发布订阅

基于exchangs交换机的生产者

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
		rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
    else:
        print("Failed to connect to RabbitMQ.")

基于exchangs交换机的消费者

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    def callback(channel, method, properties, body):
        print("Received message: %s" % body.decode('utf-8'))
        channel.basic_ack(delivery_tag=method.delivery_tag)


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        # rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
        rabbitmq.bind_queue(my_queue, my_exchange, my_routing_key)
        rabbitmq.consume_messages(my_queue, callback)
    else:
        print("Failed to connect to RabbitMQ.")

4.多次执行RabiitMQ_生产

在这里插入图片描述

四、多消息队列

import pika
import random
from retry import retry
def on_message(channel, method_frame, header_frame, body)
    print(method_frame.delivery_tag)
    print(body)
    print(header_frame)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)

node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter(1, 3)
def consume():
    random.shuffle(all_endpoints)
    connection = pika.BlockingConnection(all_endpoints)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare('recovery-example', durable=False, auto_delete=True)
    channel.basic_consume('recovery-example', on_message)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
        connection.close()
    except pika.excaptions.ConnectionClosedByBroker:
        continue
consume()

五、异常处理

from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed

    try:
        mq.start_consuming_message()
    except ConnectionClosed as e:
        mq.clear()
        mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
        mq.start_consuming_message()
    except ChannelClosed as e:
        mq.clear()
        mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
        mq.start_consuming_message()

1. 死信队列

死信队列就是备份队列,rabbitMQ有,kafka还没有

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1309910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Cannot find cache named ‘‘ for Builder Redis

当引入 Redissson 时&#xff0c;springCache 缓存机制失效 原因&#xff1a;springCache 默认使用本地缓存 Redisson 使用redis 缓存 最后都转成redis了。。。 总感觉哪不对 两者居然不共存

SU渲染受到电脑性能影响大吗?如何提高渲染速度

一般3d设计师们在进行设计工作前都需要提供一台高配电脑&#xff0c;那么你这知道su渲染对电脑要求高吗&#xff1f;电脑带不动su怎么解决&#xff1f;su对电脑什么配件要求高&#xff1f;今天这篇文章就详细为大家带来电脑硬件对su建模渲染的影响&#xff0c;以及su渲染慢怎么…

学习深度强化学习---第3部分----RL蒙特卡罗相关算法

文章目录 3.1节 蒙特卡罗法简介3.2节 蒙特卡罗策略评估3.3节 蒙特卡罗强化学习3.4节 异策略蒙特卡罗法 本部分视频所在地址&#xff1a;深度强化学习的理论与实践 3.1节 蒙特卡罗法简介 在其他学科中的蒙特卡罗法是一种抽样的方法。 如果状态转移概率是已知的&#xff0c;则是…

Axure的流程图/泳道图以及自定义元件库的使用

目录 1.ProcessOn的介绍 2.流程图以及泳道图的介绍 2.1流程图 2.2流程图的特点 2.3泳道图 2.4泳道图的特点 2.5流程图跟泳道图的优缺点 2.5.1优点&#xff1a; 2.5.2缺点&#xff1a; 2.6流程图的使用 2.7流程图的案列 2.8泳道图的使用 3.自定义元件库 4.门诊流程…

【Bug修复】秒杀服务器异常,轻松恢复网站访问--从防火墙到Docker服务的全面解析

&#xff08;秒杀方案&#xff09;服务器异常&#xff1a;connection is closed by foreign host… 月初部署了一个私人项目到服务器上&#xff0c;刚开始还能用&#xff0c;用了不到两天报了上面的错误&#xff1a;connection is closed by foreign host… &#x1f33a;问题描…

Echarts多图表动态更新示例

前端框架(html、echarts、jquery) <!DOCTYPE html> <html><head><meta charset"utf-8"><title>echarts多图表动态更新示例</title><script src"jquery.min.js"></script><script type"text/java…

java.lang.IllegalArgumentException: Could not resolve placeholder XXX‘ in value

问题描述 使用Springcloudalibaba的nacos作为配置中心&#xff0c;服务启动时报错&#xff1a; java.lang.IllegalArgumentException: Could not resolve placeholder XXX‘ in value java.lang.IllegalArgumentException: Param ‘serviceName’ is illegal, serviceName is …

差分进化算法DE

此算法是一种基于贪心的并行直接搜索算法。 1.过程 &#xff08;1&#xff09;初始化种群 NP个D维的参数向量(i1,2,...,NP)作为每一代G种群&#xff0c;种群规模必须>4 &#xff08;2&#xff09;变异 使用种群中两个不同向量来干扰一个现有向量&#xff0c;进行差分操…

生活是自己的,请尽情打扮,尽情可爱

端庄大气又尽显GAO级感 的明制汉服处处都是是惊喜 领口袖口拼接仿貂毛环保毛条 保暖又精致 袖子贴民族风珠片刺绣织带 门襟搭配金属子母扣 前胸欧根纱刺绣圆形布贴 每一处都是用心制作 红色喜庆&#xff0c;用来做拜年服来穿再合适不过啦

[C语言]大小端及整形输出问题

假设在一个32位little endian 的机器上运行下面的程序&#xff0c;结果是多少 ? 1.1先看以下三个程序 #include <stdio.h> int main() {long long a 1, b 2, c 3;printf("%lld %lld %lld\n", a, b, c); // 1 2 3printf("%d %d %d %d %d %d\n&quo…

AXB外呼系统怎样提高工作效率呼叫系统

AXB 外呼系统是一种帮助企业提高外呼效率的解决方案&#xff0c;它结合了自动拨号&#xff08;A&#xff09;和人工坐席&#xff08;X&#xff09;&#xff0c;使企业能够更快速、高效地与潜在客户进行沟通。以下是提高工作效率的一些方法&#xff1a; 预设任务和脚本&#xff…

【Linux】fork()函数详解

什么是fork&#xff1f; fork&#xff08;&#xff09;函数通过系统调用并创建 一个与原来进程几乎完全相同的进程 此进程叫做子进程&#xff0c;两个进程做一样的事 但初始参数或者传入的变量不同&#xff0c;两个 进程便可以做不同的事 fork的返回值 在父进程中&#xff0…

05进程间通信-学习笔记

进程间通信&#xff08;IPC&#xff09; 概念 进程信技术简称IPC,可以利用此技木让多个进程相传建消数据&#xff0c;有大量的进程间通信方案 pipe 匿名管道fifo 命名管简单理解&#xff0c;管道文件是一个指向内核管道缓冲区的指针&#xff0c;所有向管道文件读写的操作&am…

Mybatis 拦截器实现单数据源内多数据库切换

大家好&#xff0c;我是 方圆。物流的分拣业务在某些分拣场地只有一个数据源&#xff0c;因为数据量比较大&#xff0c;将所有数据存在一张表内查询速度慢&#xff0c;也为了做不同设备数据的分库管理&#xff0c;便在这个数据源内创建了多个不同库名但表完全相同的数据库&…

阿昌教你如何使用通义灵码

阿昌教你如何使用通义灵码 Hi&#xff0c;我是阿昌&#xff0c;今天教你如何使用通义灵码。 一、通义灵码是什么 在使用前&#xff0c;肯定要知道通义灵码是个啥东西&#xff1b; 通义灵码&#xff0c;是阿里云出品的一款基于通义大模型的智能编码辅助工具&#xff0c;提供…

三大维度解码剑南春“高质量发展”丨年度盘点

执笔 | 洪大大 编辑 | 扬 灵 2023年即将画上句点&#xff0c;当我们回首这一年为行业带来惊喜的品牌&#xff0c;剑南春是其中之一。 回顾剑南春今年一整年的动作&#xff0c;从新品频发到双节&#xff08;618、双11&#xff09;热销&#xff0c;从全国巡展到荣誉满载&…

经纬恒润以太网网关,智能时代网络通关

汽车产业新四化步伐持续加速&#xff0c;智能网联逐渐成为整车标配&#xff0c;随着近年来相关政策频出以及对网联需求和功能的深度挖掘与发展&#xff0c;中国本土市场及本土供应商在这场新浪潮中逐渐走向C位。经纬恒润深耕智能网联领域多年&#xff0c;先后推出四代网关产品&…

UE Niagara - Bean 制作闪电

开启Beam的四个前提条件 Jitter Position可以使得Bean弯曲&#xff0c;但是是有曲线的弯曲&#xff0c;没有硬度 所以得调这个 把该值设置为1 Mode改为Custom

程序员月经焦虑 :如何成为高级工程师

高级工程师意味着什么&#xff1f; ChatGPT的回复&#xff1a;高级工程师对编程语言、软件设计原则和开发方法有深刻的理解。 开发方法 开发方法学是旨在使团队有效的组织方法。这些对我们来说可能很无聊&#xff0c;但我们希望你在这方面有专业知识。 我已经对非敏捷开发方法…

学生上课犯困怎么办

当你作为学生上课犯困时&#xff0c;首先不要过于自责或沮丧&#xff0c;因为这是很常见的情况。以下是一些建议&#xff0c;帮助你克服这个问题&#xff0c;保持专注并提高学习效率。 保持良好的作息习惯 睡眠对于大脑的健康和功能至关重要。确保每晚获得足够的睡眠&#xff…