SpringBoot 整合 RabbitMQ (四十一)

news2024/10/7 15:27:00

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

上一章简单介绍了SpringBoot 实现 Web 版本控制 (四十),如果没有看过,请观看上一章

关于消息中间件 RabbitMQ, 可以看老蝴蝶之前的文章: https://blog.csdn.net/yjltx1234csdn/category_12130444.html

创建一个 普通的 Spring Boot Web 项目

整合 RabbitMQ

pom.xml 添加依赖


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
    </parent>
        

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--引入 amqp 即rabbitmq 的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
        </dependency>

    </dependencies>
        

application.yml 配置文件配置

#配置服务器端相应消息
server:
  port: 8088
  servlet:
    context-path: /Server
#配置rabbitmq的相关信息
spring:
  rabbitmq:
    host: 127.0.0.1  # 配置主机
    port: 5672  # 配置端口
    virtual-host: yjl   # 配置虚拟主机
    username: guest   # 配置用户名
    password: guest  # 配置密码
    connection-timeout: 15000
    # 配置回调
    publisher-confirm-type: correlated
#配置rabbit 队列,交换器,路由等相关信息
rabbit:
  fanout:
    exchange: fanout_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  direct:
    exchange: direct_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  topic:
    exchange: topic_logs
    queue1: topic_log_console
    queue2: topic_log_file
  ttl:
    x_exchange: x
    queue_a: QA
    queue_b: QB
    queue_c: QC
    y_dead_exchange: y
    y_dead_queue_d: QD
    delayed_exchange: delayed_exchange2
    delayed_queue: delayed.queue
    delayed_routing_key: delayed_routing
  confirm:
    # 确认
    exchange: confirm_exchange_1
    queue: confirm_queue
    routing-key: key1
    backup_exchange: backup_exchange
    backup_queue: backup_queue
    warn_queue: warn_queue    

项目结构

项目结构如下:

image.png

SendMessageService 为 生产者发送消息的接口服务。

RecieveMessageService 为 消费者接收到消息后,进行的业务操作流程。

SendMessageController 为生产者创建消息的 Controller 入口。

创建队列

手动在 RabbitMQ 上创建一个队列 debug_console, 如果不存在的话。

image.png

简单的生产者发送消息

    @Resource
    private SendMessageService sendMessageService;

    @RequestMapping("/queue")
    public String queue() {
        Integer randNum = (int) (Math.random() * 1000 + 1);
        sendMessageService.sendQueue(randNum);
        return "存储到队列中的数据是:" + randNum;

    }

    @RequestMapping("/work")
    public String work() {
        sendMessageService.sendWork();
        return "批量生成循环数字";
    }

往队列发送消息, 使用 RabbitTemplate rabbitTemplate (与 RedisTemplate, JdbcTemplate 形似)

SendMessageServiceImpl.java

@Service
public class SendMessageServiceImpl implements SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbit.direct.queue1}")
    private String queueName;


    // 最普通的.
    @Override
    public void sendQueue(Integer randNum) {
        // 只发送一条消息
        rabbitTemplate.convertAndSend(queueName, String.valueOf(randNum));
    }


    @Override
    public void sendWork() {
        for (int i = 0; i < 10; i++) {
            // 发送多条消息
            rabbitTemplate.convertAndSend(queueName, "第" + i + "条消息,消息内容是:" + i);
        }
    }
}

队列消息消费

ReceiveMessageServiceImpl.java

    @Override
    public void handlerMessage(String message) {
        log.info(">>>> 获取到消息 {},开始进行业务处理",message);
        // 接下来,就是具体的业务去处理这些消息了.
    }
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }
}

验证

访问网址: http://localhost:8088/Server/send/queue

image.png

访问网址: http://localhost:8088/Server/send/work

image.png

普通的消息异步处理是完成了。 但重要的,应该是 Fanout, Direct 和 Topic 的主题处理.

Fanout 交换机消息配置

创建交换机,队列并绑定 FanoutConfig

