一.建立绑定关系
package com.lx.mq.bind;
import com.lx.constant.MonitorEventConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for monitor events: declares the delayed-message exchange
 * plus the queues and bindings attached to it.
 *
 * <p>Every exchange and queue name is suffixed with {@code -<active profile>}.
 *
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:04:03
 */
@Slf4j
@Configuration
public class MonitorRabbitMqBinding {

    /** Active Spring profile with a leading dash; appended to exchange and queue names. */
    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     * Declares the exchange used for delayed messages.
     *
     * <p>The exchange has type {@code x-delayed-message} and carries the argument
     * {@code x-delayed-type=direct}. It is declared with durable = false and
     * autoDelete = false.
     *
     * <p>NOTE(review): the queues bound to this exchange are durable while the
     * exchange itself is not — confirm this is intentional.
     *
     * @return the delayed-message exchange, registered as bean {@code delayExchange}
     */
    @Bean("delayExchange")
    public CustomExchange buildDelayedMessageNoticeExchange() {
        final String exchangeName = MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile;
        final Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(exchangeName, "x-delayed-message", false, false, arguments);
    }

    /**
     * Declares the durable queue for delayed notice messages.
     *
     * @return the delayed notice queue
     */
    @Bean
    public Queue buildDelayedMessageNoticeQueue() {
        final String queueName = MonitorEventConst.MONITOR_DELAYED_MESSAGE_QUEUE + profile;
        return QueueBuilder.durable(queueName).build();
    }

    /**
     * Binds the delayed notice queue to the delayed-message exchange.
     *
     * @return the binding keyed by {@code MONITOR_DELAYED_MESSAGE_ROUTING_KEY}
     */
    @Bean
    public Binding buildDelayedMessageNoticeBinding() {
        final Queue noticeQueue = buildDelayedMessageNoticeQueue();
        final CustomExchange delayedExchange = buildDelayedMessageNoticeExchange();
        return BindingBuilder.bind(noticeQueue)
                .to(delayedExchange)
                .with(MonitorEventConst.MONITOR_DELAYED_MESSAGE_ROUTING_KEY)
                .noargs();
    }

    /**
     * Declares the durable queue for timed processing of delivery-complete events.
     *
     * @return the delivery-complete queue
     */
    @Bean
    public Queue deliveryCompleteEventHandQueue() {
        final String queueName = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + profile;
        return QueueBuilder.durable(queueName).build();
    }

    /**
     * Binds the delivery-complete queue to the delayed-message exchange.
     *
     * @return the binding keyed by {@code DELIVERY_COMPLETE_DELAYED_ROUTING_KEY}
     */
    @Bean
    public Binding deliveryCompleteBinding() {
        final Queue deliveryQueue = deliveryCompleteEventHandQueue();
        final CustomExchange delayedExchange = buildDelayedMessageNoticeExchange();
        return BindingBuilder.bind(deliveryQueue)
                .to(delayedExchange)
                .with(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY)
                .noargs();
    }
}
二.建立生产者
1.消息实体
package com.lx.dto.monitor;
import lombok.Data;
import java.util.Date;
/**
 * Message payload describing a monitor event that is scheduled for timed processing.
 *
 * <p>Getters, the remaining setters, {@code equals}/{@code hashCode} and
 * {@code toString} are generated by Lombok's {@code @Data}.
 *
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:11:06
 */
@Data
public class MonitorEventMessage {
    /**
     * Event id.
     */
    private String eventId;
    /**
     * Event code.
     */
    private String eventCode;
    /**
     * Business data (unique business key).
     */
    private String businessUniqueKey;
    /**
     * Business type.
     */
    private String businessType;
    /**
     * Milliseconds remaining until {@link #timedOperationTime}, never negative
     * when derived by {@link #setTimedOperationTime(Date)}.
     */
    private Long expireMillis;
    /**
     * Unique version number of the event handling.
     */
    private Integer eventHandVersion;
    /**
     * Point in time at which the timed operation should run.
     */
    private Date timedOperationTime;

    /**
     * Sets the timed operation time and derives {@code expireMillis} from it.
     *
     * <p>{@code expireMillis} becomes the number of milliseconds between now and
     * the given time, clamped to {@code 0} when the time is already in the past.
     * A {@code null} argument clears the timed operation time and leaves
     * {@code expireMillis} untouched, so a payload carrying a null time (for
     * example during JSON deserialization) no longer fails with a
     * {@link NullPointerException}.
     *
     * @param timedOperationTime the scheduled processing time, may be {@code null}
     */
    public void setTimedOperationTime(Date timedOperationTime) {
        this.timedOperationTime = timedOperationTime;
        if (timedOperationTime == null) {
            // Nothing to derive the remaining delay from; keep the current value.
            return;
        }
        long remainingMillis = timedOperationTime.getTime() - System.currentTimeMillis();
        this.expireMillis = Math.max(0L, remainingMillis);
    }
}
package com.lx.mq.producer;
import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Publishes monitor event messages to RabbitMQ.
 */
