- 一、pom依赖
- 二、消费端
- 2.1、application.properties 配置文件
- 2.2、消费端核心组件
- 三、生产端
- 3.1、application.properties 配置文件
- 3.2、生产者 MQ消息发送组件
- 四、测试
- 1、生产端控制台
- 2、消费端控制台
一、pom依赖
<!-- Web starter: embedded servlet container and Spring MVC -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring integration with RabbitMQ (AMQP) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Test starter: restricted to the test classpath so test libraries are not packaged with the application -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
二、消费端
2.1、application.properties 配置文件
# ---- Consumer application: HTTP server settings ----
server.port=8002
server.servlet.context-path=/
spring.application.name=rabbit_consumer
# ---- RabbitMQ broker connection ----
spring.rabbitmq.addresses=192.168.220.3:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
# NOTE(review): presumably milliseconds (15 s) - confirm against the Spring Boot version in use
spring.rabbitmq.connection-timeout=15000
# ---- Listener container settings ----
# Manual acknowledge mode: the listener method itself must acknowledge each delivery on the Channel
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=6
spring.rabbitmq.listener.simple.max-concurrency=11
spring.rabbitmq.listener.simple.prefetch=1
# ---- Custom keys (not Spring Boot built-ins): read through ${...} placeholders
# ---- in the @RabbitListener binding of RabbitMQReceived
spring.rabbitmq.listener.test.exchange=test_topic_exchange
spring.rabbitmq.listener.test.exchange.type=topic
spring.rabbitmq.listener.test.queue=test_topic1
spring.rabbitmq.listener.test.key=test_topic1.*
2.2、消费端核心组件
package com.xiao.component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumer-side RabbitMQ listener.
 *
 * <p>Declares the exchange, queue and binding named by the
 * {@code spring.rabbitmq.listener.test.*} properties and handles every message
 * delivered to that queue. The listener is configured for manual
 * acknowledgement, so each delivery is acknowledged explicitly here.
 */
@Component
public class RabbitMQReceived {

    /**
     * Logs the received payload and acknowledges the delivery.
     *
     * @param message the converted message; its headers carry the AMQP delivery tag
     * @param channel the channel the message was delivered on, used to send the ack
     * @throws IOException if the acknowledgement cannot be sent to the broker
     */
    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "${spring.rabbitmq.listener.test.exchange}",
            type = "${spring.rabbitmq.listener.test.exchange.type}",
            durable = "true",ignoreDeclarationExceptions = "true"),
            value = @Queue(value = "${spring.rabbitmq.listener.test.queue}",durable = "true"),
            key = "${spring.rabbitmq.listener.test.key}"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {
        System.err.println("=====================================");
        System.err.println("消费端 RabbitMQReceived 消费消息:" + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        if (deliveryTag != null) {
            // Manual ack mode: without this call the delivery stays unacked and,
            // with a prefetch of 1, the consumer never receives another message.
            // 'false' acknowledges only this delivery, not all earlier ones.
            channel.basicAck(deliveryTag, false);
        }
        System.err.println("消费端 RabbitMQReceived ack:yes deliveryTag:" + deliveryTag);
    }
}
三、生产端
3.1、application.properties 配置文件
# ---- Producer application: HTTP server settings ----
server.port=8001
server.servlet.context-path=/
spring.application.name=rabbit_produce
# ---- RabbitMQ broker connection ----
spring.rabbitmq.addresses=192.168.220.3:5672
# NOTE(review): the port is already part of 'addresses' above, so this entry looks redundant - confirm
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
# NOTE(review): presumably milliseconds (15 s) - confirm against the Spring Boot version in use
spring.rabbitmq.connection-timeout=15000
# Enables publisher confirms; RabbitMQSender registers a ConfirmCallback that logs the broker's ack/nack.
# NOTE(review): newer Spring Boot releases use 'spring.rabbitmq.publisher-confirm-type' instead - verify for the version in use
spring.rabbitmq.publisher-confirms=true
3.2、生产者 MQ消息发送组件
package com.xiao.component;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
 * Producer-side component that publishes messages to RabbitMQ and logs the
 * broker's publisher confirm for each one.
 */
@Component
public class RabbitMQSender {

    /** Exchange every message is published to. */
    private static final String EXCHANGE = "test_topic_exchange";

    /** Routing key used for every published message. */
    private static final String ROUTING_KEY = "test_topic1.xiao";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the broker's confirm (ack or nack) for a published message.
     * The correlation data may be null when a message was sent without it, so
     * it is handed to the formatter as-is instead of calling {@code toString()}.
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String formatStr = String.format("生产端 confirmCallback 相关数据:%s," +
                            "broker签收情况 ack=%s,异常信息:%s" ,
                    correlationData, ack, cause);
            System.err.println(formatStr);
        }
    };

    /**
     * Wraps the payload and headers into a message and publishes it with a
     * freshly generated correlation id.
     *
     * @param message    the payload to send
     * @param properties header entries to attach to the message
     */
    public void send(Object message, Map<String,Object> properties) {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, messageHeaders);
        // The same callback instance is registered on every call.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Anonymous class rather than a lambda because both overloads are overridden.
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                System.err.println("生产端 RabbitMQSender send后置处理:" + message);
                return message;
            }
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message, Correlation correlation) {
                System.err.println("生产端 RabbitMQSender send后置处理:" + message+" 消息标识:" + correlation);
                return message;
            }
        };
        // A unique id lets the confirm callback be matched back to this send.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(EXCHANGE,
                ROUTING_KEY,
                msg,
                messagePostProcessor,
                correlationData);
    }
}
四、测试
package com.xiao;
import com.xiao.component.RabbitMQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;
/**
 * Integration test that publishes a single message through
 * {@link RabbitMQSender} inside a full Spring Boot test context.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class SendMessageTest {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    /**
     * Sends one text message with a single header, then pauses before the
     * test method returns.
     *
     * @throws InterruptedException if the pause is interrupted
     */
    @Test
    public void send() throws InterruptedException {
        final Map<String,Object> headers = new HashMap<>(2);
        headers.put("userName","xiao");

        final String payload = "hello world!";
        rabbitMQSender.send(payload, headers);

        // Pause so the test does not finish immediately after the send.
        final long pauseMillis = 5000;
        Thread.sleep(pauseMillis);
    }
}
1、生产端控制台
生产端 RabbitMQSender send后置处理:(Body:'[B@3a6045c6(byte[535])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=535, deliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 消息标识:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c]
2023-07-21 20:05:37.611 INFO 4536 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.220.3:5672]
2023-07-21 20:05:37.653 INFO 4536 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6f38a289:0/SimpleConnection@6215366a [delegate=amqp://root@192.168.220.3:5672/, localPort= 4712]
生产端 confirmCallback 相关数据:CorrelationData [id=8c78e89d-80f3-4f3d-ba8b-13e863c6295c],broker签收情况 ack=true,异常信息:null
2、消费端控制台
=====================================
消费端 RabbitMQReceived 消费消息:hello world!
消费端 RabbitMQReceived ack:yes deliveryTag:1