版本:
当前测试版本:Spring Boot 2.3.9、RocketMQ 5.1.0
Maven或Gradle RocketMQ的依赖项:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
1、生产者
# RocketMQ Producer配置
rocketmq.namesrv.addr=119.3.81.109:9876
rocketmq.producer.group=BREAD_ORDER_GROUP
rocketmq.topic=Tseng-Dev
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Spring component that publishes text messages to RocketMQ.
 *
 * <p>The producer group and name server address are injected from the
 * {@code rocketmq.producer.group} and {@code rocketmq.namesrv.addr} properties.
 * A new {@link DefaultMQProducer} is created, started and shut down on every
 * call to {@link #sendMessage(String, String)}.
 */
@Component
public class RocketMQProducer {

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    /**
     * Sends {@code message} to {@code topic} synchronously.
     *
     * @param topic   destination topic
     * @param message text payload; encoded as UTF-8 bytes
     * @throws Exception if the producer cannot be started or the send fails
     */
    public void sendMessage(String topic, String message) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.start();
        try {
            // Encode explicitly as UTF-8 instead of the platform default charset,
            // matching the UTF-8 decoding done by the message listener.
            // Fully qualified so that no extra import is required.
            byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            producer.send(new Message(topic, body));
        } finally {
            // Always release the started producer, even when send() throws.
            producer.shutdown();
        }
    }
}
// Producer component injected by Spring; used to publish messages.
@Autowired
private RocketMQProducer rocketMQProducer;
// Destination topic, resolved from the "rocketmq.topic" property.
@Value("${rocketmq.topic}")
private String topic;
/**
 * Handles GET {@code /send-message} by publishing a fixed greeting to the
 * configured topic.
 *
 * @return a success text when the producer call completes normally,
 *         otherwise a failure text
 */
@GetMapping("/send-message")
public String sendMessage() {
    String outcome;
    try {
        rocketMQProducer.sendMessage(topic, "Hello, RocketMQ!");
        outcome = "Message sent successfully";
    } catch (Exception e) {
        // Any failure from the producer is reported to the caller as plain text.
        e.printStackTrace();
        outcome = "Failed to send message";
    }
    return outcome;
}
2、消费者
# RocketMQ Consumer配置
rocketmq.namesrv.addr=119.3.81.109:9876
rocketmq.consumer.group=BREAD_ORDER_GROUP
rocketmq.topic=Tseng-Dev
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author:Tseng
* @description 消费者
* @since: JDK1.8
* @version: 1.0
* @date: 2023-07-17
* @Copyright © 2023
*/
@Component
public class RocketMQConsumer {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.namesrv.addr}")
private String namesrvAddr;
@Value("${rocketmq.topic}")
private String topic;
private final RocketMQMessageListener messageListener;
@Autowired
public RocketMQConsumer(RocketMQMessageListener messageListener) {
this.messageListener = messageListener;
}
@PostConstruct
public void start() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
return messageListener.consumeMessage(msgs, context);
});
consumer.start();
}
}
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Concurrent message listener that logs each delivered message and then
 * reports the whole batch as successfully consumed.
 */
@Slf4j
@Component
public class RocketMQMessageListener implements MessageListenerConcurrently {

    /**
     * Logs every message in {@code msgs} and acknowledges the batch.
     *
     * @param msgs    messages delivered in this batch
     * @param context consume context supplied by the client (not used here)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        msgs.forEach(this::logMessage);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Writes one message's topic, tags, keys and UTF-8 decoded body to the log. */
    private void logMessage(MessageExt msg) {
        log.info("Received message: topic={}, tags={}, keys={}, message={}",
                msg.getTopic(),
                msg.getTags(),
                msg.getKeys(),
                new String(msg.getBody(), StandardCharsets.UTF_8));
        // Handle the received message here.
    }
}
3、测试
发送
结果