ActiveMQ、RabbitMQ 和 Kafka 在 Spring Boot 中的实战

news2024/11/15 2:00:06

在现代的微服务架构和分布式系统中,消息队列 是一种常见的异步通信工具。消息队列允许应用程序之间通过 生产者-消费者模型 进行松耦合、异步交互。在 Spring Boot 中,我们可以通过简单的配置来集成不同的消息队列系统,包括 ActiveMQRabbitMQKafka。本文将重点介绍它们的实战案例及使用时需要注意的地方。
在这里插入图片描述

一、Spring Boot 集成 ActiveMQ

1. ActiveMQ 概述

ActiveMQ 是一个开源、支持 JMS(Java Message Service)的消息中间件。它支持点对点(Queue)和发布/订阅(Topic)模式,是 Spring Boot 常用的消息队列之一。

2. ActiveMQ 实战:生产者和消费者

依赖配置

pom.xml 中添加 ActiveMQ 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置 ActiveMQ 连接

application.properties 中配置 ActiveMQ 的连接地址:

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
生产者代码示例
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class ActiveMQProducer {

    private final JmsTemplate jmsTemplate;

    public ActiveMQProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessage(String queueName, String message) {
        jmsTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent: " + message);
    }
}
消费者代码示例
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class ActiveMQConsumer {

    @JmsListener(destination = "testQueue")
    public void receiveMessage(String message) {
        System.out.println("Message received: " + message);
    }
}

3. 注意事项

  1. JMS 模式的选择:ActiveMQ 支持 点对点发布/订阅 两种模式。要根据场景选择合适的模式,比如订单处理适合点对点模式,而系统通知适合发布/订阅。
  2. 消息持久化:确保配置了持久化存储,尤其是当队列中消息量很大时,ActiveMQ 默认使用 KahaDB 存储,建议对其进行优化。

二、Spring Boot 集成 RabbitMQ

1. RabbitMQ 概述

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息代理,广泛应用于微服务系统。RabbitMQ 提供了更复杂的消息路由功能,例如 交换机(Exchange)和 绑定(Binding)。

2. RabbitMQ 实战:生产者和消费者

依赖配置

pom.xml 中添加 RabbitMQ 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 RabbitMQ 连接

application.properties 中配置 RabbitMQ 的连接地址:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
生产者代码示例
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProducer {

    private final RabbitTemplate rabbitTemplate;

    public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Message sent: " + message);
    }
}
消费者代码示例
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = "testQueue")
    public void receiveMessage(String message) {
        System.out.println("Message received: " + message);
    }
}

3. 注意事项

  1. 交换机和队列的绑定:RabbitMQ 提供了丰富的交换机类型,如 Direct、FanoutTopic。选择合适的交换机类型非常关键,例如 Direct 适合路由到特定队列,而 Fanout 适合广播消息到所有绑定队列。
  2. 消息确认机制:RabbitMQ 支持消息的 手动确认,确保消费者已经正确处理了消息,避免消息丢失。

三、Spring Boot 集成 Kafka

1. Kafka 概述

Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,用于 实时数据流处理。与 ActiveMQ 和 RabbitMQ 不同,Kafka 主要用于处理 大规模的、持续的数据流,例如日志采集、消息传递等。

2. Kafka 实战:生产者和消费者

依赖配置

pom.xml 中添加 Kafka 的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka 连接

application.properties 中配置 Kafka 的连接地址:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
生产者代码示例
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Message sent to topic " + topic + ": " + message);
    }
}
消费者代码示例
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "testTopic", groupId = "my-group")
    public void receiveMessage(String message) {
        System.out.println("Message received: " + message);
    }
}

3. 注意事项

  1. 分区与副本机制:Kafka 的分区机制允许数据被并行处理,提升吞吐量。合理规划 分区数副本数,可以提高数据的可靠性和吞吐量。
  2. 消费偏移管理:Kafka 消费者需要管理消费偏移(offset),确保在重启或发生故障时,能够从上次的位置继续消费。Spring Boot 提供了自动和手动管理偏移的选项,建议根据需求选择合适的策略。

四、丢消息的处理方案

在使用消息队列时,丢消息是一个常见的问题,通常发生在以下场景:

  1. 生产者发送消息失败:消息未能成功送到队列。
  2. 消息未持久化:队列宕机导致消息丢失。
  3. 消费者处理消息失败:消费者在处理消息时出错,未能确认消息。