package com.yjl.amqp.config.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Fanout 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class FanoutConfig {

    @Value("${rabbit.fanout.queue1}")
    private String queue1;
    @Value("${rabbit.fanout.queue2}")
    private String queue2;

    @Value("${rabbit.fanout.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "fanout_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "fanout_queue2")
    public Queue queue2() {
        return new Queue(queue2);
    }

    @Bean(value = "fanout_exchange")
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingFanoutExchange1(@Qualifier("fanout_queue1") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingFanoutExchange2(@Qualifier("fanout_queue2") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

}

监听队列 FanoutMqConsumer

也可以使用 RabbitListener 进行绑定

package com.yjl.amqp.config.fanout;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * fanout 的消费
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class FanoutMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(queues = {"${rabbit.fanout.queue1}", "${rabbit.fanout.queue2}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 fanout:" + message);
        receiveMessageService.handlerMessage("第一个消费者和第二个消费者获取消息 fanout:" + message);
    }

    // 也可以通过 RabbitListener 进行配置
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.fanout.queue3}"),
                    exchange = @Exchange(type = "fanout", name = "${rabbit.fanout.exchange}"),
                    key = {}
            )
    })
    public void fanoutQueueConsumer3(String message) {
        log.info("第三个消费者获取消息 fanout:" + message);
        receiveMessageService.handlerMessage("第三个消费者获取消息 fanout:" + message);
    }
}

发送消息

SendMessageController.java

    @RequestMapping("/fanout")
    public String fanout() {
        sendMessageService.fanout();
        return "fanout生成消息";
    }

SendMessageServiceImpl.java

    @Override
    public void fanout() {
        for (int i = 1; i <= 5; i++) {
            rabbitTemplate.convertAndSend(fanoutExchange, "", "fanout 发送消息:" + i);
        }
    }

验证

输入网址: http://localhost:8088/Server/send/fanout

image.png

Direct 交换机消息配置

通过注解绑定和消费队列消息 DirectMqConsumer

ackage com.yjl.amqp.config.direct;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 用途描述
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue2}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerFile(String message) {
        log.info("文件 打印输出:" + message);
        receiveMessageService.handlerMessage("文件打印输出 direct:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue3}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"warn", "error"}
            )
    })
    public void fanoutQueueConsumerDb(String message) {
        log.info("Db 打印输出:" + message);
        receiveMessageService.handlerMessage("DB 打印输出 direct:" + message);
    }

}

发送消息

SendMessageController.java

  @RequestMapping("/direct")
    public String direct() {
        sendMessageService.direct();
        return "direct 生成消息";
    }

SendMessageServiceImpl.java

     @Override
    public void direct() {
        rabbitTemplate.convertAndSend(directExchange, "debug", "debug 消息");
        rabbitTemplate.convertAndSend(directExchange, "info", "info 消息");
        rabbitTemplate.convertAndSend(directExchange, "warn", "warn 消息");
        rabbitTemplate.convertAndSend(directExchange, "error", "error 消息");
    }

验证

输入网址: http://localhost:8088/Server/send/direct

image.png

Topic 交换机消息配置

创建交换机,队列并绑定 TopicConfig

