原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。
文章目录
- 前言
- 一、基础的生产者消费者
- 二、ACK消息确认机制
- 三、类的写法
- 1.新建MyRabbitMQ.py文件
- 2.基础RabiitMQ
- 3.发布订阅
- 4.多次执行RabiitMQ_生产
- 四、多消息队列
- 五、异常处理
- 1. 死信队列
前言
Rabbitmq消息队列,Windows安装RabbitMQ教程
一、基础的生产者消费者
Rabbitmq和kafka消息队列都是一样的,使用时需要生产者和消费者。
# !/usr/bin/env python
import pika
# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World! exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
# !/usr/bin/env python
import pika
# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost', credentials=credentials))
channel = connection.channel()
# channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 取一个就关掉的方法
channel.stop_consuming()
# 去hello队列里拿数据,一但有数据,就执行callback
channel.basic_consume(callback, queue='hello', no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()
二、ACK消息确认机制
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。
生产者
# !/usr/bin/env python
import pika
# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World! exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
# !/usr/bin/env python
import pika
# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost', credentials=credentials))
channel = connection.channel()
# channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 取值做确认工作
ch.basic_ack(delivery_tag=method.deliver_tag)
# 去hello队列里拿数据,一但有数据,就执行callback,
# no_ack=Flask必须在取值时做确认工作,否则值不会被取出
channel.basic_consume(callback, queue='hello', no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()
三、类的写法
这个类使用 pika 库进行与 RabbitMQ 的通信。当你使用 send_message() 或 receive_message() 、consume_messages方法时,Channel 对象必须是打开的。如果没有连接或者通道没有打开,这些方法将引发 ValueError 异常。
1.新建MyRabbitMQ.py文件
文件包含rabbitmq的类,类中包含连接到RabbitMQ,并在连接对象上创建一个管道,然后就可以使用send_message()
和receive_message()
方法、consume_messages发送和接收消息,接收消息会调用回调方法。
下面是一个带有消费回调的完整 RabbitMQ 类
import pika
import time
class RabbitMQ:
def __init__(self, host, port, username, password):
self.host = host
self.port = port
self.username = username
self.password = password
self.connection = None
self.channel = None
def connect(self, timeout=10):
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(host=self.host,
port=self.port,
credentials=credentials)
start_time = time.time()
while time.time() - start_time < timeout:
try:
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
return True
except pika.exceptions.AMQPConnectionError:
time.sleep(1)
return False
def send_message(self, exchange, routing_key, message):
try:
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(delivery_mode=2))
except AttributeError:
raise ValueError("Channel is not open. Call connect() before send_message().")
def receive_message(self, queue, auto_ack=False):
try:
method_frame, properties, body = self.channel.basic_get(queue=queue, auto_ack=auto_ack)
if method_frame:
return body.decode('utf-8')
else:
return None
except AttributeError:
raise ValueError("Channel is not open. Call connect() before receive_message().")
def consume_messages(self, queue, callback):
try:
self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
self.channel.start_consuming()
except AttributeError:
raise ValueError("Channel is not open. Call connect() before consume_messages().")
def create_queue(self, name):
try:
self.channel.queue_declare(queue=name, durable=True)
except AttributeError:
raise ValueError("Channel is not open. Call connect() before create_queue().")
def bind_queue(self, queue, exchange, routing_key):
try:
self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
except AttributeError:
raise ValueError("Channel is not open. Call connect() before bind_queue().")
def close(self):
try:
self.connection.close()
except AttributeError:
raise ValueError("Connection is not open. Call connect() before close().")
2.基础RabiitMQ
基于队列_生产
创建RabiitMQ_生产.py文件,内容如下:
from MyRabbitMQ import RabbitMQ
if __name__ == '__main__':
print('RabbitMQ生产')
my_host = '127.0.0.1'
my_username = 'guest'
my_password = 'guest'
my_queue = 'hello'
my_exchange = 'BBB'
my_routing_key = 'hello'
rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
if rabbitmq.connect():
rabbitmq.create_queue(my_queue)
rabbitmq.send_message('', my_queue, message='开始了')
else:
print("Failed to connect to RabbitMQ.")
基于队列_消费
from MyRabbitMQ import RabbitMQ
if __name__ == '__main__':
print('RabbitMQ消费')
my_host = '127.0.0.1'
my_username = 'guest'
my_password = 'guest'
my_queue = 'hello'
my_exchange = 'BBB'
my_routing_key = 'hello'
def callback(channel, method, properties, body):
print("Received message: %s" % body.decode('utf-8'))
channel.basic_ack(delivery_tag=method.delivery_tag)
rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
if rabbitmq.connect():
rabbitmq.create_queue(my_queue)
rabbitmq.consume_messages(my_queue, callback)
else:
print("Failed to connect to RabbitMQ.")
在此例中,当一个新的消息从名为 my_queue
的队列中接收时,回调函数 callback
将被调用并打印消息内容。
注意:如果你的回调函数需要执行较复杂的操作(例如长时间运行或使用多线程),则你应该确保它是线程安全的,并且在操作完成后调用 ch.basic_ack
,这样 RabbitMQ 就知道消息已经被处理并可以将其从队列中删除。
3.发布订阅
基于exchangs交换机的生产者
from MyRabbitMQ import RabbitMQ
if __name__ == '__main__':
print('RabbitMQ消费')
my_host = '127.0.0.1'
my_username = 'guest'
my_password = 'guest'
my_queue = 'hello'
my_exchange = 'BBB'
my_routing_key = 'hello'
rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
if rabbitmq.connect():
rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
else:
print("Failed to connect to RabbitMQ.")
基于exchangs交换机的消费者
from MyRabbitMQ import RabbitMQ
if __name__ == '__main__':
print('RabbitMQ消费')
my_host = '127.0.0.1'
my_username = 'guest'
my_password = 'guest'
my_queue = 'hello'
my_exchange = 'BBB'
my_routing_key = 'hello'
def callback(channel, method, properties, body):
print("Received message: %s" % body.decode('utf-8'))
channel.basic_ack(delivery_tag=method.delivery_tag)
rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
if rabbitmq.connect():
rabbitmq.create_queue(my_queue)
# rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
rabbitmq.bind_queue(my_queue, my_exchange, my_routing_key)
rabbitmq.consume_messages(my_queue, callback)
else:
print("Failed to connect to RabbitMQ.")
4.多次执行RabiitMQ_生产
四、多消息队列
import pika
import random
from retry import retry
def on_message(channel, method_frame, header_frame, body)
print(method_frame.delivery_tag)
print(body)
print(header_frame)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter(1, 3)
def consume():
random.shuffle(all_endpoints)
connection = pika.BlockingConnection(all_endpoints)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare('recovery-example', durable=False, auto_delete=True)
channel.basic_consume('recovery-example', on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
except pika.excaptions.ConnectionClosedByBroker:
continue
consume()
五、异常处理
from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed
try:
mq.start_consuming_message()
except ConnectionClosed as e:
mq.clear()
mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
mq.start_consuming_message()
except ChannelClosed as e:
mq.clear()
mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
mq.start_consuming_message()
1. 死信队列
死信队列就是备份队列,rabbitMQ有,kafka还没有