@Slf4j
@Component
public class MonitorEventMessageProducer {

    /** Active Spring profile with a leading dash; appended to the exchange name. */
    @Value(value = "-${spring.profiles.active}")
    private String profile;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a delivery-complete monitor event to the delayed-message exchange.
     *
     * <p>The message body is the JSON form of {@code monitorEventMessage}. The
     * delay set on the message is {@code monitorEventMessage.getExpireMillis()},
     * normalized as follows:
     * <ul>
     *   <li>{@code null} (no timed operation time was set) is treated as {@code 0};</li>
     *   <li>negative values are raised to {@code 0};</li>
     *   <li>values above {@link Integer#MAX_VALUE} are capped instead of being
     *       wrapped by a narrowing conversion, which could yield a negative delay.</li>
     * </ul>
     *
     * @param monitorEventMessage the event to publish
     */
    public void sendDeliveryCompleteEventHandMessage(MonitorEventMessage monitorEventMessage) {
        String message = JsonUtil.toJson(monitorEventMessage);

        // Resolve the delay up front so a bad value cannot fail inside the post-processor.
        Long expireMillis = monitorEventMessage.getExpireMillis();
        long requestedDelay = expireMillis == null ? 0L : expireMillis;
        int delayMillis = (int) Math.max(0L, Math.min(requestedDelay, (long) Integer.MAX_VALUE));

        rabbitTemplate.convertAndSend(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,
                MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(delayMillis);
                    return msg;
                });
        // Record the event-processing message that was sent.
        log.info("sending event processing messages: {}", message);
    }
}
三.建立消费者
package com.lx.mq.consumer;
import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 监控事件消息发送类
*/
@Slf4j
@Component
public class MonitorEventMessageConsumer {
@Value(value = "-${spring.profiles.active}")
private String profile;
/**
* 交车完成事件处理mq监听
*/
@RabbitListener(queues = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + "-${spring.profiles.active}")
public void dealWithDeliveryCompleteEventHandMessage(String eventMessage, Channel channel, Message message) {
log.info("dealWithDeliveryCompleteEventHandMessage:【{}】", JsonUtil.toJson(eventMessage));
String str = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("Received the message of regular loading and unloading of goods: {}", str); //收到商品定时上下架消息
MonitorEventMessage monitorEventMessage = JsonUtil.toBean(eventMessage, MonitorEventMessage.class);
try {
analyzeHand(monitorEventMessage);
}catch (Exception e){
log.error("交车完成事件分析失败,参数:{},e:{}",JsonUtil.toJson(monitorEventMessage),JsonUtil.toJson(e));
}
}
/**
* 事件分析
* @param monitorEventMessage
*/
private void analyzeHand(MonitorEventMessage monitorEventMessage) throws Exception {
}
}
四.测试类测试
package com.lx.controller;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.lx.conf.MQConfig;
import com.lx.conmon.ResultData;
import com.lx.dto.monitor.MonitorEventMessage;
import com.lx.mq.producer.MonitorEventMessageProducer;
import com.lx.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.lx.constant.RedisMange;
import com.lx.utils.RedisUtil;
import org.thymeleaf.util.DateUtils;
/**
 * REST controller exposing an endpoint that publishes a sample delayed
 * delivery-complete monitor event.
 *
 * @Author : liu wei ping
 * @CreateTime : 2019/9/3
 **/
@RestController
public class SendMessageController {

    /** Offset, in milliseconds, added to the current time to get the timed operation time. */
    private static final int TIMED_OPERATION_DELAY_MILLIS = 30 * 1000;

    @Autowired
    private MonitorEventMessageProducer messageProducer;

    /**
     * Builds a sample event with code {@code delivery} and business type
     * {@code deliveryType}, sets its timed operation time to the current time plus
     * {@link #TIMED_OPERATION_DELAY_MILLIS}, and publishes it through the producer.
     *
     * @return a result wrapping the string {@code "ok"}
     */
    @GetMapping("/sendTopicMessage3")
    public ResultData<String> sendTopicMessage3() {
        final MonitorEventMessage event = new MonitorEventMessage();
        event.setEventCode("delivery");
        event.setBusinessType("deliveryType");
        // Timed operation time = current time + configured delay.
        event.setTimedOperationTime(
                DateUtil.date(DateUtil.getCurrentMillis() + TIMED_OPERATION_DELAY_MILLIS));

        messageProducer.sendDeliveryCompleteEventHandMessage(event);
        return new ResultData<>("ok");
    }
}
五.效果如图所示