springboot使用rabbitmq

news2025/2/4 6:06:47

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:
  application:
    name: rabbitMQ
  rabbitmq:
    host: 192.168.142.128  #rabbitmq的主机名
    port: 5672			   #端口,默认5672
    username: itheima      #rabbitmq的账号
    password: 123321	   #密码
server:
  port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {

    @Bean("directExchange")
    public DirectExchange directExchange() {
        return new DirectExchange(MQConstant.DIRECT_EXCHANGE);
    }

    @Bean("directQueue")
    public Queue directQueue() {
        return new Queue(MQConstant.DIRECT_QUEUE);
    }

    @Bean("bindingDirectExchange")
    public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,
                                         @Qualifier("directQueue") Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);
    }

}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(MessageBody messageBody){
        try{
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),
                    new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            log.info("message send,{}", message);
                            return message;
                        }
                    },correlationData);
            log.info("message send successful");
        }catch (Exception e){
            log.info("send message error:{}",e);
        }
    }
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {

    @RabbitListener(queues = MQConstant.DIRECT_QUEUE)
    public void receive(String message) {
        try {
            log.info("receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            switch (messageBody.getTopic()) {
                case QueueTopic.USER_LOGIN:
                    User user = JSON.parseObject(messageBody.getData(), User.class);
                    log.info("receive user:{}", user);
                    break;
                default:
                    log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());
                    break;
            }
        }catch (Exception e){
            log.error("rabbitmq receive message error:{}", e);
        }
    }
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {

	@Autowired
	private RabbitMqClient rabbitMqClient;

	@Test
	void testDirectSend() {
		//数据
		User user = new User();
		user.setId(123);
		user.setName("Lewin-jie2");
		user.setPassword("123");

		MessageBody messageBody = new MessageBody();
		messageBody.setData(JSON.toJSONString(user));

		long time = new Date().getTime();
		messageBody.setSendTime(time);
		//添加主题
		messageBody.setTopic(QueueTopic.USER_LOGIN);
		rabbitMqClient.send(messageBody);
	}

}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);
    }

    @Bean("aQueue")
    public Queue aQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_A);
    }

    @Bean("bQueue")
    public Queue bQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_B);
    }

    @Bean("cQueue")
    public Queue cQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_C);
    }

    /**
     * 绑定队列aQueue bQueue cQueue
     */
    @Bean("bindingFanoutExchange1")
    public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("aQueue") Queue aQueue){
        return BindingBuilder.bind(aQueue).to(fanoutExchange);
    }

    @Bean("bindingFanoutExchange2")
    public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("bQueue") Queue bQueue){
        return BindingBuilder.bind(bQueue).to(fanoutExchange);
    }

    @Bean("bindingFanoutExchange3")
    public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("cQueue") Queue cQueue){
        return BindingBuilder.bind(cQueue).to(fanoutExchange);
    }

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {
    @Autowired
    private RabbitMqClient rabbitMqClient;

    @PostMapping("/sendFanoutMsg")
    public String sendFanoutMsg(@RequestParam("msg") String msg){
        try {
            MessageBody messageBody = new MessageBody();
            messageBody.setData(msg);
            rabbitMqClient.send1(messageBody);
        }catch (Exception e){
            log.error("sendFanoutMsg error{}", e);
        }
        return "send fanout msg success";
    }
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")
        public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){
            try {
                MessageBody messageBody = new MessageBody();
                messageBody.setData(msg);
                rabbitMqClient.send3(messageBody,type);
            }catch (Exception e){
                log.error("sendTopicMsg error{}", e);
            }
            return "send topic msg success";
        }
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(MQConstant.TOPIC_EXCHANGE);
    }

    @Bean("queue1")
    public Queue queue1(){
        return new Queue(MQConstant.QUEUE1);
    }

    @Bean("queue2")
    public Queue queue2(){
        return new Queue(MQConstant.QUEUE2);
    }

    @Bean("queue3")
    public Queue queue3(){
        return new Queue(MQConstant.QUEUE3);
    }

    @Bean("queue4")
    public Queue queue4(){
        return new Queue(MQConstant.QUEUE4);
    }

    @Bean("bingTopicExchange1")
    public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);
    }

    @Bean("bingTopicExchange2")
    public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);
    }

    @Bean("bingTopicExchange3")
    public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);
    }

    @Bean("bingTopicExchange4")
    public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);
    }

