安装
服务端
https://www.jianshu.com/p/2fb6d5ac17b9
客户端
pip install pika
文档
https://rabbitmq.com/tutorials/tutorial-one-python.html
简单示例
生产者
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 创建一个队列
channel.queue_declare(queue="rb_queue_01")
channel.basic_publish(exchange="",
routing_key='rb_queue_01',
body='hello world3')
connection.close()
消费者
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rb_queue_01")
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='rb_queue_01', auto_ack=True, on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
ack
消息确认机制:确认消费了,移除队列
消费者
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rb_queue_01")
def callback(ch, method, properties, body):
print("消费者接受任务:==> %r" % body )
# int('ffff')
# ack回传,告诉服务端已经取走了
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='rb_queue_01', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
durable
服务端挂掉了, 声明队列时做持久化
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 创建一个队列 ,声明持久化
channel.queue_declare(queue="rb_queue_02", durable=True)
channel.basic_publish(exchange="",
routing_key='rb_queue_01',
body='ack',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
connection.close()
闲置的就消费
消费者
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rb_queue_02")
def callback(ch, method, properties, body):
print("消费者接受任务:==> %r" % body)
# int('ffff')
# ack回传,告诉服务端已经取走了
ch.basic_ack(delivery_tag=method.delivery_tag)
# Work Queues
# 闲置的就派发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rb_queue_01', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
发布订阅
https://rabbitmq.com/tutorials/tutorial-three-python.html
type = fanout
消费者(订阅者)
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m1", exchange_type="fanout")
# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
# 绑定exchange和队列
channel.queue_bind(exchange='m1', queue=queue_name)
# 接收消息
def callback(ch, method, properties, body):
print("消费者接受任务:==> %r" % body)
# int('ffff')
# ack回传,告诉服务端已经取走了
ch.basic_ack(delivery_tag=method.delivery_tag)
# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
发布者(消费者)
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 发布者先运行就创建
channel.exchange_declare(exchange='m1', exchange_type='fanout')
# routing_key为空,通过交换机而不是直接交付,一个发布到多个
channel.basic_publish(exchange="m1", routing_key="", body="hhsaf")
type = direct
根据关键字不同交给不同的人
direct_发布者
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 发布者先运行就创建
channel.exchange_declare(exchange='m2', exchange_type='direct')
# routing_key为空,通过交换机而不是直接交付,一个发布到多个
channel.basic_publish(exchange="m2", routing_key="r2", body="r2")
direct_订阅者1
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m2", exchange_type="direct")
# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
# 绑定exchange和队列, 加上routingkey, 交换机收到来自发布者的消息+routing_key, 然后通过消费者绑定的routing_key进行分发
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r1")
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r2")
# 接收消息
def callback(ch, method, properties, body):
print("消费者接受任务:==> %r" % body)
# int('ffff')
# ack回传,告诉服务端已经取走了
ch.basic_ack(delivery_tag=method.delivery_tag)
# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
direct_订阅者2
import pika
import rabbitmq_study.settings as settings
credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))
channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m2", exchange_type="direct")
# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
# 绑定exchange和队列, 加上routingkey, 交换机收到来自发布者的消息+routing_key, 然后通过消费者绑定的routing_key进行分发
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r1")
# 接收消息
def callback(ch, method, properties, body):
print("消费者接受任务:==> %r" % body)
# int('ffff')
# ack回传,告诉服务端已经取走了
ch.basic_ack(delivery_tag=method.delivery_tag)
# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
type = topic
路由模糊匹配
匹配规则
-
*:匹配一个单词(单词)
old.alex: old.* 可以 old.# 可以
-
#:匹配多个单词
old.alex.ll: old.* 不可以 old.# 可以
和direct一样,只不过改改模式和routingkey
RPC
远程过程调用
待补充
_consuming()
#### type = topic
[外链图片转存中...(img-14qQSLwu-1682781289112)]
路由模糊匹配
##### 匹配规则
- *:匹配一个单词(单词)
old.alex:
old.* 可以
old.# 可以
- #:匹配多个单词
old.alex.ll:
old.* 不可以
old.# 可以
和direct一样,只不过改改模式和routingkey
### RPC
**远程过程调用**
待补充