Spring Boot整合RabbitMQ之发布与订阅模式

news2025/1/22 17:50:24

RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现:

  1. 用户发布动态,其“粉丝”收到其发布动态的消息
  2. 用户下订单,库存模块、支付模块等收到消息并处理
  3. 等等

1. 创建RabbitMQ的生产者

创建一个springboot项目,项目创建idea的默认创建springboot项目

然后进行rabbitMq的整合过程

1.1 引入rabbitmq的jar包

在项目的pom.xml中引入rabbitmq的jar包,详情如下:

<dependency>	
	<groupId>org.springframework.boot</groupId>	
	<artifactId>spring-boot-starter-amqp</artifactId>	
	<version>2.3.12.RELEASE</version>
</dependency>

1.2 配置文件中添加配置

在项目的配置文件中添加rabbitmq的相关配置,配置详情如下:

server:
  port: 10001
 
# rabbitMq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: guest
    password: guest

guest是rabbitmq的默认密码,不需要重新设置,不过在生产中为了安全是需要改密码的
1.3 创建配置类

配置类用于将队列和交换机进行绑定,该操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步骤。配置类详情如下:

package com.study.rabbitmq.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Configuration
public class RabbitMQConfig {
 
    public static final String EXCHANGE_NAME = "fanout-order-exchange";
    public static final String SMS_QUEUE = "sms-fanout-queue";
    public static final String EMAIL_QUEUE = "email-fanout-queue";
    public static final String WECHAT_QUEUE = "wechat-fanout-queue";
 
    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        /**
         * FanoutExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留, false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除, true:自动删除
         */
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }
 
    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化, false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }
 
    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }
 
    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }
 
    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
 
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
 
    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
    }
}

1.4 模拟发送消息

创建一个service类,在类中进行rabbitMq消息的发送,源码如下:

package com.study.rabbitmq.service;
 
import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
/**
 * @Author alen
 * @DATE 2022/6/7 23:31
 */
@Service
@Slf4j
public class OrderService {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    public void createOrder(Order order) {
        String body = JSONUtil.toJsonStr(order);
        log.info("订单信息:{}", body);
        //交换机名称
        String exchangeName = "fanout-order-exchange";
        //路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费
        String routeKey = "";
        //发送mq消息
        rabbitTemplate.convertAndSend(exchangeName, routeKey, body);
        log.info("rabbitmq发送广播模式消息成功。。。");
    }
}

使用单元测试模拟消息发送,单元测试详情如下:

package com.study.rabbitmq;
 
import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
import java.util.UUID;
 
@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {
 
    @Autowired
    private OrderService orderService;
 
    @Test
    void contextLoads() {
        for (long i = 1; i < 50; i++) {
            Order order = new Order();
            order.setRequestId(i);
            order.setUserId(i);
            order.setOrderNo(UUID.randomUUID().toString());
            order.setAmount(10L);
            order.setGoodsNum(1);
            order.setTotalAmount(10L);
            orderService.createOrder(order);
        }
    }
}

发送完后,我们可以在rabbitMq的管理后台看到已经发送成功的消息,效果如下:

可见消息已经全部发送完毕,因为前面的三个队列都是绑定在同一个交换机上,所以三个队列都会收到消息。

2. 创建RabbitMQ的消费者

创建消费者服务S2,项目结构参考生产者项目结构,然后进行消息消费的相关代码的实现,实现过程如下

2.1 引入RabbitMQ的jar包

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>2.3.12.RELEASE</version>
</dependency>

2.2 在项目配置文件中添加配置

配置详情如下

server:
  port: 10002
 
# rabbitmq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin

2.3 创建MQ消息消费者

消费者类详情如下

package com.study.rabbitmq.service;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
 