消息发送

 public void send3(MessageBody messageBody,String routingKey) {
        try{
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),
                    new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            log.info("message send,{}", message);
                            return message;
                        }
                    },correlationData);
            log.info("message send successful");
        }catch (Exception e){
            log.info("send message error:{}",e);
        }
    }

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)
    public void receive4(String message) {
        log.info("topic exchange");
        try {
            log.info("queue1 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE2)
    public void receive5(String message) {
        log.info("topic exchange");
        try {
            log.info("queue2 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE3)
    public void receive6(String message) {
        log.info("topic exchange");
        try {
            log.info("queue3 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE4)
    public void receive7(String message) {
        log.info("topic exchange");
        try {
            log.info("queue4 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2291641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——ext2文件系统(二)

Linux——ext2文件系统 ext2文件系统宏观认识一、磁盘分区与格式化二、块组&#xff08;Block Group&#xff09;结构三、文件系统特性 文件名与目录名与inode一、inode的作用原理二、文件与目录名与inode的关系 路径一&#xff0c;路径解析二&#xff0c;路径缓存三&#xff0…

如何让DeepSeek恢复联网功能?解决(由于技术原因,联网搜索暂不可用)

DeekSeek提示&#xff1a;&#xff08;由于技术原因&#xff0c;联网搜索暂不可用&#xff09; 众所周知&#xff0c;因为海外黑客的ddos攻击、僵尸网络攻击&#xff0c;deepseek的联网功能一直处于宕机阶段&#xff0c;但是很多问题不联网出来的结果都还是2023年的&#xff0c…

python的ruff简单使用

Ruff 是一个用 Rust 编写的高性能 Python 静态分析工具和代码格式化工具。它旨在提供快速的代码检查和格式化功能&#xff0c;同时支持丰富的配置选项和与现有工具的兼容性。ruff是用rust实现的python Linter&Formatter。 安装&#xff1a; conda install -c conda-forge…

【漫话机器学习系列】077.范数惩罚是如何起作用的(How Norm Penalties Work)

范数惩罚的作用与原理 范数惩罚&#xff08;Norm Penalty&#xff09; 是一种常用于机器学习模型中的正则化技术&#xff0c;它的主要目的是控制模型复杂度&#xff0c;防止过拟合。通过对模型的参数进行惩罚&#xff08;即在损失函数中加入惩罚项&#xff09;&#xff0c;使得…

LLMs之OpenAI o系列:OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略

LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之o3&#xff1a;《Deliberative Alignment: Reasoning Enables Safer Language Models》翻译与解读 LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安…

Notepad++消除生成bak文件

设置(T) ⇒ 首选项... ⇒ 备份 ⇒ 勾选 "禁用" 勾选禁用 就不会再生成bak文件了 notepad怎么修改字符集编码格式为gbk 如图所示

后台管理系统通用页面抽离=>高阶组件+配置文件+hooks

目录结构 配置文件和通用页面组件 content.config.ts const contentConfig {pageName: "role",header: {title: "角色列表",btnText: "新建角色"},propsList: [{ type: "selection", label: "选择", width: "80px&q…

Spring Boot项目如何使用MyBatis实现分页查询

写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油&#xff0c;冲鸭&#x…

Intellij 插件开发-快速开始

目录 一、开发环境搭建以及创建action1. 安装 Plugin DevKit 插件2. 新建idea插件项目3. 创建 Action4. 向新的 Action 表单注册 Action5. Enabling Internal Mode 二、插件实战开发[不推荐]UI Designer 基础JBPanel类&#xff08;JPanel面板&#xff09;需求&#xff1a;插件设…

语言月赛 202412【题目名没活了】题解(AC)

》》》点我查看「视频」详解》》》 [语言月赛 202412] 题目名没活了 题目描述 在 XCPC 竞赛里&#xff0c;会有若干道题目&#xff0c;一支队伍可以对每道题目提交若干次。我们称一支队伍对一道题目的一次提交是有效的&#xff0c;当且仅当&#xff1a; 在本次提交以前&…

MySQL锁类型(详解)

锁的分类图&#xff0c;如下&#xff1a; 锁操作类型划分 读锁 : 也称为共享锁 、英文用S表示。针对同一份数据&#xff0c;多个事务的读操作可以同时进行而不会互相影响&#xff0c;相互不阻塞的。 写锁 : 也称为排他锁 、英文用X表示。当前写操作没有完成前&#xff0c;它会…

OSCP - Proving Grounds - Roquefort

主要知识点 githook 注入Linux path覆盖 具体步骤 依旧是nmap扫描开始&#xff0c;3000端口不是很熟悉&#xff0c;先看一下 Nmap scan report for 192.168.54.67 Host is up (0.00083s latency). Not shown: 65530 filtered tcp ports (no-response) PORT STATE SERV…

集合通讯概览

&#xff08;1&#xff09;通信的算法 是根据通讯的链路组成的 &#xff08;2&#xff09;因为通信链路 跟硬件强相关&#xff0c;所以每个CCL的库都不一样 芯片与芯片、不同U之间是怎么通信的&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 很重要…

【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(二)

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;贪心算法篇–CSDN博客 文章目录 前言例题1.买卖股票的最佳时机2.买卖股票的最佳时机23.k次取…

oracle: 表分区>>范围分区,列表分区,散列分区/哈希分区,间隔分区,参考分区,组合分区,子分区/复合分区/组合分区

分区表 是将一个逻辑上的大表按照特定的规则划分为多个物理上的子表&#xff0c;这些子表称为分区。 分区可以基于不同的维度&#xff0c;如时间、数值范围、字符串值等&#xff0c;将数据分散存储在不同的分区 中&#xff0c;以提高数据管理的效率和查询性能&#xff0c;同时…

基于SpringBoot 前端接收中文显示解决方案

一. 问题 返回给前端的的中文值会变成“???” 二. 解决方案 1. 在application.yml修改字符编码 &#xff08;无效&#xff09; 在网上看到说修改servlet字符集编码&#xff0c;尝试了不行 server:port: 8083servlet:encoding:charset: UTF-8enabled: trueforce: true2. …

java练习(5)

ps:题目来自力扣 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这…

python算法和数据结构刷题[3]:哈希表、滑动窗口、双指针、回溯算法、贪心算法

回溯算法 「所有可能的结果」&#xff0c;而不是「结果的个数」&#xff0c;一般情况下&#xff0c;我们就知道需要暴力搜索所有的可行解了&#xff0c;可以用「回溯法」。 回溯算法关键在于:不合适就退回上一步。在回溯算法中&#xff0c;递归用于深入到所有可能的分支&…

大数据数仓实战项目(离线数仓+实时数仓)1

目录 1.课程目标 2.电商行业与电商系统介绍 3.数仓项目整体技术架构介绍 4.数仓项目架构-kylin补充 5.数仓具体技术介绍与项目环境介绍 6.kettle的介绍与安装 7.kettle入门案例 8.kettle输入组件之JSON输入与表输入 9.kettle输入组件之生成记录组件 10.kettle输出组件…

【开源免费】基于Vue和SpringBoot的公寓报修管理系统(附论文)

本文项目编号 T 186 &#xff0c;文末自助获取源码 \color{red}{T186&#xff0c;文末自助获取源码} T186&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…