SpringBoot3整合RabbitMQ之三_工作队列模型案例
文章目录
- SpringBoot3整合RabbitMQ之三_工作队列模型案例
- 2. 工作队列模型
- 1. 消息发布者
- 1. 创建工作队列的配置类
- 2. 发布消费Controller
- 2. 消息消费者One
- 3. 消息消费者Two
- 4. 消息消费者Three
- 5. 输出结果
2. 工作队列模型
1. 消息发布者
1. 创建工作队列的配置类
package com.happy.msg.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p>
*
* @Description: 工作队列模型_创建名称为 work_queue 的队列 <br>
* </p>
* @Datetime: 2024/3/27 18:18
* @Author: Yuan · JinSheng <br>
* @Since 2024/3/27 18:18
*/
@Configuration
public class WorkQueueConfig {
@Bean
Queue workQueue() {
return QueueBuilder.durable("work_queue").build();
}
}
2. 发布消费Controller
package com.happy.msg.publisher;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
*
* @Description: 生产消息的控制器 <br>
* </p>
* @Datetime: 2024/3/27 10:53
* @Author: Yuan · JinSheng <br>
* @Since 2024/3/27 10:53
*/
@RestController
@RequestMapping("/work")
public class WorkQueuePublisherController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sentMessage() {
for (int i = 1; i <=10 ; i++) {
rabbitTemplate.convertAndSend("work_queue", "work_queue队列第["+i+"]条消息,hello,rabbitmq"+i);
}
return "发送成功";
}
}
2. 消息消费者One
package com.happy.msg.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* <p>
* @Description: 工作队列模型_消息消费者一 <br>
* </p>
* @Datetime: 2024/3/28 20:28
* @Author: Yuan · JinSheng <br>
* @Since 2024/3/28 20:28
*/
@Slf4j
@Component
public class WorkQueueConsumerOne {
/***
* @param message 消息
* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
* @Author: Yuan · JinSheng
*/
@RabbitListener(queues = "work_queue")
public void msg(Message message){
byte[] messageBody = message.getBody();
String msg = new String(messageBody);
log.info("WorkQueueConsumerOne接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
}
}
- 输出结果
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:34:34.468074100
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:34:34.469081600
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:34:34.470811800
3. 消息消费者Two
package com.happy.msg.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* <p>
* @Description: 工作队列模型_消息消费者二<br>
* </p>
* @Datetime: 2024/3/28 20:30
* @Author: Yuan · JinSheng <br>
* @Since 2024/3/28 20:30
*/
@Slf4j
@Component
public class WorkQueueConsumerTwo {
/***
* @param message 消息
* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
* @Author: Yuan · JinSheng
*/
@RabbitListener(queues = "work_queue")
public void msg(Message message){
byte[] messageBody = message.getBody();
String msg = new String(messageBody);
log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
}
}
4. 消息消费者Three
package com.happy.msg.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* <p>
* @Description: 工作队列模型_消息消费者三<br>
* </p>
* @Datetime: 2024/3/28 20:30
* @Author: Yuan · JinSheng <br>
* @Since 2024/3/28 20:30
*/
@Slf4j
@Component
public class WorkQueueConsumerTwo {
/***
* @param message 消息
* @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
* @Author: Yuan · JinSheng
*/
@RabbitListener(queues = "work_queue")
public void msg(Message message){
byte[] messageBody = message.getBody();
String msg = new String(messageBody);
//log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
System.out.println("WorkQueueConsumerTwo接收到work_queue队列中的消息==="+msg+",===接收时间==="+LocalDateTime.now());
}
}
5. 输出结果
可看到消息被三个消费者按相等数量消费,总共10条消息,消费者1消费了4条,其他两个消费者消费了3条消息
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[2]条消息,hello,rabbitmq2,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:39:32.942037700
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[6]条消息,hello,rabbitmq6,===接收时间===2024-03-29T10:39:32.943806300
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[4]条消息,hello,rabbitmq4,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[10]条消息,hello,rabbitmq10,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[8]条消息,hello,rabbitmq8,===接收时间===2024-03-29T10:39:32.944336900