SpringAMQP的使用

news2024/9/24 5:29:49

目录

  • 一、什么是SpringAMQP
  • 二、基本消息队列
    • 消息发送
    • 消息接收
  • 三、WorkQueue队列
  • 四、发布订阅模型
    • FanoutExchange
    • DirectExchange
    • TopicExchange
  • 五、消息转换器

一、什么是SpringAMQP

它可以大大的简化我们的开发,不用我们再自己创建连接写一堆代码,具有便捷的发送,便捷的接收,便捷的绑定。它可以实现自动化的声明队列,交换和绑定。
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

二、基本消息队列

消息发送

1.第一步:引入AMQP依赖
在父工程中引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.第二步:在publisher中编写测试方法
2.1在publisher的application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: 192.168.19.20 #rabbitMQ的ip地址
    port: 5672 #端口
    username: lxwork
    password: 123456
    virtual-host: / #虚拟主机目录

2.2编写测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendSimpleQueue(){
        String queueName = "simple.queue";
        String message = "hello SimpleQueue SpringAMQP";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

消息接收

1.在consumer的application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: 192.168.19.20 #rabbitMQ的ip地址
    port: 5672 #端口
    username: lxwork
    password: 123456
    virtual-host: / #虚拟主机目录

2.在consumer中编写消费逻辑:

@Component //声明成一个bean
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到simple.queue的消息:"+msg);
    }
}

三、WorkQueue队列

工作队列可以提高消息处理速度,避免队列消息堆积
在这里插入图片描述
我们让publisher发送每隔0.2秒发送50条消息
代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendWorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello workQueue SpringAMQP-->";
        for (int i = 0; i < 51; i++) {
            rabbitTemplate.convertAndSend(queueName,message+i);
            Thread.sleep(20);
        }
        }
}

然后在定义两个消费者都监听simple.queue队列
这里有一个消息预取机制,意思就是消费者会提前拿队列中的消息,可以配置preFetch控制消息的上限
配置consumer的yml文件

spring:
  rabbitmq:
    host: 192.168.19.20 #rabbitMQ的ip地址
    port: 5672 #端口
    username: lxwork
    password: 123321
    virtual-host: / #虚拟主机
    listener:
      simple:
        prefetch: 1 # 消息预取机制:每次只能获取一条消息,处理完成才能获取下一条消息
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到的消息:"+msg+ "---"+LocalDateTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException{
        System.err.println("消费者2接收到的消息:"+msg+ "---"+ LocalDateTime.now());
        Thread.sleep(200);
    }
}

四、发布订阅模型

首先我们先来看看发布订阅的模型介绍:发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange (交换机)。
常见的exchange类型为:

FanoutExchange:广播
DirectExchange:路由
TopicExchange:话题

在这里插入图片描述

FanoutExchange

  • Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

  • 配置交换机有两种方式,一个是通过配置类,一个是用@RabbitListener注解。我们先使用配置类的形式来创建FanoutExchange交换机。
    在这里插入图片描述

在消费者中创建一个FanoutConfig配置类,声明队列也可以通过配置类的形式也可以通过@RabbitListener注解声明。以下是配置类的形式声明。

1.声明一个交换机和两个队列。

java代码:

//声明成配置类,让spring可以识别到它
@Configuration
public class FanoutConfig {
    //itcast.fanout,声明交换机  会将接收到的消息路由到每一个跟其绑定的queue
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("lxwork.fanout");
    }
    //fanout.queue1,声明队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //当定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //当定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

然后在消费者的监听类中监听fanout.queue1和fanout.queue2这两个队列。
代码:

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("消费者接收到fanout.queue1的消息:"+msg);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("消费者接收到fanout.queue2的消息:"+msg);
    }
}

2.在发布者的测试类中去定义发送消息的代码
代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName = "lxwork.fanout";
        //消息
        String message = "hello everyone";
        //发送
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

接着启动消费者和发布者,即可看到效果。

DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes) 。
在这里插入图片描述

我们通过@RabbitListener注解的形式类声明队列,交换机,规则。

1.在消费者中的监听类中定义交换机
下面我们声明一个交换机两个队列
代码:

@Component
public class SpringRabbitListener {
    //使用RabbitListener注解声明队列和交换机,也可以用配置的形式声明
    /*
        * 描述下Direct交换机与Fanout交换机的差异?
        Fanout交换机将消息路由给每一一个与之绑定的队列
        Direct交换机根据RoutingKey判断路由给哪个队列
        如果多个队列具有相同的RoutingKey,则与Fanout功能类似
        基于@RabbitListener注解声明队列和交换机有哪些常见注解?
        @Queue:声明队列
        @Exchange:声明交换机
    * */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue1"),
            exchange = @Exchange(value = "lxwork.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:"+msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue2"),
            exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:"+msg);
    }
}