/**
 * @Author alen
 * @DATE 2022/6/8 8:15
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {
 
    @RabbitHandler
    public void emailMessage(String message) {
        log.info("Email fanout --接收到消息:{}", message);
    }
}

启动消费者项目,消费效果如下:

登录rabbitMq后台查看队列的消息情况如下

到此,似乎感觉整合得很顺利,没啥毛病。但是实际的运用中,以上演示过程中忽略了两个很重要的问题,一是我如何知道消息被顺利的发送到了队列,因为实际的工作中,不大可能每个消息都去rabbitmq管理后台查看。二是如果消息在消费的过程中出现了异常导致消息丢失,不重要的数据还好,如果是支付类的消息呢?就会产生严重的线上问题。那么这两个问题需要怎么处理呢?其实rabbitmq提供了消息发送结果回调和消息消费手动确认来处理这两个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/903189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

KUST_LI计算机视觉实验室服务器安装与管理

第一步&#xff1a;安装 Linux-Ubuntu系统 系统语言设置为英文 ENGLISH&#xff0c;防止系统 BUG&#xff1b;选择-清除整个磁盘并安装系统&#xff1b;设置用户名和密码&#xff0c;实验室统一其余全部默认设置 开机后设置磁盘挂载 在系统设置中找到 desk 打开&#xff0c;…

YOLOv7训练结果解析

前言&#xff1a; 已训练完模型&#xff0c;且把结果下载下来&#xff0c;以下某一次id识别训练结果为例&#xff0c;如下图所示。 注&#xff1a;YOLOv7每次train完成&#xff08;如果没有中途退出&#xff09;都会在run目录下生成expX目录&#xff08;X代表生成结果次数 第一…

CentOS7.9手工配置静态网络流程

进入网卡配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 配置 TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no" BOOTPROTO"static" //static 配置静态网络 DEFROUTE"yes" IPV4_FAILURE_FATAL"no…

电脑找不到MSVCR120.dll怎么办,三个完美解决方法

在计算机领域&#xff0c;MSVCR120.dll是一个非常重要的动态链接库文件。它是Microsoft Visual C 2010 Redistributable Package的一部分&#xff0c;用于支持某些程序的运行。然而&#xff0c;在某些情况下&#xff0c;我们可能会遇到MSVCR120.dll丢失的问题。在这篇文章中&am…

(详解踩坑)GIT版本回滚git stash、git reset、git reset --hard、git revert

目录 背景 一、&#xff08;git log、git reflog&#xff09;查看git提交日志及命令历史 1.1 git log&#xff08;提交日志&#xff09; 1.2 git reflog&#xff08;命令历史&#xff09; 二、git reset&#xff08;回退到指定的版本&#xff0c;并且保留更改&#xff09; …

LEADTOOLS Imaging SDK Crack

LEADTOOLS Imaging SDK Crack 高级开发人员工具包包括ActiveX和WPF/XAML控件。 LEADTOOLS Imaging SDK为文件格式导入/导出、图像压缩、图像显示和效果、颜色转换、图像处理、TWAIN扫描、图像通用对话框、数据库集成、打印和互联网提供了基本和高级的彩色图像功能。 LEADTOOLS …

【数据分享】2013-2023年全国370个城市逐月空气质量数据(Excel格式/无需转发)

空气质量的好坏反映了空气污染程度&#xff0c;它是依据空气中污染物浓度的高低来判断的。在各项涉及城市环境的研究与实际项目中&#xff0c;城市空气质量都是一个十分重要的指标。那么&#xff0c;去哪里能获取到各城市空气质量的历史数据呢&#xff1f; 之前我们分享了2014…

前端vue自定义柱形图 选中更改柱形图颜色及文字标注颜色

随着技术的发展&#xff0c;开发的复杂度也越来越高&#xff0c;传统开发方式将一个系统做成了整块应用&#xff0c;经常出现的情况就是一个小小的改动或者一个小功能的增加可能会引起整体逻辑的修改&#xff0c;造成牵一发而动全身。 通过组件化开发&#xff0c;可以有效实现…

Dubbo高手之路3,Dubbo服务消费详解

目录 引言1. 介绍 Dubbo 服务消费的详解的目的和背景2. 概述 Dubbo 服务消费的过程和核心概念 一、Dubbo 服务消费的基础知识1. Dubbo 服务消费的架构和流程2. Dubbo 服务消费的基本配置和使用方法 二、Dubbo 服务消费的注册与发现1. Dubbo 服务消费的注册中心和发布中心的基本…

09_Redlock算法和底层源码分析

Redlock算法和底层源码分析 一、当前代码为8.0版接上一步 自研分布式锁的重点&#xff1a; 按照juc里面Lock接口规范进行编写lock加锁关键逻辑 加锁&#xff1a;在redis中&#xff0c;加锁实际上是给key设置一个值&#xff0c;为避免死锁&#xff0c;并给key一个过期时间自旋…

01.Django入门

1.创建项目 1.1基于终端创建Django项目 打开终端进入文件路径&#xff08;打算将项目放在哪个目录&#xff0c;就进入哪个目录&#xff09; E:\learning\python\Django 执行命令创建项目 F:\Anaconda3\envs\pythonWeb\Scripts\django-admin.exe&#xff08;Django-admin.exe所…

protobuf+netty自定义编码解码

protobufnetty自定义编 项目背景 protobufnetty自定义编码解码 比如心跳协议&#xff0c;客户端请求的协议是10001&#xff0c;在java端如何解码&#xff0c;心跳返回协议如何编码&#xff0c;将协议号带过去 // 心跳包 //10001 message c2s_heartbeat { }//10002 message …

【数据分享】2006-2021年我国省份级别的市政公用设施建设固定资产投资相关指标(30多项指标)

《中国城市建设统计年鉴》中细致地统计了我国城市市政公用设施建设与发展情况&#xff0c;在之前的文章中&#xff0c;我们分享过基于2006-2021年《中国城市建设统计年鉴》整理的2006—2021年我国省份级别的市政设施水平相关指标、2006-2021年我国省份级别的各类建设用地面积数…

02.案列项目Demo

1.创建项目 1. 创建项目 用pycharm 选择对应的编译器&#xff0c;输入对应的文件名&#xff0c;点击创建项目。删除默认外层生成的template和DIRS 配置项&#xff1a; 2. 创建App 创建appo1的命令&#xff1a; python manage.py startapp app01 如果使用pycharm>tool>…

Qt项目报错:Cannot run compiler ‘clang++‘. /bin/sh: 1: clang++: not found

在一台旧电脑上装了深度系统&#xff0c;装了Qt&#xff0c;导入项目&#xff0c; build提示 clang找不到&#xff1a; Project ERROR: Cannot run compiler clang. Output: /bin/sh: 1: clang: not found Maybe you forgot to setup the environment? Error while parsing …

​8th参考文献:[8]许少辉.乡村振兴战略下传统村落文化旅游设计[M]北京:中国建筑工业出版社,2022.

​&#xff18;th参考文献&#xff1a;&#xff3b;&#xff18;&#xff3d;许少辉&#xff0e;乡村振兴战略下传统村落文化旅游设计&#xff3b;&#xff2d;&#xff3d;北京&#xff1a;中国建筑工业出版社&#xff0c;&#xff12;&#xff10;&#xff12;&#xff12;&a…

2022数学建模国赛C题官网展示论文C155论文复现

2022数学建模国赛C题C155论文复现 1.内容比对2.第一问第二小问复现代码2.1 页表合并2.2 数据的正态性检验2.2.1数据的正态性检验效果图 2.3不满足正态性&#xff0c;进行中心化对数比变换2.3.1 核心步骤-inf用0值替换2.3.2中心化对数比变换效果图 2.4描述性统计2.5 箱线图绘制 …

回归预测 | MATLAB实现TSO-LSSVM金枪鱼群算法优化最小二乘支持向量机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现TSO-LSSVM金枪鱼群算法优化最小二乘支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现TSO-LSSVM金枪鱼群算法优化最小二乘支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&a…

五种网络IO模型

五种模型出自&#xff1a;RFC标准。可参考&#xff1a; 《UNIX网络编程-卷一》 6.2 很多程序员是从高级语言的网络编程/文件操作了解到nio&#xff0c;继而了解到五种io模型的&#xff1b; 这五种io模型不止用于网络io “阻塞与****系统调用”是怎么回事&#xff1f;我知道了线…