使用RabbitMQ实现微服务间的异步消息传递

news2024/11/5 22:52:33

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py

# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='fanout_exchange', queue=queue_name)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='ack_queue')

channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2229398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字教学的创新引擎:构建数字教学知识库

在教育行业,数字化转型正成为推动教育现代化的重要力量。数字教学知识库作为这一转型的核心组成部分,对于整合教育资源、提升教学质量、促进教育公平具有重要意义。本文将探讨数字教学知识库的构建策略、应用价值,并分析其在教育行业的深远影…

【ArcGISPro】制作简单的ArcGISPro-AI助手

【python】AI Navigator的使用及搭建本机大模型_anaconda ai navigator-CSDN博客 【Python】AI Navigator对话流式输出_ai大模型流式输出效果(打字效果) python-CSDN博客 【C#】调用本机AI大模型流式返回_怎么实现调用本地大模型时实现流式输出-CSDN博客 【ArcGISPro】宣布推…

web文件包含include

php伪协议 在 PHP 中,伪协议(Pseudo Protocols) 也被称为 流包装器,这些伪协议以 php:// 开头,后面跟着一些参数,用于指定 要执行的操作 或 需要访问的资源。 伪协议表明这些协议并不是一个 真实的外部协议…

【力扣打卡系列】验证二叉搜索树

坚持按题型打卡&刷&梳理力扣算法题系列,语言为go,Day17 验证二叉搜索树 题目描述 解题思路 前序遍历:先访问节点值,再访问左右子树有效二叉搜索树的定义 节点的左子树只包含小于当前节点的数节点的右子树只包含大于当前节…

【天线&空中农业】蜜蜂检测系统源码&数据集全套:改进yolo11-ASF

改进yolo11-dysample等200全套创新点大全:蜜蜂检测系统源码&数据集全套 1.图片效果展示 项目来源 人工智能促进会 2024.10.30 注意:由于项目一直在更新迭代,上面“1.图片效果展示”和“2.视频效果展示”展示的系统图片或者视频可…

hive将包含逗号的字段拆分为多列

目录 一、概述 二、行动 1.准备数据 2.数据清洗 3.substring_index函数 4.split函数实现 一、概述 想将hive表中包含逗号的字段按逗号做分隔符进行分列操作 二、行动 1.准备数据 --1 select {1,2,3,4,5,6,7,8} as num_str --使用的数据2.数据清洗 --2 select num_s…

基于MPC控制器的混合动力EMS能量管理系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 MPC 控制器原理 4.2 能量管理模块 4.3 动力模型 5.完整工程文件 1.课题概述 整个系统可以划分为如下几个模块。 其中,能量管理模块其包括:MPC控制器模块,驱动扭…

11-Python基础编程之错误和异常

Python基础编程之错误和异常 概念错误异常 常见的系统异常异常的解决预防捕捉处理异常with语句 手动抛出异常自定义异常 概念 错误 可以通过代码进行修复; 异常 需要提前考虑,设定限制条件;不能通过代码进行修复; 常见的系…

使用 Elastic、OpenLLMetry 和 OpenTelemetry 跟踪 LangChain 应用程序

作者:来自 Elastic Bahubali Shetti Langchain 应用程序的使用正在增长。构建基于 RAG 的应用程序、简单的 AI 助手等的能力正在成为常态。观察这些应用程序更加困难。考虑到现有的各种选项,本博客展示了如何将 OpenTelemetry 检测与 OpenLLMetry 结合使…

b站小土堆PyTorch视频学习笔记(二)

Dataloader:提供不同类型的数据集;为后面的网络提供不同的数据形式 Dataset:提供一种方式去获取数据及其label(标签) 主要实现以下两个功能: {如何获取每一个数据及其lable;告诉我们总共有多少数据} fr…

操作系统实验记录

实验零:虚拟机安装 一、安装vmware虚拟机 与vmware匹配搜索结果 - 考拉软件 (rjctx.com),下载17.5.1版本即可下载后对照教程安装 二、下载iso虚拟驱动 搜索清华大学镜像网站,点击再搜ubuntu,下载这个4.1GB的iso文件安装后打开vmware虚拟机 三、配置vmware虚拟机 右键管…

【YOLO 系列】基于YOLO的行人口罩检测系统【python源码+Pyqt5界面+数据集+训练代码】

前言 在当前全球公共卫生形势下,戴口罩已成为预防呼吸道疾病传播的重要措施。然而,确保每个人都遵守这一规定仍然是一项挑战。为了提高公共场合的口罩佩戴合规性,我们开发了基于YOLO V8的行人口罩检测系统。该系统利用深度学习技术&#xff…

SpringBoot节奏:Web音乐网站构建手册

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统,它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等,非常…

《Python游戏编程入门》注-第4章2

《Python游戏编程入门》的“4.2.2 键盘事件”中介绍了通过键盘事件来监听键盘按键的方法。 1 键盘事件 玩家点击键盘中某个按键实际上包含了两个动作:点击按键和释放按键,也就是按键按下和松开。按键按下的对应的事件是KEYDOWN,按键松开对应…

ifuse挂载后,在python代码中访问iOS沙盒目录获取app日志

上一次使用pymobiledevice3,在python代码中访问app的沙盒目录并分析业务日志,在使用过程中发现,在获取app日志的时候速度很慢,执行时间很长,需要30-61秒,所以这次尝试使用libimobiledevic和ifuse&#xff0…

Vue2指令原理手写

文件结构 index.js /** Author: RealRoad* Date: 2024-10-31 17:13:50* LastEditors: Do not edit* LastEditTime: 2024-10-31 17:15:57* Description: * FilePath: \project_10_08\vite-project\src\testVue\index.js*/ import Vue from ./Vue.js window.VueVue Vue.js imp…

信而泰防火墙安全测试解决方案:为网络安全保驾护航

在当今数字化时代,网络安全至关重要。防火墙作为网络安全的第一道防线,其性能和可靠性直接影响到网络的安全性。信而泰提供的防火墙安全测试解决方案,旨在通过全面的测试流程,确保防火墙能够高效、准确地执行其安全任务。 针对防火…

我在命令行下剪辑视频

是的,你不需要格式工厂,你也不需要会声会影,更不需要爱剪辑这些莫名其妙的流氓软件,命令行下视频处理,包括剪辑,转码,提取,合成,缩放,字幕,特效等…

攻防世界5

cgpwn2 发现是32位文件 打开main函数发现hello双击进入 这里我们发现栈溢出了,双击name 我们发现了bss 发现这题的system有点问题,后门需要我们自己输入,刚好有bss我们直接用它 知道system的地址 exp: from pwn import * context(oslinux,a…

vue项目中如何在路由变化时增加一个进度条

在 Vue.js 项目中,使用路由(如 Vue Router)时,为了提升用户体验,你可能会想要在路由变化时显示一个进度条。这可以通过多种方式实现,其中一种流行的做法是使用第三方库,如 vue-loading-bar 或 n…