微服务——服务异步通讯RabbitMQ

news2025/1/9 16:32:02

 前置文章

消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客

消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客

消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客

目录

Work queues 工作队列模式 

案例:

 在生产者端

在消费者端

结果如下

 消费预取限制

 发布订阅模型

 Fanout Exchange(配置文件实现)

案例

消费者代码

生产者代码

 Direct Exchange (注解实现)

 案例

消费者代码

生产者代码

Topic Exchange

案例 

 消费者代码

生产者代码

 消息转换器

生产者代码

JSON方式序列化

生产者代码 (jackson)

 消费者代码(jackson)

 总结


Work queues 工作队列模式 

这里用的不是上面第三篇文章里面的定义配置类的形式。

案例:

 在生产者端

队列要存在才可以上传。不然代码运行不会报错,但是消息也会不知道发到哪里去。

    @Test
    public void testSendMessage2() throws InterruptedException {
        String queue_Name= "simple.queue";
        String message="hello 鼠鼠";

        for(int i=1;i<=50;i++)
        rabbitTemplate.convertAndSend(queue_Name,message+i);
        Thread.sleep(20);
    }

在消费者端

定义了两个消费者监听上面的队列,本来想三个的,但是不知道默认的交换机名字,所以弄了两个。并且根据注解的不同,第一个是可以直接创建一个队列,第二个需要队列已存在才行。

@Component
public class RabbitMQListener {


    //自动创建队列

    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
    public void ListenerWorkQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
        Thread.sleep(20);
    }

    //需要在rabbit_mq上手动创建队列,不然会报错
    @RabbitListener(queues="simple.queue")
    public void ListenerWorkQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
        Thread.sleep(200);
    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }
}

结果如下

两个队列轮流取消息导致反而变慢了。

 消费预取限制

要指定队列才有效果。

 这里就相当于指定了在simple前缀的队列上每次只能获取一条消息。

运行结果如下,大多数都交给了快的队列执行。

 发布订阅模型

 Fanout Exchange(配置文件实现)

消息路由到每个绑定的消息队列。

案例

 

消费者代码

 spring读取到这个Bean之后就会向RabbitMq发请求,创建交换机,绑定队列了。 

@Configuration
public class FanoutConfig {
    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fannout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //fanout.queue1
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fannout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

定义两个监听用的方法

@Component
public class RabbitMQListener {
//    @RabbitListener(queues="boot_queue")
//    public void ListenerQueue(Message message){
//        System.out.println(message);
//    }


    //自动创建队列

//    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
//    public void ListenerWorkQueue1(Message message) throws InterruptedException {
//        System.out.println("11111"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(20);
//    }
//
//    //需要在rabbit_mq上手动创建队列,不然会报错
//    @RabbitListener(queues="simple.queue")
//    public void ListenerWorkQueue2(Message message) throws InterruptedException {
//        System.out.println("22222"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(200);
//    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }



    @RabbitListener(queuesToDeclare=@Queue("fanout.queue1"))
    public void ListenerFanoutQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
    }

    @RabbitListener(queuesToDeclare=@Queue("fanout.queue2"))
    public void ListenerFanoutQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
    }
}

生产者代码

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    @Test
//    public void testSend(){
//        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","hello 鼠鼠");
//    }

//    @Test
//    public void testSendMessage2() throws InterruptedException {
//        String queue_Name= "simple.queue";
//        String message="hello 鼠鼠";
//
//        for(int i=1;i<=50;i++)
//        rabbitTemplate.convertAndSend(queue_Name,message+i);
//        Thread.sleep(20);
//    }

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName="itcast.fanout";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

 Direct Exchange (注解实现)

 案例

消费者代码


@Component
public class RabbitMQListener {   
     @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }
}

生产者代码

    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName="itcast.direct";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }

此条代码只有绑定了blue这个key的队列才可以收到。

换成red就是两个队列都可以收到了。

Topic Exchange

案例 

 

 消费者代码

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue1"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="japan.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue2"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }

生产者代码

    @Test
    public void testSendTopicExchange(){
        //交换机名称
        String exchangeName="itcast.topic";
        //消息
        String message="北岭山脚鼠鼠横死街头,究竟是人性的沦丧还是道德的....";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);
    }

两个都符合,所以都能收到。

 消息转换器

定义一个队列

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