1. 生产者发送失败的处理

在生产者发送消息时,可能会由于网络问题或队列不可用,导致消息未能成功发送。这时需要确保生产者具备 重试机制失败回调,保证消息最终能到达队列。

重试机制示例 (以 Kafka 为例):
kafkaTemplate.send(topic, message).addCallback(
    success -> System.out.println("Message sent successfully: " + message),
    failure -> {
        System.err.println("Message failed to send: " + message);
        // 可以在此进行重试逻辑或存储消息到数据库,后续处理
    }
);

注意事项

  • 重试机制:生产者可以通过配置重试策略,例如在 Kafka 中通过 retries 属性配置发送失败后的重试次数。
  • 备份存储:对于无法发送的消息,可以选择将其保存到数据库或日志文件中,以便后续重新发送。

2. 消息未持久化的处理

大多数消息队列(如 ActiveMQ、RabbitMQ、Kafka)都提供了 消息持久化 的功能。在配置消息队列时,必须确保消息被持久化存储在磁盘上,防止消息在队列宕机时丢失。

ActiveMQ 持久化配置示例:
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=false
RabbitMQ 持久化配置示例:
@Bean
public Queue durableQueue() {
    return new Queue("testQueue", true); // 设置队列为持久化
}

注意事项

  • 消息的持久化:确保生产者发送的消息和队列都是持久化的,尤其是在高可靠性系统中。
  • 集群化部署:对于 RabbitMQ 和 Kafka 等分布式消息系统,建议使用集群部署来提高可用性,防止单点故障。

3. 消费者处理失败的处理

在消费者从队列接收到消息后,如果发生处理失败,需要有相应的机制确保消息不会丢失。最常用的策略是 手动确认 消息和 消息重试

