RabbitMQ 在 Java 和 Spring Boot 中的应用详解

news2024/11/17 13:26:41

1. 引言

RabbitMQ 是一种开源消息代理软件,广泛用于实现消息传递、队列管理和负载均衡。它通过实现 AMQP(Advanced Message Queuing Protocol)来支持复杂的消息传递模式,是常见的消息中间件之一。本文将深入探讨如何在纯 Java 环境和 Spring Boot 项目中使用 RabbitMQ,并涵盖详细的配置参数、常见方法以及实际应用案例。

2. RabbitMQ 在 Java 中的应用

2.1 基本概念回顾

RabbitMQ 的基础架构由以下几个核心组件组成:

  • Broker:消息中转站,用于接收和分发消息。
  • Exchange:用于决定消息路由规则。
  • Queue:存储消息的缓冲区。
  • Binding:定义 Exchange 和 Queue 之间的绑定关系。
  • Routing Key:用于匹配消息和队列的规则。
  • Consumer/Producer:消息消费者和生产者。
2.2 使用 Java 原生库连接 RabbitMQ

要在 Java 中使用 RabbitMQ,通常使用 com.rabbitmq.client 提供的 Java 客户端库。以下是简单的 Java 消息生产者和消费者示例。

2.2.1 环境准备

  • 引入 RabbitMQ 客户端依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.2</version>
    </dependency>
    

2.2.2 编写生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message = "Hello RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

2.2.3 编写消费者

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

2.2.4 参数说明

  • queueDeclare
    

    方法的参数解释:

    • queue:队列的名称。
    • durable:队列是否持久化。
    • exclusive:是否只在本连接中可用。
    • autoDelete:当消费者断开连接时是否自动删除队列。
    • arguments:其他可选参数。

3. RabbitMQ 在 Spring Boot 中的应用

3.1 Spring Boot 与 RabbitMQ 整合

Spring Boot 通过 spring-boot-starter-amqp 提供了对 RabbitMQ 的开箱即用支持,使开发者能够更轻松地集成和配置消息队列。

3.1.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.2 配置 RabbitMQ: 在 application.propertiesapplication.yml 中配置 RabbitMQ 连接信息。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3.1.3 创建消息生产者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(String exchange, String routingKey, String message) {
        amqpTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Message Sent: " + message);
    }
}

3.1.4 创建消息消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQConsumer {
    @RabbitListener(queues = "test_queue")
    public void receiveMessage(String message) {
        System.out.println("Message Received: " + message);
    }
}
3.2 配置与调优
  • 配置自定义连接工厂

    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory;
        }
    }
    
  • 消费者确认模式: 配置手动消息确认,以确保消息不会在消费过程中丢失。

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    代码实现

    @RabbitListener(queues = "test_queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("Message Received: " + msg);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败时拒绝消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
    

4. 常见方法和配置详解

4.1 参数详解
  • prefetchCount:控制每个消费者在确认消息之前能收到的最大消息数。适用于限流。
  • durable:持久化设置,确保消息和队列在服务器重启时不会丢失。
  • TTL(Time-To-Live):配置消息或队列的过期时间。
  • DLX(Dead Letter Exchange):死信交换器,用于处理无法被消费的消息。
4.2 高级配置示例

配置死信交换器和队列:

@Bean
public Queue mainQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "dlx.exchange");
    return new Queue("main_queue", true, false, false, args);
}

@Bean
public Exchange dlxExchange() {
    return new DirectExchange("dlx.exchange");
}

@Bean
public Queue deadLetterQueue() {
    return new Queue("dlx_queue");
}

5. 在实际项目中的优化和实践

在实际项目中,RabbitMQ 的使用不仅限于简单的消息传递,更重要的是优化系统性能、增强稳定性、提升可维护性。以下是一些实践和优化技巧,帮助开发者在项目中更高效地使用 RabbitMQ。

5.1 使用异步消息提高系统性能

在微服务架构和高并发场景中,同步调用往往会导致系统响应速度变慢。通过引入异步消息处理,开发者可以解耦服务,提高系统的响应速度和吞吐量。

5.1.1 引入异步消息处理的优势

  • 非阻塞处理:请求不会因为等待其他服务响应而停滞,释放线程以处理更多请求。
  • 提高吞吐量:使用异步消息队列可以缓冲大量请求,避免高峰期过载。
  • 解耦服务:通过异步消息传递,服务之间不直接依赖,使其更易于维护和扩展。

5.1.2 结合 Spring Boot 的 @Async 注解: Spring Boot 提供了方便的异步调用支持,通过简单的配置即可实现异步方法的执行。