package com.yjl.amqp.config.topic;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Topic 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class TopicConfig {

    @Value("${rabbit.topic.queue1}")
    private String queue1;

    @Value("${rabbit.topic.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "topic_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "topic_exchange")
    TopicExchange topicExchange() {
        return new TopicExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingTopicExchange(@Qualifier("topic_queue1") Queue queue,
                                 @Qualifier("topic_exchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange)
                .with("*.orange.*");
    }

}

监听队列 TopicMqConsumer

package com.yjl.amqp.config.topic;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * topic 的队列配置
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class TopicMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;

    @RabbitListener(queues = {"${rabbit.topic.queue1}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 topic:" + message);
        receiveMessageService.handlerMessage("console topic:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.topic.queue2}"),
                    exchange = @Exchange(type = "topic", name = "${rabbit.topic.exchange}"),
                    key = {"lazy.#", "*.*.rabbit"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("file topic:" + message);
        receiveMessageService.handlerMessage("file topic:" + message);
    }
}

发送消息

SendMessageController.java

   @RequestMapping("/topic")
    public String topic() {
        sendMessageService.topic();
        return "topic 生成消息";
    }

SendMessageServiceImpl.java

  @Override
    public void topic() {

        Map<String, String> messageMap = new HashMap<>();

        messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");

        messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
        messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        messageMap.put("info", "一个 info 消息3 ");

        messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");

        messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        messageMap.forEach((routingKey, message) -> {
            try {

                rabbitTemplate.convertAndSend(topicExchange, routingKey,
                        message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

验证

输入网址: http://localhost:8088/Server/send/topic

image.png

这是 RabbitMQ 异步处理消息的常见用法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/417891.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

还不懂如何与AI高效交流?保姆级且全面的chatGPT提示词工程教程来啦!(一)基础篇

还不懂如何与chatGPT高效交流&#xff1f;保姆级且全面的chatGPT提示词工程教程来啦&#xff01;&#xff08;一&#xff09;基础篇 文章目录还不懂如何与chatGPT高效交流&#xff1f;保姆级且全面的chatGPT提示词工程教程来啦&#xff01;&#xff08;一&#xff09;基础篇一&…

CDH6.3.2大数据集群生产环境安装(七)之PHOENIX组件安装

添加phoenix组件 27.1. 准备安装资源包 27.2. 拷贝资源包到相应位置 拷贝PHOENIX-1.0.jar到/opt/cloudera/csd/ 拷贝PHOENIX-5.0.0-cdh6.2.0.p0.1308267-el7.parcel.sha、PHOENIX-5.0.0-cdh6.2.0.p0.1308267-el7.parcel到/opt/cloudera/parcel-repo 27.3. 进入cm页面进行分发、…

【AIGC】9、BLIP-2 | 使用 Q-Former 连接冻结的图像和语言模型 实现高效图文预训练

文章目录一、背景二、方法2.1 模型结构2.2 从 frozen image encoder 中自主学习 Vision-Language Representation2.3 使用 Frozen LLM 来自主学习 Vision-to-Language 生成2.4 Model pre-training三、效果四、局限性论文&#xff1a;BLIP-2: Bootstrapping Language-Image Pre-…

unity 序列化那些事,支持Dictionary序列化

目录 一、普通类型和UnityEngine空间类型序列化 二、数组、list的序列化 三、自定义类的序列化支持 四、自定义asset 五、在inspector面板中支持Dictionary序列化 1、在MonoBehaviour中实现Dictionary序列化 2、自定义property&#xff0c;让其在inpsector能够显示 3、Mo…

【从零开始学Skynet】实战篇《球球大作战》(七):gateway代码设计(下)

1、确认登录接口 在完成了登录流程后&#xff0c;login会通知gateway&#xff08;第⑧阶段&#xff09;&#xff0c;让它把客户端连接和新agent&#xff08;第⑨阶段&#xff09;关联起来。 sure_agent代码如下所示&#xff1a; s.resp.sure_agent function(source, fd, play…

[Gitops--1]GitOps环境准备

GitOps环境准备 1. 主机规划 序号主机名主机ip主机功能软件1dev192.168.31.1开发者 项目代码 apidemogit,golang,goland2gitlab192.168.31.14代码仓库,CI操作git-lab,git,golang,docker,gitlab-runner3harbor192.168.31.104管理和存储镜像docker,docker-compose,harbor4k8s-m…

基础排序算法【计数排序】非比较排序

基础排序算法【计数排序】非比较排序⏰【计数排序】&#x1f550;计数&#x1f566;排序&#x1f553;测试⏰总结&#xff1a;⏰【计数排序】 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用 > 基本思路&#xff1a; 1.统计数据出现的次数 2.根据统计的结…

并行分布式计算 并行算法与并行计算模型

文章目录并行分布式计算 并行算法与并行计算模型基础知识定义与描述复杂性度量同步和通讯并行计算模型PRAM 模型异步 PRAM 模型 &#xff08;APRAM&#xff09;BSP 模型LogP 模型层次存储模型分层并行计算模型并行分布式计算 并行算法与并行计算模型 基础知识 定义与描述 并…

15个最适合初创公司创始人使用的生产力工具

创业是一段激动人心且收获颇丰的旅程&#xff0c;同时也伴随着一些挑战。创始人往往要面对长时间的工作、紧迫的期限和大量的压力时刻。因此&#xff0c;初创公司创始人必须最大限度地利用他们的时间并利用他们可用的生产力工具——不仅是为了发展他们的业务&#xff0c;而且是…

Cron表达式简单介绍 + Springboot定时任务的应用

前言 表达式是一个字符串&#xff0c;主要分成6或7个域&#xff0c;但至少需要6个域组成&#xff0c;且每个域之间以空格符隔开。 以7个域组成的&#xff0c;从右往左是【年 星期 月份 日期 小时 分钟 秒钟】 秒 分 时 日 月 星期 年 以6个域组成的&#xff0c;从右往左是【星…

【精华】表格识别技术-MI

表格识别是指将图片中的表格结构和文字信息识别成计算机可以理解的数据格式&#xff0c;在办公、商务、教育等场景中有着广泛的实用价值&#xff0c;也一直是文档分析研究中的热点问题。围绕这个问题&#xff0c;我们研发了一套表格识别算法&#xff0c;该算法高效准确地提取图…

RabbitMq 的消息可靠性问题(二)---MQ的消息丢失和consumer消费问题

前言 RabbitMq 消息可靠性问题(一) — publisher发送时丢失 前面我们从publisher的方向出发解决了发送时丢失的问题&#xff0c;那么我们在发送消息到exchange, 再由exchange转存到queue的过程中。如果MQ宕机了&#xff0c;那么我们的消息是如何确保可靠性的呢&#xff1f;当消…

SQL的函数

文章目录一、SQL MIN() Function二、SQL SUM() 函数三、SQL GROUP BY 语句四、SQL HAVING 子句五、SQL EXISTS 运算符六、SQL UCASE() 函数总结一、SQL MIN() Function MIN() 函数返回指定列的最小值。 SQL MIN() 语法 SELECT MIN(column_name) FROM table_name;演示数据库 …

Numba witch makes Python code fast

一. 前言&#xff1a;numba&#xff0c;让python速度提升百倍 python由于它动态解释性语言的特性&#xff0c;跑起代码来相比java、c要慢很多&#xff0c;尤其在做科学计算的时候&#xff0c;十亿百亿级别的运算&#xff0c;让python的这种劣势更加凸显。 办法永远比困难多&a…

ASP.NET Core MVC 从入门到精通之接化发(二)

随着技术的发展&#xff0c;ASP.NET Core MVC也推出了好长时间&#xff0c;经过不断的版本更新迭代&#xff0c;已经越来越完善&#xff0c;本系列文章主要讲解ASP.NET Core MVC开发B/S系统过程中所涉及到的相关内容&#xff0c;适用于初学者&#xff0c;在校毕业生&#xff0c…

4.13实验 加测试题目

今天是个好日子,要搞栈的实验 没啥就是链栈和顺序栈 和出栈入栈,强大都是从最基本开始的 来和我一起写写吧 //顺序栈 typedef struct node{int *base;int *top;int sizer; }shed;//链栈 typedef struct Node{ int data; struct Node* next; }*stact,link; //顺序栈的初始化…

《绝对坦率》速读笔记

文章目录书籍信息概览&#xff08;第一部分 一种新的管理哲学&#xff09;建立坦率的关系给予并鼓励指导了解团队中每个人的动机协同创造成果&#xff08;第二部分 工具和技巧&#xff09;关系指导团队结果书籍信息 书名&#xff1a;《绝对坦率&#xff1a;一种新的管理哲学》…

北邮22信通:(12)二叉树的遍历书上代码完整版

北邮22信通一枚~ 跟随课程进度每周更新数据结构与算法的代码和文章 持续关注作者 解锁更多邮苑信通专属代码~ 上一篇文章&#xff1a; 下一篇文章&#xff1a; 目录 一.储存最简单数据类型的二叉树 代码部分&#xff1a; 代码效果&#xff1a; 运行结果&#xff1a…

解决JD-GUI-1.6.6 中文乱码

一、背景 在window环境下使用中遇到了乱码问题。 问题有两个&#xff1a; 一、从反编译代码的界面 CTRLC 复制是如果选中内容包含中文&#xff0c;贴到其他编辑器时&#xff0c;中文丢失。 二、打开xml文件、properties文件等包含中文时。中文在反编译界面中显示乱码。用其他工…

java反射教程

反射&#xff08;Reflection&#xff09;是 Java中的一种机制&#xff0c;它是一种特殊的面向对象编程技术。在 Java中&#xff0c;反射可以分为静态反射和动态反射两种。静态反射是指在 Java程序运行时才进行的一种反射&#xff0c;它可以保证程序运行时不会出现内存泄漏等错误…