目录
- 0.为什么用消息队列
- 1.代码文件创建结构
- 2.pom.xml文件
- 3.三个配置文件开发和生产环境
- 4.Rabbitmq 基础配置类 TtlQueueConfig
- 5.建立netty服务器 + rabbitmq消息生产者
- 6.建立常规队列的消费者 Consumer
- 7.建立死信队列的消费者 DeadLetterConsumer
- 8.建立mapper.xml文件
- 9.建立mapper文件接口
- 10.建立接口ProducerController 测试
- 11.测试接口请求1
- 12.测试接口请求2
- 13.网络助手测试NetAssist.exe
- 14.观察rabbitmq界面管理简单介绍
0.为什么用消息队列
- 流量消峰
- 应用解耦
- 异步确认
1.代码文件创建结构
2.pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.test</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<packaging>war</packaging>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version> <!-- 根据需要选择版本 -->
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.三个配置文件开发和生产环境
文件一 application.properties
#开发环境
spring.profiles.active=dev
#生产环境
#spring.profiles.active=prod
文件二 application-dev.properties 开发环境
spring.application.name=demo
server.servlet.context-path=/demo1
server.port=1001
#spring.main.allow-circular-references=true
spring.rabbitmq.host=实际ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
##创建单线程监听容器 本项目目前用的是单线程 这里预期值2000 需根据实际情况调整
spring.rabbitmq.listener.simple.prefetch=2000
##创建多线程监听容器
#spring.rabbitmq.listener.direct.prefetch=2000
#spring.rabbitmq.listener.simple.acknowledge-mode=auto
# application.properties 示例
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
#开启消息确认机制
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
netty.server.port=1002
netty.server.bossThreads=1
netty.server.workerThreads=1
server.max-http-header-size=655360
mybatis.mapper-locations=classpath:mapper/*Mapper.xml
mybatis.type-aliases-package=com.mt.entity
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
logging.level.com.ysd.mapper=info
logging.file.name=demo.log
logging.level.com.mt.mapper=info
#mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis.configuration.map-underscore-to-camel-case=true
pagehelper.helper-dialect=mysql
pagehelper.reasonable=true
spring.main.allow-circular-references=true
spring.jackson.default-property-inclusion=non_null
#<!--=====================数据库1 ====================-->
spring.datasource.dynamic.英文数据库名称1.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.dynamic.datasource.英文数据库名称1.username=root
spring.datasource.dynamic.datasource.英文数据库名称1.password=root
spring.datasource.dynamic.datasource.英文数据库名称1.driver-class-name=com.mysql.cj.jdbc.Driver
#<!--=====================数据库2 ====================-->
spring.datasource.dynamic.英文数据库名称2.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.dynamic.datasource.英文数据库名称2.username=root
spring.datasource.dynamic.datasource.英文数据库名称2.password=root
spring.datasource.dynamic.datasource.英文数据库名称2.driver-class-name=com.mysql.cj.jdbc.Driver
4.Rabbitmq 基础配置类 TtlQueueConfig
rabbitmq基础信息配置已经在application-dev.properities中进行配置过一部分
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 150000);//超出150秒没有被消费 就会进入死信队列
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
@Bean
public Binding queueaBindingX(@Qualifier("queueA") org.springframework.amqp.core.Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);//超出40秒没有被消费 就会进入死信队列
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
//这个connectionFactory就是我们自己配置的连接工厂直接注入进来
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//这边设置消息确认方式由自动确认变为手动确认
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消息预取数量
// simpleRabbitListenerContainerFactory.setPrefetchCount(1);
return simpleRabbitListenerContainerFactory;
}
}
5.建立netty服务器 + rabbitmq消息生产者
创建服务器类 初始化启动服务器NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NettyServer implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
//有可能在项目初始化的时候加载不出来导致项目隐形报错
//加载application-dev.proerities文件中 对应参数配置项
@Value("${netty.server.port}")
private int port;
public int getPort () {
return port;
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("netty服务启动端口"+getPort());
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理已经被接收的连接
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 使用Nio的通道类型
.childHandler(new ChannelInitializer<SocketChannel>() { // 添加一个处理器来处理接收到的数据
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列
sendMessage(ctx,msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
});
}
});
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(getPort()).sync();
// 等待服务器socket关闭
f.channel().closeFuture().sync();
}
//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列中
//充当消息的生产者发送
public void sendMessage(ChannelHandlerContext ctx, String msg){
// 接收netty监听信息来源作为消息生产者
rabbitTemplate.convertAndSend("X","XA","来自QA"+msg);
ctx.writeAndFlush("===>"+msg);
}
}
6.建立常规队列的消费者 Consumer
import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class Consumer {
@Resource
private MessageProcessService messProceService;
private Map<Integer, Message> messageMap = new HashMap<>();
private int i =1;
private int j =1;
@RabbitListener(queues = "QA")
public void receiveQA(Message message, Channel channel) throws InterruptedException, IOException {
i++;
synchronized (this) {
messageMap.put(i,message);
if (messageMap.size() >= 1500) {
// 模拟数据库插入操作
System.out.println("模拟插入数据库操作,QA队列处理消息数:" + messageMap.size());
processMessagesBatch(channel);
System.out.println("模拟插入成功, 准备进行下一次收集");
}
}
}
// @RabbitListener(queues = "QB")
// public void receiveQB(Message message, Channel channel) throws IOException {
// String msg = new String(message.getBody());
// log.info("当前时间:{},监听QB队列信息{}", new Date().toString(), msg);
// }
public static class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
private void processMessagesBatch(Channel channel) throws IOException {
List<String> list = new ArrayList<>();
long startTime = System.currentTimeMillis();
Map<Integer, Message> tempMessageMap = new HashMap<>(messageMap);
messageMap.clear();
for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
list.add("C" + (j++) + ":===>" + "QA队列收集" + new String(map.getValue().getBody()));
}
int count = messProceService.add(list);
if (count == list.size()) {
System.out.println("插入数据成功");
for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);
}
list.clear();
}
long durationSeconds = (System.currentTimeMillis() - startTime) / 1000;
System.out.println("插入1500条数据执行时间: " + durationSeconds);
}
}
7.建立死信队列的消费者 DeadLetterConsumer
import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
@Slf4j
@Component
public class DeadLetterConsumer {
@Resource
private MessageProcessService messProceService;
private Map<Integer, Message> messageMap = new HashMap<>();
private int i = 1;
private int j =1;
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
i++;
synchronized (this) {
messageMap.put(i, message);
if (messageMap.size() >= 100) {
// 模拟数据库插入操作
System.out.println("模拟插入数据库操作,死信队列处理消息数:" + messageMap.size());
processMessagesBatch(channel);
System.out.println("模拟插入成功, 准备进行下一次收集");
// 确认当前消息,以便RabbitMQ知道它可以释放此消息
}
}
}
private void processMessagesBatch(Channel channel) throws IOException {
List<String> list = new ArrayList<>();
// 复制当前消息映射以避免在迭代时修改
Map<Integer, Message> tempMessageMap = new HashMap<>(messageMap);
messageMap.clear();
long startTime = System.currentTimeMillis();
for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
list.add("C" + (j++) + ":===>" + "死信队列收集" + new String(map.getValue().getBody()));
}
int count = messProceService.add(list);
long durationSeconds = (System.currentTimeMillis() - startTime) / 1000;
System.out.println("插入50条数据执行时间: " + durationSeconds);
if (count == list.size()) {
System.out.println("插入数据成功");
list.clear();
for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);
}
}
}
}
8.建立mapper.xml文件
MessageProcessMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.demo.mapper.MessageProcessMapper">
<insert id="add">
INSERT INTO t_xinyang_direct (direct,create_date)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item},
SYSDATE()
)
</foreach>
</insert>
</mapper>
9.建立mapper文件接口
MessageProcessMapper
import com.baomidou.dynamic.datasource.annotation.DS;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
@DS("英文数据库名称1")
public interface MessageProcessMapper {
int add(List<String> list);
}
10.建立接口ProducerController 测试
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/api")
public class ProducerController {
@GetMapping("/hello/{message}")
public ResponseEntity<String> sayHello(@PathVariable String message) {
return ResponseEntity.ok("Hello, RabbitMQ!==>"+message);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("接收接口发送的消息请求");
rabbitTemplate.convertAndSend("X","XA","消息来自tt1为10s的队列"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自tt1为150s的队列"+message);
}
}
11.测试接口请求1
测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/hello/返回浏览器输入内容
12.测试接口请求2
测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/sendRabbitMq/发给rabbitmq消息
13.网络助手测试NetAssist.exe
主要目的模拟向netty端口进行发送数据,通过netty监听到的信息然后通过rabbitmq的生产者发送rabbitmq的队列中,让消费者进行消费,如果消费者绑定死信队列,那么消费者从队列中取出消息后,经过一定时间未确认即不进行消费确认或者拒绝,然后入之前绑定好的死信队列中,供死信队列绑定的死信消费者进行消费处理。