示例:使用 @Async 实现异步调用

  1. 配置异步支持

    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    @Configuration
    @EnableAsync
    public class AsyncConfig {
    }
    
  2. 实现异步消息发送

    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageService {
        @Async
        public void sendAsyncMessage(String exchange, String routingKey, String message) {
            // 假设 amqpTemplate 已经通过 @Autowired 注入
            amqpTemplate.convertAndSend(exchange, routingKey, message);
            System.out.println("Message sent asynchronously: " + message);
        }
    }
    
  3. 调用异步方法: 在服务中调用 sendAsyncMessage(),该方法将在独立的线程中执行,不会阻塞主线程。

5.1.3 配置线程池: 默认情况下,Spring 使用 SimpleAsyncTaskExecutor,这在生产环境中可能不够高效。可以自定义线程池来提高性能和可控性。

自定义线程池配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class AsyncConfig {
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("RabbitMQ-Executor-");
        executor.initialize();
        return executor;
    }
}
5.2 监控和日志

监控 RabbitMQ 和相关服务的状态是维护系统稳定性的重要环节。通过监控和日志,可以及时发现问题并采取相应措施。

5.2.1 使用 Spring Boot Actuator: Spring Boot Actuator 提供了全面的监控功能,包括应用程序的健康检查、度量和审计。

启用 Actuator 监控: 在 application.properties 中添加以下配置:

management.endpoints.web.exposure.include=health,metrics,info

查看 RabbitMQ 健康状态: 在 Spring Boot 中集成 RabbitMQ 后,Actuator 会显示与其连接状态相关的监控信息。通过访问 /actuator/health,可以获取 RabbitMQ 连接的健康状态。

5.2.2 RabbitMQ Management Plugin: RabbitMQ 自带的 Management Plugin 提供了图形化的用户界面,用于查看队列、交换器、连接和通道的状态。

启用插件

rabbitmq-plugins enable rabbitmq_management

启用后可以通过 http://localhost:15672 访问界面,默认的用户名和密码均为 guest

5.2.3 集成日志工具: 通过日志工具(如 Logback 和 SLF4J),可以记录 RabbitMQ 消息的发送和接收情况,便于审计和问题排查。

Logback 配置示例: 在 logback-spring.xml 中添加日志配置,以记录与 RabbitMQ 相关的操作。

<configuration>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    
    <logger name="org.springframework.amqp" level="DEBUG"/>
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242186.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

“南海明珠”-黄岩岛(民主礁)领海基线WebGIS绘制实战

目录 前言 一、关于岛屿的基点位置 1、领海基点 二、基点坐标的转换 1、最底层的左边转换 2、单个经纬度坐标点转换 3、完整的转换 三、基于天地图进行WebGIS展示 1、领海基点的可视化 2、重要城市距离计算 四、总结 前言 南海明珠黄岩岛&#xff0c;这座位于南海的…

19.UE5道具掉落

2-21 道具掉落&#xff0c;回血、回蓝、升级提升伤害_哔哩哔哩_bilibili 目录 1.道具的创建&#xff0c;道具功能的实现 2.随机掉落 1.道具的创建&#xff0c;道具功能的实现 新建Actor蓝图&#xff0c;并命名为道具总类&#xff0c;添加一个Niagara粒子组件和一个碰撞箱bo…

Cartographer激光雷达slam -20241116

Cartographer Cartographer代码结构 cartographer&#xff1a;负责处理来自雷达、IMU和里程计的数据并基于这些数据进行地图的构建&#xff0c;是cartographer理论的底层实现cartographer_ros&#xff1a;基于ros的通信机制获取传感器的数据并将它们转换成cartographer中定义…

node.js学习笔记-Window下MongoDB数据库安装(二)

一、介绍 MongoDB 是一个基于分布式文件存储的开源数据库系统&#xff0c;在当前的软件开发和数据存储领域中应用广泛&#xff0c;以下是对 MongoDB 的详细介绍&#xff1a; 文档型数据库&#xff1a;MongoDB 以 BSON&#xff08;Binary JSON&#xff09;格式存储数据&#x…

STM32G4的数模转换器(DAC)的应用

目录 概述 1 DAC模块介绍 2 STM32Cube配置参数 2.1 参数配置 2.2 项目架构 3 代码实现 3.1 接口函数 3.2 功能函数 3.3 波形源代码 4 DAC功能测试 4.1 测试方法介绍 4.2 波形测试 概述 本文主要介绍如何使用STM32G4的DAC模块功能&#xff0c;笔者使用STM32Cube工具…

【论文复现】轻松利用自适应特征融合实现去雾

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀ 智慧医疗 介绍创新点网络结构特征提取阶段自适应融合阶段图像重建阶段上下文增强模块CEM特征融合模块AFM 结果分析 提示 论文题目&#xff1…

常用在汽车PKE无钥匙进入系统的高度集成SOC芯片:CSM2433

CSM2433是一款集成2.4GHz频段发射器、125KHz接收器和8位RISC&#xff08;精简指令集&#xff09;MCU的SOC芯片&#xff0c;用在汽车PKE无钥匙进入系统里。 什么是汽车PKE无钥匙进入系统&#xff1f; 无钥匙进入系统具有无钥匙进入并且启动的功能&#xff0c;英文名称是PKE&…

