RabbitMQ入门到实战教程,消息队列实战,改造配置MQ

news2025/1/6 19:00:29

RabbitMQ入门到实战教程,MQ消息中间件,消息队列实战-CSDN博客

 3.7.Topic交换机

3.7.1.说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

3.7.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.7.3.消息接收

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

3.7.4.总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

3.8.声明队列和交换机

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

3.8.1.基本API

SpringAMQP提供了一个Queue类,用来创建队列:

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

3.8.2.fanout示例

在consumer中创建一个类,声明队列和交换机:

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.8.2.direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

3.8.4.基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

3.9.消息转换器

Spring的消息发送代码接收的消息体是一个Object:

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

我们来测试一下。

3.9.1.测试默认转换器

1)创建测试队列

首先,我们在consumer服务中声明一个新的配置类:

利用@Bean的方式创建一个队列,

具体代码:

package com.itheima.consumer.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConfig {

    @Bean
    public Queue objectQueue() {
        return new Queue("object.queue");
    }
}

注意,这里我们先不要给这个队列添加消费者,我们要查看消息体的格式。

重启consumer服务以后,该队列就会被自动创建出来了:

2)发送消息

我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码,发送一个Map对象:

@Test
public void testSendMap() throws InterruptedException {
    // 准备消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 21);
    // 发送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

发送消息后查看控制台:

可以看到消息格式非常不友好。

3.9.2.配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

此时,我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:

3.9.3.消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

4.业务改造

案例需求:改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用,改为基于RabbitMQ的异步通知。

如图:

说明,我们只关注交易服务,步骤如下:

  • 定义topic类型交换机,命名为pay.topic
  • 定义消息队列,命名为mark.order.pay.queue
  • mark.order.pay.queuepay.topic绑定,BindingKeypay.success
  • 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.topic,发送消息的RoutingKeypay.success,消息内容是订单id
  • 交易服务监听mark.order.pay.queue队列,接收到消息后更新订单状态为已支付

4.1.配置MQ

不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:

1)添加依赖:

  <!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

2)配置MQ地址:

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

4.1.接收消息

在trade-service服务中定义一个消息监听类:

其代码如下:

package com.hmall.trade.listener;

import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "mark.order.pay.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        orderService.markOrderPaySuccess(orderId);
    }
}

4.2.发送消息

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

private final RabbitTemplate rabbitTemplate;

@Override
@Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
    // 1.查询支付单
    PayOrder po = getById(payOrderDTO.getId());
    // 2.判断状态
    if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
        // 订单不是未支付,状态异常
        throw new BizIllegalException("交易已支付或关闭!");
    }
    // 3.尝试扣减余额
    userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());
    // 4.修改支付单状态
    boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
    if (!success) {
        throw new BizIllegalException("交易已支付或关闭!");
    }
    // 5.修改订单状态
    // tradeClient.markOrderPaySuccess(po.getBizOrderNo());
    try {
        rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());
    } catch (Exception e) {
        log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
    }
}

5.练习

5.1.抽取共享的MQ配置

将MQ配置抽取到Nacos中管理,微服务中直接使用共享配置。

5.2.改造下单功能

改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:

  • 定义topic类型交换机,命名为trade.topic
  • 定义消息队列,命名为cart.clear.queue
  • cart.clear.queuetrade.topic绑定,BindingKeyorder.create
  • 下单成功时不再调用清理购物车接口,而是发送一条消息到trade.topic,发送消息的RoutingKeyorder.create,消息内容是下单的具体商品、当前登录用户信息
  • 购物车服务监听cart.clear.queue队列,接收到消息后清理指定用户的购物车中的指定商品

5.3.登录信息传递优化

某些业务中,需要根据登录用户信息处理业务,而基于MQ的异步调用并不会传递登录用户信息。前面我们的做法比较麻烦,至少要做两件事:

  • 消息发送者在消息体中传递登录用户
  • 消费者获取消息体中的登录用户,处理业务

这样做不仅麻烦,而且编程体验也不统一,毕竟我们之前都是使用UserContext来获取用户。

大家思考一下:有没有更优雅的办法传输登录用户信息,让使用MQ的人无感知,依然采用UserContext来随时获取用户。