可以看到相比配置类的形式,通过注解声明要方便许多,直接都写在监听类中即可,不用单独创建一个配置类。
2.在发布者测试类中定义发送消息的代码
代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName = "lxwork.direct";
        //消息
        String message = "hello direct";
        //发送
        rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }
}

TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*: 代指一个单词

在这里插入图片描述
1.在消费者中的监听类中配置交换机,声明队列,定义规则
代码:

@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(value = "lxwork.topic",type=ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))//#代表0个或多个,*代表一个单词
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queu1的消息:"+msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(value = "lxwork.topic",type=ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queu2的消息:"+msg);
    }
}

2.在发布者中的测试类中定义发送消息的代码
代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendTopicExchange(){
        //交换机名称
        String exchangeName = "lxwork.topic";
        //消息
        String message = "hello topic";
        //发送
        rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
    }
}

五、消息转换器

  • 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型
    的消息,SpringAMQP会 帮我们序列化为字节后发送。
  • Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化,步骤如下:
1.导入依赖

<dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
</dependency>

2.编写配置类
在发布者和消费者中的启动类中定义:
代码:
发布者

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
    //消息转换器,覆盖spring默认的转换器,spring默认是把数据序列化之后进行传输,效率低不安全
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

消费者:

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    //消息转换器,覆盖spring默认的转换器,spring默认是把数据序列化之后进行传输,效率低不安全
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431031.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL--05】表的约束

文章目录 1.表的约束1.1空属性1.2默认值default vs null1.3列描述1.4 zerofill1.5主键primary key1.6 自增长auto_increment1.7唯一键 unique如何设计主键&#xff1f;1.8 外键 foreign key 1.表的约束 真正的约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xf…

安捷伦E4405B

18320918653 E4405B E4405B|Agilent|ESA-E系列|10G|频谱分析仪|9kHz至13.2GHz 安捷伦 Agilent 惠普 HP 测量速度&#xff1a;28次更新/秒 测量精度&#xff1a;1dB 可选用的10Hz分辨事宽滤波器 机箱可容纳6插槽选件卡 97dB三阶动态范围 能在现场使用的坚固&#xff0c…

mycat2安装配置,mycat2分库分表,mycat2一库多表,mycat2自增id

1、官网下载&#xff08;官网下载地址&#xff09; 官网下载地址 Index of /2.0/ 下载模板 下载jdk包 下载好后吧jdk包房到mycat的lib目录下 2、配置启动 配置结构 mycat配置文件夹 clusters- prototype.cluster.json //无集群的时候自动创建- c0.cluster.json- c1.cluster…

UML与代码的对应关系

五种关系的耦合强弱比较&#xff1a;依赖<关联<聚合<组合<继承 依赖 虚线箭头 可描述为&#xff1a;Uses a 依赖是类的五种关系中耦合最小的一种关系。 因为在生成代码的时候&#xff0c;这两个关系类都不会增加属性。 注意1&#xff1a; Water类的生命期&…

【机器学习】独立成分分析(ICA)及Matlab实现

独立成分分析及Matlab实现1.问题引入2.ICA原理3.ICA算法步骤4.性质与优点5.程序代码6.程序分析7.运行结果1.问题引入 独立成分分析&#xff08;ICA&#xff09;最初由Aapo Hyvrinen等人于1980年代提出&#xff0c;其起源可以追溯到对神经科学和信号处理领域的研究需求。ICA的提…

C语言判断一个日期是在该年的第几天案例讲解

今天是2023年4月11号&#xff0c;我们就用今天举例得出是2023年的第几天。 思路分析 1&#xff09;我们想知道2023年4月11号是2023年的第几天&#xff0c;只需要把1到3月份的天数累加求和然后加上今天日期也就是11就可以算出2023年4月11号是2023年的第几天。 推广&#xff1a;…

kafka集群节点重启后未被topic识别

1.案例 kafka集群的节点重启后,topic为apex的主题识别不到重启后的broker节点id,但是还能识别到副本集还在原来的broker节点上 在kafka manager上查看 继续往下查看 2.查看kafka日志报错原因 以下是两个不同的broker节点报错的报错日志 tail -f /etc/kafka/kafka/logs/ka…

(排序9)非比较排序之计数排序

非比较排序之计数排序 之前讲的七种排序方法的话&#xff0c;都是比较排序&#xff1b;除此之外还有三种非比胶排序&#xff1a;计数排序&#xff0c;基数排序&#xff0c;桶排序。后面两个实际应用没啥&#xff0c;没啥价值。非比较排序的话&#xff0c;他的条件都比较苛刻&a…

HTTP 和 HTTPS(请求响应报文格式 + 请求方法 + 响应状态码 + HTTPS 加密流程 + Cookie 和 Session)

