SSM整合RabbitMQ目录
- 前言
- 版本
- 实现
- 目录参考
- pom.xml依赖
- rabbitmq.properties配置文件
- spring-rabbitmq.xml
- spring-mvc.xml或applicationContext.xml
- rabbitmq目录下
- MessageConsumer.java
- MessageConsumer2.java
- MessageProducer.java
- MessageConstant.java
- 测试调用
- 扩展
- 消息重发
前言
SSM框架整合RabbitMQ【比较简单,复制粘贴可用】
本人使用的Spring版本是4.x
版本
RabbitMQ相关
erl10.0.1
RabbitMQ3.7.9
安装步骤参考:https://www.cnblogs.com/saryli/p/9729591.html
相关依赖
spring4.0.2.RELEASE
spring-rabbit1.3.5.RELEASE
实现
目录参考
这是我整合时的项目结构
关键:rabbitmq文件包和rabbitmq.properties、spring-rabbitmq.xml、spring-mvc.xml
pom.xml依赖
在现成的SSM项目中整合
<!--rabbitmq依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
rabbitmq.properties配置文件
将 rabbitmq.properties配置文件添加到resources目录下
mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672
mq.virtual-host=/
spring-rabbitmq.xml
将spring-rabbitmq.xml添加到resources目录下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<!-- 引入连接配置文件 -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:rabbitmq.properties" />
</bean>
<!-- 定义rabbitmq connectionFactory连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
username="${mq.username}"
password="${mq.password}"
host="${mq.host}"
port="${mq.port}"
virtual-host="${mq.virtual-host}" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue队列 -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<rabbit:queue name="queueTest1" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange(也就是交换机),绑定queueTest队列(queueTest名称可以自定义) -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
<rabbit:binding queue="queueTest1" key="queueTestKey1"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 将amqpTemplate对象绑定exchange中交换机-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" />
<!-- 消息接收处理 -->
<bean id="messageReceiver" class="com.rabbitmq.MessageConsumer"></bean>
<bean id="messageReceiver1" class="com.rabbitmq.MessageConsumer2"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象(监听) -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
<rabbit:listener queues="queueTest1" ref="messageReceiver1" />
</rabbit:listener-container>
<!-- 扫描注入使用注解实例对象 -->
<context:component-scan base-package="com.rabbitmq" />
</beans>
spring-mvc.xml或applicationContext.xml
我这里使用的spring-mvc.xml,根据自己配置文件使用
<import resource="classpath:spring-rabbitmq.xml" />
将这个import引入添加到 spring-mvc.xml 里的最前面,如果不添加到前面可能会报错
rabbitmq目录下
这个目录下的java文件已在spring-rabbitmq.xml中进行扫描注入
MessageConsumer.java
说明:MessageConsumer和MessageConsumer2其实都可以使用同一个类,修改xml指向即可,但是分开明了些
package com.rabbitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @Title消息消费者
* @date 2023/10/8
*/
public class MessageConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
// 逻辑处理
System.out.println("message------->:" + new String(message.getBody(), Charset.forName("utf-8")));
}
}
MessageConsumer2.java
package com.rabbitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @Title消息消费者2
* @date 2023/10/8
*/
public class MessageConsumer2 implements MessageListener {
@Override
public void onMessage(Message message) {
// 逻辑处理
System.out.println("message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
}
}
MessageProducer.java
package com.rabbitmq;
import javax.annotation.Resource;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
/**
* @Title 消息生产者
* @date 2023/10/8
*/
@Service
public class MessageProducer {
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(String key, Object message){
amqpTemplate.convertAndSend(key, message);
}
}
MessageConstant.java
package com.rabbitmq;
/**
* @Title 消息队列常量
* @date 2023/10/8
*/
public class MessageConstant{
public static String queueTestKey = "queueTestKey";
public static String queueTestKey1 = "queueTestKey1";
}
测试调用
比如这个下面在某个类里作为接口调用测试
@Autowired
private MessageProducer messageProducer;
@RequestMapping(value = "/testMq")
@ResponseBody
public Result testMq(HttpServletRequest request) throws IOException {
messageProducer.sendMessage(MessageConstant.queueTestKey, "登录");
messageProducer.sendMessage(MessageConstant.queueTestKey1, "退出");
return Result.success("测试成功");
}
调用接口后打印结果
连接结果
以上即可!
扩展
包括消息手动确认,消息失败重新加入队列处理
消息重发
package com.rabbitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
/**
* @Title 消息消费者
* @date 2023/10/8
*/
public class MessageConsumer2 implements ChannelAwareMessageListener {
private int aa = 1;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 逻辑处理
if(aa == 1) {
aa = 2;
int a = 1/0;
}
System.out.println("成功处理确认message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
// 消费者ack确认【消息处理成功确认】
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e) {
System.out.println("失败重新入队message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
// 消费者reject确认【消息失败重新加入队列-重发】
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
参考类似博客1:https://blog.csdn.net/u012988901/article/details/89499634
参考类似博客2:https://blog.csdn.net/weixin_42654295/article/details/109006276