RabbitMQ 消费者手动确认:
@RabbitListener(queues = "testQueue")
public void receiveMessage(Message message, Channel channel) throws IOException {
    try {
        // 处理消息逻辑
        processMessage(message);
        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败,拒绝消息,消息可以重新入队或丢弃
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

注意事项

  • 手动确认机制:确保消费者在处理完消息后才确认消费成功。如果处理失败,可以拒绝消息并重新入队,防止消息丢失。
  • 死信队列(DLQ):如果消息经过多次重试仍然无法成功处理,可以将其发送到死信队列,进行人工检查或报警。

五、分布式环境下的消息处理

在分布式环境中,消息队列扮演着关键的角色。消息的 可靠投递顺序保证幂等性处理 是分布式系统中消息处理的核心问题。

1. 消息的可靠投递

在分布式系统中,网络延迟、节点宕机等问题会影响消息的可靠投递,常见的解决方案有以下几点:

  • 消息确认机制:如 Kafka 中的 acks=all 确保消息被所有副本写入成功后,生产者才会认为消息发送成功。

    spring.kafka.producer.acks=all
    
  • 消息重试和补偿机制:当网络分区或队列不可用时,生产者和消费者都应具备 重试机制。此外,当消息经过多次重试后仍然失败,通常会选择通过 补偿机制(如重新发送、人工干预)来处理。

2. 顺序保证

在某些业务场景下,消息的处理顺序非常关键。例如,订单的创建、支付和发货步骤必须按照顺序进行处理。在分布式环境中保证消息的顺序处理可以通过以下方法:

  • 单分区队列:确保消息按顺序发送到同一个分区,这样可以保证消息的顺序性。例如在 Kafka 中,可以通过配置相同的 partition key 来保证顺序。

    kafkaTemplate.send(topic, key, message);
    
  • 消息的排序机制:如果不能使用单分区,可以通过在消息中附加时间戳或序列号,在消费者侧进行排序处理。

3. 消息的幂等性

在分布式系统中,由于网络抖动或超时,消息可能会被 重复消费。为了避免重复处理消息,消费者需要实现 幂等性,即对相同消息的多次处理只产生一次效果。

  • 消息 ID 去重:使用消息的唯一 ID 或业务主键来判断消息是否已经处理过。例如,可以使用数据库或缓存(如 Redis)存储已经处理过的消息 ID。

    if (!redisTemplate.hasKey(messageId)) {
        // 处理消息
        processMessage(message);
        // 将消息ID存入Redis,标记为已处理
        redisTemplate.opsForValue().set(messageId, "processed");
    }
    
  • 分布式事务:对于某些场景,可能需要使用 分布式事务 保证消息处理的一致性,确保生产者和消费者的操作要么全部成功,要么全部失败。可以使用 Kafka 的事务 APIRabbitMQ 的 Confirm 模式 实现。

4. 分布式消息队列架构中的常见问题

  1. 网络分区:在分布式系统中,网络分区是不可避免的。消息队列的设计要考虑如何处理网络分区导致的消息延迟或丢失。Kafka 提供了 副本机制 来处理这种情况,而 RabbitMQ 通过 集群模式 提高可靠性。

  2. 消息堆积:在高并发情况下,生产者可能会产生大量的消息,如果消费者处理能力不足,会导致消息堆积。解决这个问题的关键在于 合理的扩展 消费者数量,同时可以使用 流控机制 限制消息的生产速度。


总结

在 Spring Boot 框架下使用 ActiveMQ、RabbitMQ 和 Kafka 进行消息处理时,开发者需要重点关注 丢消息的处理、顺序保证、幂等性分布式环境中的可靠性问题。通过合理配置消息的持久化、确认机制和集群部署,我们可以大大提高系统的稳定性和可靠性。

  1. 丢消息的处理 依赖于生产者和消费者的 重试机制手动确认 以及 持久化配置
  2. 分布式环境下的消息处理 需要考虑 顺序性幂等性,同时应对网络分区和系统扩展等问题。

通过这些策略,消息队列在分布式架构中可以更加高效可靠地运作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2152053.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多层感知机paddle

多层感知机——paddle部分 本文部分为paddle框架以及部分理论分析&#xff0c;torch框架对应代码可见多层感知机 import paddle print("paddle version:",paddle.__version__)paddle version: 2.6.1多层感知机&#xff08;MLP&#xff0c;也称为神经网络&#xff0…

QEMU:模拟 ARM 大端字节序运行环境

文章目录 1. 前言2. ARM 大小端模拟测试2.1 裸机模拟测试2.1.1 大端模拟测试2.1.2 小端模拟测试 2.2 用户空间模拟测试2.2.1 大端模拟测试2.2.2 小端模拟测试 2.3 结论 3. 参考链接 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&…

leetcode刷题3

文章目录 前言回文数1️⃣ 转成字符串2️⃣ 求出倒序数再比对 正则表达式匹配[hard]1️⃣ 动态规划 盛最多水的容器1️⃣ 遍历分类2️⃣ 双指针贪心 最长公共前缀1️⃣ 遍历&#xff08;zip解包&#xff09; 三数之和1️⃣ 双指针递归 最接近的三数之和1️⃣ 迭代一次双指针 电…

携手阿里云CEN:共创SD-WAN融合广域网

在9月19日举行的阿里云云栖大会上&#xff0c;犀思云作为SD-WAN领域的杰出代表及阿里云的SD-WAN重要合作伙伴&#xff0c;携手阿里云共同推出了创新的企业上云方案——Fusion WAN智连阿里云解决方案。这一创新方案不仅彰显了犀思云在SD-WAN技术领域的深厚积累&#xff0c;更体现…

前端web端项目运行的时候没有ip访问地址

我们发现 没有netWork 的地址 导致 团队内其他同学无法打开我们的地址 进行访问 在page.json 中的运行 指令中 添加 --host 记得加上空格 这样我们就可以看到这个地址了 团队其他同学 就可以访问我们这个地址了

Resnet50网络——口腔癌病变识别

一 数据准备 1.导入数据 import matplotlib.pyplot as plt import tensorflow as tf import warnings as w w.filterwarnings(ignore) # 支持中文 plt.rcParams[font.sans-serif] [SimHei] # 用来正常显示中文标签 plt.rcParams[axes.unicode_minus] False # 用来正常显示负…

2024华为杯研究生数学建模竞赛(研赛)选题建议+初步分析

难度&#xff1a;DE<C<F&#xff0c;开放度&#xff1a;CDE>F。 华为专项的题目&#xff08;A、B题&#xff09;暂不进行选题分析&#xff0c;不太建议大多数同学选择&#xff0c;对自己专业技能有很大自信的可以选择华为专项的题目。后续会直接更新A、B题思路&#…

计算机网络传输层---课后综合题

线路&#xff1a;TCP报文下放到物理层传输。 TCP报文段中&#xff0c;“序号”长度为32bit&#xff0c;为了让序列号不会循环&#xff0c;则最多能传输2^32B的数据&#xff0c;则最多能传输&#xff1a;2^32/1500B个报文 结果&#xff1a; 吞吐率一个周期内传输的数据/周期时间…

2024/9/19、20 数学20题

极大线性无关组&#xff1a;

基于C#+SQL Server2005(WinForm)图书管理系统

图书管理系统 一、 首先把数据库脚本贴出来(数据库名为library) USE [library] GO /****** Object: Table [dbo].[books] Script Date: 06/12/2016 11:27:12 ******/ SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO CREATE TABLE [dbo].[books]([bNum] [nvarchar](10…

Arthas sysprop(查看和修改JVM的系统属性)

文章目录 二、命令列表2.1 jvm相关命令2.1.4 sysprop&#xff08;查看和修改JVM的系统属性&#xff09;举例1&#xff1a;sysprop 查看所有系统属性举例2&#xff1a;sysprop java.version 查看单个属性&#xff0c;支持通过tab补全 二、命令列表 2.1 jvm相关命令 2.1.4 sysp…

STL-常用算法 遍历/查找/排序/拷贝和替换/算数生成/集合算法

STL常用算法 常用的遍历算法 for_each #define _CRT_SECURE_NO_WARNINGS #include<iostream> using namespace std; #include<vector> #include<algorithm>void myPrint(int v) {cout << v << " "; }class MyPrint { public:void op…

React学习笔记(三)——React 组件通讯

1. 组件通讯-概念 了解组件通讯的意义 大致步骤&#xff1a; 知道组件的特点知道组件通讯意义 具体内容&#xff1a; 组件的特点 组件是独立且封闭的单元&#xff0c;默认情况下&#xff0c;只能使用组件自己的数据在组件化过程中&#xff0c;通常会将一个完整的功能拆分成多…

cesium.js 入门到精通(5-2)

在cesium 的配置中 有一些参数 可以配置地图的显示 显示出 水的动态显示 山的效果 相当于一些动画显示的效果 var viewer new Cesium.Viewer("cesiumContainer", {infoBox: false,terrainProvider: await Cesium.createWorldTerrainAsync({requestWaterMask: tru…

【计算机网络】计算机网络基础二

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;Linux从入门到进阶 欢迎大家点赞收藏评论&#x1f60a; 目录 以太网的通信原理令牌环网的通信原理网络传输基本流程 数据包封装和分用 网络传输流程图 局域网通信&#xff08;同一个网段内的两台…

PY+MySQL(等先完成mysql的学习)

第一章&#xff1a;准备工作&#xff08;重点关于mysql&#xff09; win安装 下载&#xff1a; 网址&#xff1a;MySQL :: Download MySQL Community Server版本&#xff1a;我的是8.0&#xff0c;但是建议5.7 下载&#xff1a;安装&#xff0c;因为是zip文件所以直接解压就好了…

股价预测,非线性注意力更佳?

作者:老余捞鱼 原创不易,转载请标明出处及原作者。 写在前面的话: 本文探讨了在 transformer 模型中使用非线性注意力来预测股票价格的概念。我们讨论了黎曼空间和希尔伯特空间等非线性空间的数学基础,解释了为什么非线性建模可能是有利的,并提供了在代码中实现这种…

MySQL 主从复制部署与优化

文章目录 前言 在现代数据库管理中&#xff0c;MySQL 主从复制是一种关键技术&#xff0c;用于提高数据的可用性和性能。随着 Docker 容器技术的普及&#xff0c;利用 Docker 搭建 MySQL 主从复制环境已成为一种趋势&#xff0c;它提供了一种简便、高效且可扩展的解决方案。本…

828华为云征文|Flexus X实例Docker+Jenkins+gitee实现CI/CD自动化部署-解放你的双手~

目录 前言 实验步骤 环境准备 安装Portainer 拉取镜像 更换镜像源 启动容器 安装jenkins 拉取镜像 获取管理员密码 新建流水线项目 Portainer配置 gitee配置WebHooks 构建 修改代码&#xff0c;自动部署 前言 &#x1f680; 828 B2B企业节特惠来袭&#xff0c;…

Hadoop 常用生态组件

Hadoop核心组件 安装 Hadoop 时&#xff0c;通常会自动包含以下几个关键核心组件&#xff0c;特别是如果使用了完整的 Hadoop 发行版&#xff08;如 Apache Hadoop、Cloudera 或 Hortonworks 等&#xff09;。这些组件构成了 Hadoop 的核心&#xff1a; 1. HDFS&#xff08;H…