文章目录1. HTTP 是什么2. HTTP 请求报文和响应报文的格式1&#xff09;请求报文格式2&#xff09;响应报文格式3&#xff09;报文中空行的作用3. HTTP 的长连接和短连接4. URL1&#xff09;在浏览器中输入 www.baidu.com 后执行的全部过程5. HTTP 常用的请求方法6. GET 和 POS…

自媒体都在用的5个素材网站,视频、音效、图片全部免费下载~

推荐几个自媒体必备的素材库&#xff0c;免费可商用&#xff0c;建议收藏&#xff01; 1、菜鸟图库 视频素材下载_mp4视频大全 - 菜鸟图库 国内超大的素材库&#xff0c;在这里你可以找到设计、办公、图片、视频、音频等各种素材。视频素材就有上千个&#xff0c;全部都很高清…

后端应用架构

微服务架构划分 ⚠️ 生产环境实际部署中&#xff0c;基础能力、公共基础能力将分别在国内、美国集群部署。在没有引入数据同步的场景下&#xff0c;数据是隔离的。 接入层&#xff08;交互层&#xff09; 接入层主要完成协议转换、熔断限流、统一鉴权等能力&#xff0c;起到保…

Linux网络服务之ftp

目录 一.ftp的相关知识1.1 ftp的简介1.2 ftp的数据连接模式 二.svftpd的安装和配置2.1 svftpd安装3.2 设置本地用户验证访问ftp3.2.1 设置本地用户可以访问ftp&#xff0c;禁止匿名用户登录3.2.2 对本地用户访问切换目录进行限制 3.3 黑名单和白名单的使用3.3.1 黑名单的使用3.…

ASEMI代理AD9959BCPZ原装ADI车规级AD9959BCPZ

编辑&#xff1a;ll ASEMI代理AD9959BCPZ原装ADI车规级AD9959BCPZ 型号&#xff1a;AD9959BCPZ 品牌&#xff1a;ADI /亚德诺 封装&#xff1a;LFCSP-56 批号&#xff1a;2023 安装类型&#xff1a;表面贴装型 引脚数量&#xff1a;56 类型&#xff1a;车规级芯片 工作…

【18图详解3种典型网络拓扑:如何设计一个网络?】

前言 在网络设计的时候&#xff0c;网络架构师需要根据组网的规模设计不同的组网架构&#xff0c;今天介绍3种典型网络架构。 小型组网架构 1、网络拓扑 终端用户接入到交换机&#xff0c;交换机直连防火墙构成的简单网络&#xff0c;防火墙连接internet&#xff0c;对内网的用…

Java并发控制 学习笔记1

一、并发控制的方法 1、悲观锁&#xff1a;常用的互斥锁都属于悲观锁&#xff0c;一个线程访问共享资源时其他线程不能访问。 2、乐观锁&#xff1a;允许同时访问共享数据&#xff0c;只有在提交时利用如版本号检查是否有冲突&#xff0c;应用github。 3、什么时候用乐观锁、什…

开发必备:EsayCode使用以及Oracle自定义模板

前言 写前先问一句&#xff0c;不会还有人在手动写这些基础的sql语句吧&#xff1f;&#xff01; 最近在做Oracle的项目&#xff0c;手写mapper和entity文件真是写到手软&#xff0c;以前MySQL都是找的线上自动生成的&#xff0c;现在也不行了。 找了很长时间&#xff0c;也…

【Python】tkinter的简单使用(Tk对象、三大布局、变量、事件)

本文目录1.tkinter2.Tk对象3.三大布局3.1 pack布局3.2 grid布局3.3 place布局4.变量5.事件1.tkinter tkinter是Tcl/Tk GUI工具包&#xff08;即使用Tcl语言开发Tk图形库&#xff09;的标准Python接口&#xff0c;支持在Windows、macOS、Linux多平台运行。 tkinter是Python自带…

DPDK入门(环境搭建以及小demo)

文章目录零、从0开始配置dpdk环境的虚拟机一、dpdk的编译usertool/dpdk-setup.sh二、dpdk需要什么配置来支持1.多队列网卡2.巨页三、解析接收网络数据的过程经历了什么1.物理网卡2.NIC3.内核协议栈4.标准接口层Posix API5. 应用层上述过程发生的拷贝四、DPDK介绍基于上述接收网…

Github采用Http Push失败

Github采用Http Push失败 Github的密码凭证从2021年起开始就不能用了&#xff0c;现在采用http去push代码时候提示输入的密码要换成令牌&#xff08;token&#xff09;才可以。 如何在Github上生成自己的令牌呢&#xff1f; &#xff08;1&#xff09;简单来说就是将原来输入…

Linux搭建GitLab私有仓库,并内网穿透实现公网访问

文章目录前言1. 下载Gitlab2. 安装Gitlab3. 启动Gitlab4. 安装cpolar5. 创建隧道配置访问地址6. 固定GitLab访问地址6.1 保留二级子域名6.2 配置二级子域名7. 测试访问二级子域名前言 GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c…