参考资料:

Spring AMQP

5.4.改造项目一

思考一下,项目一中的哪些业务可以由同步方式改为异步方式调用?试着改造一下。

举例:短信发送


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1155816.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

吊打98%的JAVA同行,这份阿里P8架构师升级手册登上天梯!

前言&#xff1a; 我们都是IT人&#xff0c;所以&#xff0c;我们注定了很像。 前段时间有个朋友去阿里面试&#xff0c;作为一个社招生&#xff0c;太多痛苦了。都知道进大厂最好的时机就是应届生的时候。作为社招生&#xff0c;太难了。 我这位朋友经历了五轮面试最后才上…

【AIFEM案例操作】水泵强度分析

AIFEM是由天洑自主研发的一款通用的智能结构仿真软件&#xff0c;助力用户解决固体结构相关的静力学、动力学、振动、热力学等实际工程问题&#xff0c;软件提供高效的前后处理工具和高精度的有限元求解器&#xff0c;帮助用户快速、深入地评估结构的力学性能&#xff0c;加速产…

reactor (百万并发服务器) -- 1

为了从点滴开始&#xff0c;文章会先从一些基础socket去补充一些经常发生但是没有很深入去思考的细节。然后我们再开始去设计reactor的设计&#xff0c;可以选择跳过起过前面部分。 为了能从0开始去设计&#xff0c;测试&#xff0c;优化...整个过程会分为2-3篇文章输出&#x…

【NI-DAQmx入门】传感器基础知识

1.什么是传感器&#xff1f; 传感器可将真实的现象&#xff08;例如温度或压力&#xff09;转换为可测量的电流和电压&#xff0c;因而对于数据采集应用必不可少。接下来我们将介绍您所需的测量类型及其对应的传感器类型。在开始之前&#xff0c;您还可以先了解一些传感器术语&…

优化python程序执行速度

1、问题背景 最近使用python编写的一个蓝牙应用程序&#xff0c;在使用过程中发现传输大量数据时会产生丢包&#xff0c;导致无法接收到完整的数据包。蓝牙接收程序的代码如下。 #蓝牙数据接收处理线程def bt_recv_thread(self):recv_time time.time()while(self.thread_run)…

BUUCTF 神秘龙卷风 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 神秘龙卷风转转转&#xff0c;科学家用四位数字为它命名&#xff0c;但是发现解密后居然是一串外星人代码&#xff01;&#xff01;好可怕&#xff01; 密文&#xff1a; 下载附件&#xff0c;解压得到一个.rar压缩…

动态蛇形卷积管状结构分割【CVPR 2023】

论文下载地址&#xff1a;Excellent-Paper-For-Daily-Reading/nn-block at main 类别&#xff1a;模块 时间&#xff1a;2023/10/31 摘要 血管和道路等管状结构在各种临床和自然环境中具有极其重要的意义&#xff0c;在这些环境中&#xff0c;精确分割对于下游任务的准确性…

C++ Set

定义 set不同于vector,strin,list这种存储容器&#xff0c;set是一种关联式容器&#xff0c;底层是搜二叉&#xff1b; 功能 set可以确定唯一的值&#xff0c;可以排序去重。 接口 insert() #include <iostream> #include<set> using namespace std;int main…

机泵设备如何通过设备健康管理平台实施预测性维护

机泵设备在工业生产中起着至关重要的作用&#xff0c;但长时间运行和频繁使用容易引发各种故障。为了提高机泵设备的可靠性和效率&#xff0c;预测性维护成为一种重要的管理策略。设备健康管理平台作为一种先进的工具&#xff0c;为机泵设备的预测性维护提供了有力支持。本文将…

第七届山东省黄炎培职业教育创新创业大赛圆满结束

山东省黄炎培职业教育创新创业大赛作为职教领域的一项品牌赛事&#xff0c;自举办以来&#xff0c;参赛院校覆盖面不断扩大&#xff0c;大赛水平和社会影响力不断提高&#xff0c;已成为全省职业教育领域的品牌赛事&#xff0c;是激发创新创业活力的重要抓手和有效载体&#xf…

UIAlertController 修改 title 或 message 样式相关