生产者代码

    @Test
    public void testSendObjectQueue(){
        //消息
        Map<String,Object> msg=new HashMap<>();
        msg.put("name","北岭山脚鼠鼠");
        msg.put("age","22");
        //发送消息
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

可以看见消息被转换成了一长串字符,content_type写着java的序列化。

效率差,安全性也差。 

JSON方式序列化

声明好MessageConveter之后就可以自动覆盖默认序列化方式了。

导入一个核心依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

生产者代码 (jackson)

 修改生产者的启动类代码,加上一个Bean

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

启动测试类之后可以看见新的消息出现了。

 消费者代码(jackson)

 然后可以正常接受到消息

如果消费者不使用对应jackson解析的话,代码会报错

 总结

推荐使用jackson的方式 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/791731.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构(c++实现)

数据结构 目录 数据结构1.链表实现单链表双链表 2.栈(先进后出&#xff0c;后进先出)3.单调栈4.队列&#xff08;先进先出&#xff09;5.单调队列6.小根堆操作 7.KMP8.Trie树(字典树) 1.链表实现 单链表 #include <iostream>using namespace std;const int N 100010;/…

AcWing 3708. 求矩阵的鞍点

输入样例&#xff1a; 3 4 1 2 3 4 1 2 3 4 1 2 3 4输出样例&#xff1a; 1 4 4 2 4 4 3 4 4 #include<bits/stdc.h> using namespace std; const int N1010; int n,m,a[N][N],x[N],y[N],flag1; int main(){scanf("%d%d",&n,&m);for(int i1;i<n;i…

抖音西瓜实时作品监控,一秒更新提醒

抖音西瓜实时作品监控&#xff0c;一秒更新提醒 安装必要的依赖库&#xff1a;使用pip安装aweme库。 pip install aweme 导入所需的库。 import datetime import time import schedule from aweme import API 创建一个函数&#xff0c;用于检查抖音作品是否更新。 def check_u…

端到端的视频编码方法及码率控制算法

文章目录 基于卷积神经网络的的端到端的视频编码方法自编码器 基于端到端学习的图像编码研究及进展变换量化熵编码 面向视频会议场景的 H.266/VVC 码率控制算法研究基于强化学习的视频码率自适应决策研究自适应流媒体传输技术码率自适应算法研究现状强化学习深度强化学习算法介…

mp4视频太大怎么压缩?教你轻松减小视频大小

MP4视频太大怎么办&#xff1f;很多人都会遇到这样的问题&#xff0c;MP4视频往因为画面清晰度高&#xff0c;画面流畅&#xff0c;所以视频文件会比较大&#xff0c;如果你想向朋友或者家人分享这个视频&#xff0c;但是又因为文件太大无法发送&#xff0c;那么怎么办呢&#…

可视化开发工具:让软件应用开发变得更轻松

一、前言 你是否为编程世界的各种挑战感到头痛&#xff1f;想要以更高效、简单的方式开发出专业级的项目&#xff1f; JNPF低代码工具正是你苦心寻找的产品&#xff01;它是一款专为稍微懂一点点编程思想的入门级人员设计的神奇工具&#xff0c;集成了丰富的功能和组件&#xf…

使用 CSS 自定义属性

我们常见的网站日夜间模式的变化&#xff0c;其实用到了 css 自定义属性。 CSS 自定义属性&#xff08;也称为 CSS 变量&#xff09;是一种在 CSS 中预定义和使用的变量。它们提供了一种简洁和灵活的方式来通过多个 CSS 规则共享相同的值&#xff0c;使得样式更易于维护和修改。…

深度剖析APP开发中的UI/UX设计

作为一个 UI/UX设计师&#xff0c;除了要关注 UI/UX设计之外&#xff0c;还要掌握移动开发知识&#xff0c;同时在日常工作中也需要对用户体验有一定的认知&#xff0c;在本次分享中&#xff0c;笔者就针对自己在工作中积累的一些经验来进行一个总结&#xff0c;希望能够帮助到…

暑假学生使用什么牌子台灯好?分享五款学生使用的台灯

临近暑假&#xff0c;是不是开始补课或写暑假作业了呢&#xff1f;是不是还在为选一款学生使用的台灯而发愁&#xff1f;今天小编就来给大家推荐几款台灯供大家参考参考。 那么问题来了&#xff0c;怎么选择合适的护眼台灯&#xff1f; 第一&#xff1a;先考虑个人预算选择适…

Modbus RTU通信应用

一、功能概述 1.1 概述 Modbus串行通信协议是Modicon公司在1970年开发的。 Modbus串行通信协议有Modbus ASCII和Modbus RTU两种模式&#xff0c;Modbus RTU协议通信效率较高&#xff0c;应用更加广泛。 Modbus RTU协议是基于RS232和RS485串行通信的一种协议&#xff0c;数据通…

论文解读|用于从RGB-D数据进行3D物体检测的Frustum PointNets

原创 | 文 BFT机器人 01 摘要 论文研究了室内和室外场景中基于RGBD数据的3D目标检测。论文的方法不仅仅依赖于3D方案&#xff0c;而是利用成熟的2D对象检测器和先进的3D深度学习进行对象定位&#xff0c;即使是小对象也能实现高效率和高召回。 直接在原始点云中学习&#xff0…

如何让GPT自己命令自己?榨干最后一丝智能,解放双手!

1.让GPT先别说话 2.接下来&#xff0c;看看它学的怎么样 使用成功了&#xff01;效果拔群&#xff01; 3.接下来&#xff0c;让他回答自己生成的指令&#xff1a; 效果比想象的还要好&#xff01;果然最懂GPT的还是它自己&#xff0c;生成的prompt比自己手写的prompt更加精准有…

rocketmq客户端本地日志文件过大调整配置(导致pod缓存cache过高)

现象 在使用rocketmq时&#xff0c;发现本地项目中文件越来越大&#xff0c;查找发现在/home/root/logs/rocketmqlog目录下存在大量rocketmq_client.log日志文件。 配置调整 开启slf4j日志模式&#xff0c;在项目启动项中增加-Drocketmq.client.logUseSlf4jtrue因为配置使用的…

Bug管理规范

目录 1.目的 2.角色和职责 3.缺陷等级定义 4.缺陷提交原则 5.缺陷流转流程 5.1创建缺陷 5.2缺陷分拣/分配 5.3研发认领缺陷 5.4.研发解决缺陷 5.5关闭缺陷 5.6缺陷激活 1.目的 项目过程中对缺陷管理的规则&#xff0c;明确提单规范、用例优先级的选择规则、走单流程、…

为Android构建现代应用——应用架构

选择风格(Choosing a style) 我们将依照Google在《应用架构指南》中推荐的最佳实践和架构指南来构建OrderNow的架构。 这些定义包括通过各层定义组件的一些Clean Architecture原则。 层次的定义(Definition of the layers) 在应用程序中&#xff0c;我们将定义以下主要层次…

【C++ 进阶】继承

一.继承的定义格式 基类又叫父类&#xff0c;派生类又叫子类&#xff1b; 二.继承方式 继承方式分为三种&#xff1a; 1.public继承 2.protected继承 3.private继承 基类成员与继承方式的关系共有9种&#xff0c;见下表&#xff1a; 虽然说是有9种&#xff0c;但其实最常用的还…

【教学类-34-07】20230726拼图(“菱形”凹凸拼图)3*4格子(中班主题《个别化拼图》偏美术)

作品展示&#xff1a; 背景需求 我尝试将拼图的“圆形凹凸角”变成"正方形凹凸角”&#xff0c;没有成功&#xff0c;但做出了“菱形凹凸角”。 实用性思考&#xff1a; 1、这种菱形凹凸角与正方形结构近似&#xff0c;裁剪难度中等&#xff08;比圆角容易剪&#xff0…

Android Studio Giraffe 发布,快来看有什么更新吧

又双叒叕到了「激动人心」 的 Android Studio 更新&#xff0c;这次更新的版本是 Giraffe | 2022.3.1&#xff0c;本次更新的 Giraffe&#xff08;长颈鹿&#xff09;将 IntelliJ 平台升级到 2022.3 版本&#xff0c;也将 AGP 支持提高到 8.1 &#xff0c;虽然最低支持 3.2&…

LED智能照明在商业照明中的应用都有哪些?SLM421A数明深力科带你一起去了解

数明深力科SLM421A系列产品是用于两通道、高精度恒流源的LED线性驱动芯片。无需功率电感&#xff0c;无频闪、无EMC困扰&#xff0c;支持高频率PWM调色调光&#xff0c;在LED智能照明产品运用中周边线路简单&#xff0c;成本低。 SLM421每路驱动仅需要从SET到GND接一个电阻即可…

fpga_pwm呼吸灯(EP4CE6F17C8)

文章目录 一、呼吸灯二、代码实现三、引脚分配 一、呼吸灯 呼吸灯是指灯光在微电脑的控制之下完成由亮到暗的逐渐变化&#xff0c;使用开发板上的四个led灯实现1s间隔的呼吸灯。 二、代码实现 c module pwm_led( input clk ,input rst_n ,output reg [3:0] led ); …