本篇博文目录
- 一.Spring整合RabbitMQ
- 1.导入依赖
- 2.生产者
- 3.消费者
- 4.测试
- 二.SpringBoot整合RabbitMQ
- 1.导入依赖
- 2.生产者
- 3.消费者
- 4.测试
- 三.代码下载
一.Spring整合RabbitMQ
在spring项目中使用RabbitMQ的Exchange模式的Topics,项目分为消费者spring项目和生产者spring项目,其中都导入amqp-client和spring-rabbit依赖,前者为RabbitMQ的client依赖,后者为spring整合RabbitMQ的依赖。
1.导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
</dependencies>
备注:amqp-client 最新版本为5.16.0;spring-rabbit最新版本3.0.1。
2.生产者
- 在resources包中创建applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 设置连接工厂,配置基本参数 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
virtual-host="/test"></rabbit:connection-factory>
<!--
fanout-exchange | direct-exchange | topic-exchange
声明一个名为topicExchange的topic交换机,如果这个交换机不存在,则自动创建
-->
<rabbit:topic-exchange name="topicExchange" auto-declare="true">
</rabbit:topic-exchange>
<!-- Spring为我们封装了RabbitTemplate对象来简化生产者发送数据的过程,对常用的方法进行了封装。 -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template>
<!--在生产者中配置template对象,用于发送数据-->
<bean id="newsProducer" class="com.itlaoqi.rabbit.exchange.NewsProducer">
<property name="rabbitTemplate" ref="template"/>
</bean>
<!-- RabbitAdmin对象用于创建、绑定、管理队列与交换机 -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
</beans>
生产者的配置信息其实就是将Rabbit-client代码中的代码配置转换为配置文件的配置,会简单的进行修改即可,下来对上面相关配置信息进行解释。
<rabbit:connection-factory
标签是设置连接工厂,用来初始化Rabbitmq的连接对象,根据自己的Rabbitmq的配置信息填入连接对象的参数。
- id:bean的名称
- host:主机地址
- port:主机端口号
- username:登入的用户名
- password:登入的密码
- virtual-host:虚拟路径
<rabbit:topic-exchange
标签用来初始化Topics类型的交换机信息,详细参数解释如下:
- id:bean的名称
- name:交换机的名称
- auto-declare:自动创建
备注:交换机有三种方式:fanout-exchange(Sub/Pub) | direct-exchange(Routing) | topic-exchange(Topics)
<rabbit:template
Spring为我们封装了RabbitTemplate对象来简化生产者发送数据的过程,对常用的方法进行了封装。
- id:bean的名称
- connection-factory:连接工厂
- exchange:交换机
既然有了上面的RabbitTemplate对象,我们可以通过RabbitTemplate对象来发送数据,创建一个NewsProducer类,用来处理数据的发送:
NewsProducer类:用来处理生产者的数据发送(该类是一个封装类,数据的发送通过该类的sendNews()进行实现)
package com.itlaoqi.rabbit.exchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Date;
public class NewsProducer {
private RabbitTemplate rabbitTemplate = null;
public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate;
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendNews(String routingKey , News news){
//convertAndSend 用于向exchange发送数据
//第一个参数是routingkey
//第二个参数是要传递的对象,可以是字符串、byte【】或者任何实现了【序列化接口】的对象
rabbitTemplate.convertAndSend(routingKey , news);
System.out.println("新闻发送成功");
}
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");
np.sendNews("us.20190101" , new News("新华社" , "特朗普又又又退群啦" , new Date() , "国际新闻内容"));
np.sendNews("china.20190101" , new News("凤凰TV" , "XXX企业荣登世界500强" , new Date() , "国内新闻内容"));
}
}
上面的NewsProducer类,通过外部传递一个RabbitTemplate对象进行初始化,然后通过 sendNews()方法中的
rabbitTemplate.convertAndSend(routingKey , news);
来发送数据,routingKey 表示路由主键,news表示数据对象,详细的news代码如下,注意该数据对象需要实现Serializable不然会因为无法序列化而抛出异常信息。
News类:传递的数据对象
package com.itlaoqi.rabbit.exchange;
import java.io.Serializable;
import java.util.Date;
public class News implements Serializable{
private String source;
private String title;
private Date createTime;
private String content;
public News(String source, String title, Date createTime, String content) {
this.source = source;
this.title = title;
this.createTime = createTime;
this.content = content;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
<bean id="newsProducer"
创建NewsProducer类的bean对象,并传入初始化参数rabbitTemplate,通过该bean对象实现数据的发送,详细代码如下:
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");
np.sendNews("us.20190101" , new News("新华社" , "特朗普又又又退群啦" , new Date() , "国际新闻内容"));
np.sendNews("china.20190101" , new News("凤凰TV" , "XXX企业荣登世界500强" , new Date() , "国内新闻内容"));
}
<rabbit:admin
RabbitAdmin对象用于创建、绑定、管理队列与交换机
- id:bean的名称
- connection-factory:连接工厂
通过上面的<rabbit:admin,我们可以通过RabbitAdmin对象来实现队列和交换机的创建,绑定和管理,详细的测试代码如下:
package com.itlaoqi.rabbit.exchange;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.util.HashMap;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class RabbitAdminTestor {
@Resource(name="rabbitAdmin")
private RabbitAdmin rabbitAdmin;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 创建交换机
*/
@Test
public void testCreateExchange(){
rabbitAdmin.declareExchange(new FanoutExchange("test.exchange.fanout" , true ,false));
rabbitAdmin.declareExchange(new DirectExchange("test.exchange.direct" , true ,false));
rabbitAdmin.declareExchange(new TopicExchange("test.exchange.topic" , true ,false));
}
/**
* 创建队列,绑定交换机并通过生产者发送数据
*/
@Test
public void testQueueAndBind(){
rabbitAdmin.declareQueue(new Queue("test.queue"));
rabbitAdmin.declareBinding(new Binding(
"test.queue", Binding.DestinationType.QUEUE,
"test.exchange.topic", "#", new HashMap<String, Object>()
));
rabbitTemplate.convertAndSend("test.exchange.topic" , "abc" , "abc123");
}
/**
* 删除队列和交换机
*/
@Test
public void testDelete(){
rabbitAdmin.deleteQueue("test.queue");
rabbitAdmin.deleteExchange("test.exchange.fanout");
rabbitAdmin.deleteExchange("test.exchange.direct");
rabbitAdmin.deleteExchange("test.exchange.topic");
}
}
3.消费者
- 在resources包中创建applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
virtual-host="/test"></rabbit:connection-factory>
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!--创建队列-->
<rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/>
<!--交换机与队列绑定,并指明筛选条件-->
<rabbit:topic-exchange name="topicExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--启动消费者后,Spring底层自动监听对应的topicQueue数据,一旦有新的消息进来,自动传入到consumer Bean的recv的News参数中,
之后再程序对News进一步处理-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/>
</rabbit:listener-container>
<bean id="consumer" class="com.itlaoqi.rabbitmq.NewsConsumer"></bean>
</beans>
消费者的配置信息和生产者的存在很多相同,下来主要对没有出现过的配置信息进行解释。
<rabbit:queue
创建队列
- id:bean的名称
- name:队列的名称
- auto-declare:自动创建
- auto-delete:自动删除
- exclusive:是否独占
- durable:是否持久化
<rabbit:bindings
交换机与队列绑定,并指明筛选条件
- queue:队列
- pattern:筛选条件,*表示任意一个字符,#表示任意多个字符
<rabbit:listener-container
启动消费者后,Spring底层自动监听对应的topicQueue数据,一旦有新的消息进来,自动传入到consumer Bean的recv的News参数中,之后再程序对News进一步处理
- connection-factory:连接工厂
<rabbit:listener
具体的监听对象
- ref:监听对象的bean
- method:监听方法
- queue-names:队列名称
<bean id="consumer"
监听的bean对象,具体的监听方法recv
package com.itlaoqi.rabbitmq;
import com.itlaoqi.rabbit.exchange.News;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class NewsConsumer {
public void recv(News news){
System.out.println("接收到最新新闻:" + news.getTitle() + ":" + news.getSource());
}
public static void main(String[] args) {
//初始化IOC容器
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
}
}
4.测试
二.SpringBoot整合RabbitMQ
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.生产者
- 添加一个名为springboot-exchange的交换机
- application.properties配置
# rabbitmq连接配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=1000ms
#producer
#confirmlistener
# 在springboot2.2.0.RELEASE版本之后该属性已经过时了,不能在使用了
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=CORRELATED
#returnlistener
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-confirms
配置信息在springboot2.2.0.RELEASE版本之后该属性已经过时了,不能在使用了,而是采用spring.rabbitmq.publisher-confirm-type,该属性一共有三种值(来源于:https://blog.csdn.net/qingyuan2014/article/details/113916449):
- NONE值是禁用发布确认模式,是默认值
- CORRELATED值是发布消息成功到交换器后会触发回调方法
- SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
spring.rabbitmq.publisher-returns
开启rabbitmq确认机制的return
spring.rabbitmq.template.mandatory
mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃
- 生产者数据发送对象
MessageProducer类:用于生产者数据发送,通过sendMsg()里的rabbitTemplate.convertAndSend()方法发送数据。
package com.itlaoqi.rabbitmq.springboot;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
@Component
public class MessageProducer {
@Resource
private RabbitTemplate rabbitTemplate ;
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
/**
* CorrelationData 消息的附加信息,即自定义id
* isack 代表消息是否被broker(MQ)接收 true 代表接收 false代表拒收。
* cause 如果拒收cause则说明拒收的原因,帮助我们进行后续处理
*/
public void confirm(CorrelationData correlationData, boolean isack, String cause) {
System.out.println(correlationData);
System.out.println("ack:" + isack);
if(isack == false){
System.err.println(cause);
}
}
};
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
System.err.println("Code:" + replyCode + ",Text:" + replyText );
System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey );
}
};
public void sendMsg(Employee emp){
//CorrelationData对象的作用是作为消息的附加信息传递,通常我们用它来保存消息的自定义id
CorrelationData cd = new CorrelationData(emp.getEmpno() + "-" + new Date().getTime());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("springboot-exchange" , "hr.employee" , emp , cd);
}
}
和spring相比springboot使用起来就比较简单了,只需要在application.properties中进行配置(没有进行配置的参数,采用默认参数),就可以使用Rabbitmq提供的RabbitTemplate进行操作,上文中的confirmCallback和returnCallback是Rabbitmq消息确认机制里面对Confirm和return进行监听,通过rabbitTemplate.setConfirmCallback()和rabbitTemplate.setReturnCallback()进行设置。在springboot2.2.0.RELEASE版本之后ReturnCallback和setReturnCallback 已经过时了,建议采用ReturnsCallback和setReturnsCallback(),ReturnsCallback和ReturnCallback的区别就是后者对参数进行了封装。
ReturnsCallback中returnedMessage的参数为ReturnedMessage
ReturnCallback中returnedMessage的参数为(message, replyCode, replyText, exchange, routingkey)
convertAndSend方法中第四个参数CorrelationData 对象作用是作为消息的附加信息传递,通常我们用它来保存消息的自定义id
- 数据对象
Employee 类:传递的数据对象,注意需要实现Serializable接口不然会因为无法序列化而产生异常。
package com.itlaoqi.rabbitmq.springboot;
import java.io.Serializable;
public class Employee implements Serializable{
private String empno;
private String name;
private Integer age;
public Employee(String empno, String name, Integer age) {
this.empno = empno;
this.name = name;
this.age = age;
}
public String getEmpno() {
return empno;
}
public void setEmpno(String empno) {
this.empno = empno;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
- 发送数据
- 添加一个名为springboot-queue的队列,并绑定到交换机springboot-exchange上,如果不怎么做的话会进入到return中,如下:
添加后,再次运行:
3.消费者
- application.properties配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=1000ms
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.acknowledge-mode
表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto,下面对各种参数进行了解释。来源于(https://juejin.cn/post/7029232312197840904)
- NONE : 不确认 :
1、默认所有消息消费成功,会不断的向消费者推送消息
2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。
- AUTO:自动确认
1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。
2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。
- MANUAL:手动确认
1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。
2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)
spring.rabbitmq.listener.simple.concurrency
: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency
: 最大的消费者数量
来源于:https://blog.csdn.net/ystyaoshengting/article/details/105267542
- 消费者接收类
package com.itlaoqi.rabbitmq.springboot;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class MessageConsumer {
//@RabbitListener注解用于声明式定义消息接受的队列与exhcange绑定的信息
//在SpringBoot中,消费者这端使用注解获取消息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value="springboot-queue" , durable="true"),
exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,
key = "#"
)
)
//用于接收消息的方法
@RabbitHandler //通知SpringBoot下面的方法用于接收消息。
// 这个方法运行后将处于等待的状态,有新的消息进来就会自动触发下面的方法处理消息
//@Payload 代表运行时将消息反序列化后注入到后面的参数中
public void handleMessage(@Payload Employee employee , Channel channel ,
@Headers Map<String,Object> headers) {
System.out.println("=========================================");
System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());
//所有消息处理后必须进行消息的ack,channel.basicAck()
Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag , false);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("=========================================");
}
}
使用 @RabbitListener 注解用于声明式定义消息接受的队列与exhcange绑定的信息,其中 value = @Queue()指向队列, exchange = @Exchange()指向交换机,key表示条件。
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value="springboot-queue" , durable="true"),
exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,
key = "#"
)
)
使用 @RabbitHandler 注解用于表明该方法为接收消息的方法,运行后将处于等待的状态,有新的消息进来就会自动触发下面的方法进行处理,使用 @Payload 注解表示运行时将消息反序列化后注入到后面的参数中, 使用 @Headers 注解表示消息头的信息,包括描述的辅助信息可以用来进行消息确认。
@RabbitHandler //通知SpringBoot下面的方法用于接收消息。
// 这个方法运行后将处于等待的状态,有新的消息进来就会自动触发下面的方法处理消息
//@Payload 代表运行时将消息反序列化后注入到后面的参数中
public void handleMessage(@Payload Employee employee , Channel channel ,
@Headers Map<String,Object> headers) {
System.out.println("=========================================");
System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());
//所有消息处理后必须进行消息的ack,channel.basicAck()
Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag , false);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("=========================================");
}
}
headers中的内容:
{amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=springboot-exchange, amqp_deliveryTag=3, amqp_consumerQueue=springboot-queue, amqp_redelivered=false, amqp_receivedRoutingKey=hr.employee, spring_listener_return_correlation=eac10f9e-b1e0-4b5a-92a1-c6aa8ea1f959, spring_returned_message_correlation=3306-1674918565983, id=6e5d0290-77c6-58c3-6fe7-7ed554a4667a, amqp_consumerTag=amq.ctag-mt44wHOpGCeH_e69j7zFwg, amqp_lastInBatch=false, contentType=application/x-java-serialized-object, timestamp=1674918566121}
其中headers里的 spring_returned_message_correlation=3306-1674918565983
就是生产者中CorrelationData的自定义id
4.测试
三.代码下载
在我的微信公众号后台回复 rabbitmq
就可以获取本篇博文相关的源代码了,如果有什么疑问后台给为留言,我看见会第一时间回复你的。