UIAlertController 文字换行后默认对齐方式为居中,若想调整其相关样式属性可以借鉴如下方式进行修改,具体实现方式 code 如下: NSString *msg "1、注销≠退出登录;\n注销:对不再使用的账号进行清空移除;注销后,App中数据将全部丢失,不可再找回;\n2、注销后,与账号相关的…

【Linux】配置JDKTomcat开发环境及MySQL安装和后端项目部署

目录 一、jdk安装配置 1. 传入资源 2. 解压 3. 配置 二、Tomcat安装 1. 解压开启 2. 开放端口 三、MySQL安装 1. 解压安装 2. 登入配置 四、后端部署 1. 数据库 2. 导入.war包 3. 修改端口 4.开启访问 一、jdk安装配置 打开虚拟机 Centos 登入账号&#xff…

控梦术(一)之什么是清明梦

控梦术 首先&#xff0c;问大家一个问题。在梦中&#xff0c;你知道自己是在做梦吗&#xff1f;科学数据表明&#xff0c;大约23%的人在过去一个月中&#xff0c;至少有一次在梦中意识到自己正在做梦。科学家把这叫做清醒梦或者叫做清明梦。科学家说&#xff0c;每个人都能学会…

【C++ 系列文章 -- 程序员考试 下午场 C++ 专题 201711 】

文章目录 1.1 C 题目六1.1.1 填空&#xff08;1&#xff09;详解1.1.2 填空&#xff08;2&#xff09;详解1.1.2.1 C this 的使用 1.1.3 填空&#xff08;3&#xff09;详解1.1.4 填空&#xff08;4&#xff09;详解1.1.5 填空&#xff08;5&#xff09;详解1.1.6 填空&#xf…

心理咨询预约小程序

随着微信小程序的日益普及&#xff0c;越来越多的人开始关注如何利用小程序来提供便捷的服务。对于心理咨询行业来说&#xff0c;搭建一个心理咨询预约小程序可以大大提高服务的效率和用户体验。本文以乔拓云平台为例&#xff0c;详细介绍如何轻松搭建一个心理咨询预约小程序。…

2023-简单点-yolox代码

yolox代码 yolox结构瞄一眼net代码常规convDWConv第一步第二步 CBA一套FocusSPPCSPDarknetFPNPANdecoupled predict headref yolox结构瞄一眼 net代码 #!/usr/bin/env python3 # -*- coding:utf-8 -*- # Copyright (c) Megvii, Inc. and its affiliates.import torch from tor…

JMeter如何开展性能测试

文章目录 性能测试指标理解透彻以及测算微聊性能测试性能测试流程准备流程 ​&#x1f451;作者主页&#xff1a;Java冰激凌 性能测试指标理解透彻以及测算 虚拟用户数&#xff1a; 线程 用户并发数&#xff1a;指在某一时间&#xff0c;一定数量的虚拟用户同时对系统的某个功…

k8s、pod

Pod k8s中的port【端口&#xff1a;30000-32767】 port &#xff1a;为Service 在 cluster IP 上暴露的端口 targetPort&#xff1a;对应容器映射在 pod 端口上 nodePort&#xff1a;可以通过k8s 集群外部使用 node IP node port 访问Service containerPort&#xff1a;容…

【C++】多态 ⑦ ( 多态机制实现原理 | 虚函数表概念 | 虚函数表工作机制 | vptr 指针 | 虚函数表运行时机制 | 虚函数与动态联编 )

文章目录 一、多态原理1、多态成立的三个条件2、虚函数表概念3、虚函数表工作机制4、vptr 指针5、虚函数表运行时机制6、虚函数与动态联编 二、代码示例 - 虚函数表1、代码实例分析 - 虚函数表创建与使用2、完整代码示例 一、多态原理 1、多态成立的三个条件 " 多态 "…

BoredHackerBlog: Cloud AV RT日记

目录 信息搜集 WEB漏洞攻击 拿shell 信息搜集 首先ifconfig查看自己IP&#xff0c; netdiscover查看同网段下主机 第三个应该是目标靶机。用nmap查看靶机开放端口&#xff1a; 开放22和8080&#xff0c;看看8080开的啥服务 WEB漏洞攻击 看到让我们输入邀请码。有输入框的第…