《TCP/IP网络编程》学习笔记 | Chapter 11:进程间通信

《TCP/IP网络编程》学习笔记 | Chapter 11&#xff1a;进程间通信 《TCP/IP网络编程》学习笔记 | Chapter 11&#xff1a;进程间通信进程间通信的基本概念通过管道实现进程间通信通过管道进行进程间双向通信 运用进程间通信习题&#xff08;1&#xff09;什么是进程间通信&…

计算机网络各层设备总结归纳(更新ing)

计算机网络按照OSI&#xff08;开放式系统互联&#xff09;模型分为七层&#xff0c;每一层都有其特定的功能和对应的网络设备。以下是各层对应的设备&#xff1a; 1. 物理层&#xff08;Physical Layer) 设备&#xff1a;中继器&#xff08;Repeater&#xff09;、集线器…

在kile 5中一个新工程的创建

这两天博主学习到了在kile5中创建一个工程&#xff0c;当然博主不会忘了小伙伴们的&#xff0c;这就和你们分享。 本次创建以STM32F103C8为例 创建过程&#xff1a; 1首先创建文件 名字随意&#xff0c;但也不要太随意&#xff0c;因为是外国软件&#xff0c;所以多少对中文…

AI写作(十)发展趋势与展望(10/10)

一、AI 写作的崛起之势 在当今科技飞速发展的时代&#xff0c;AI 写作如同一颗耀眼的新星&#xff0c;迅速崛起并在多个领域展现出强大的力量。 随着人工智能技术的不断进步&#xff0c;AI 写作在内容创作领域发挥着越来越重要的作用。据统计&#xff0c;目前已有众多企业开始…

Javascript垃圾回收机制-运行机制(大厂内部培训版本)

前言 计算机基本组成&#xff1a; 我们编写的软件首先读取到内存&#xff0c;用于提供给 CPU 进行运算处理。 内存的读取和释放&#xff0c;决定了程序性能。 冯诺依曼结构 解释和编译 这两个概念怎么理解呢。 编译相当于事先已经完成了可以直接用。好比去饭店吃饭点完上…

大数据技术之Hive:还是SQL好用

虽说 MapReduce 简化了大数据编程的难度&#xff0c;但是如果每来一个需求都要写一个 MapReduce 代码&#xff0c;那岂不是太麻烦了。尤其是在全民“CRM”的2000年代&#xff0c;对于像数据分析师已经习惯使用SQL进行分析和统计的工程师&#xff0c;让他们去 MapReduce 编程还是…

使用 Grafana api 查询 Datasource 数据

一、使用grafana 的api 接口 官方API 二、生成Api key 点击 Administration -》Users and accss -》Service accounts 进入页面 点击Add service account 创建 service account 点击Add service account token 点击 Generate token , 就可以生成 api key 了 三、进入grafana…

OceanBase 闪回查询

前言 在OB中&#xff0c;drop表可以通过 回收站 或者 以往的备份恢复来还原单表。当delete数据时&#xff0c;由于delete操作的对象不会进入回收站&#xff0c;此时需要通过闪回查询功能查看delete的数据&#xff0c;以便后续恢复 本次实验版本为 OceanBase 4.2.1.8&#xff0…

vue2 动态路由的实现

概述 一般情况下&#xff0c;路由都是前端约定好的&#xff0c;但是每当项目发布上线&#xff0c;或者客户需求新的页面的时候&#xff0c;都需要做出路由改变。这样运维就可以现场支持&#xff0c;方便做出可操作的中户中台&#xff0c;来管理我们的中心项目登录及权限&#x…

华为云前台展示公网访问需要购买EIP,EIP流量走向

华为云前台网络&#xff08;VPC,安全组&#xff0c;EIP&#xff09; 1.EIP网段是从哪里划分的&#xff1f; 管理员在后台Service_OM已设置 Service_OM-网络资源-外部网络-创建外部网络基本信息&#xff1a;配置参数&#xff1a;*名称 public*网络类型 LOCAL 不带标签 类似开…

树状数组+概率论,ABC380G - Another Shuffle Window

目录 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 二、解题报告 1、思路分析 2、复杂度 3、代码详解 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 G - Another Shuffle Window 二、解题报告 1、思路分析 不难用树状数组计…

LSTM(长短期记忆网络)详解

1️⃣ LSTM介绍 标准的RNN存在梯度消失和梯度爆炸问题&#xff0c;无法捕捉长期依赖关系。那么如何理解这个长期依赖关系呢&#xff1f; 例如&#xff0c;有一个语言模型基于先前的词来预测下一个词&#xff0c;我们有一句话 “the clouds are in the sky”&#xff0c;基于&…

麒麟nginx配置

一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容&#xff0c;写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …