1:自定义mq信息类(我的交换这些信息都从nacos上直接取的,怎么从nacos取配置信息看上篇文章):
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqInfoEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
*/
private Integer exchangeType;
/**
* 交换机名称
*/
private String exchangeName;
/**
* 队列名称
*/
private String queueName;
/**
* 绑定关系
*/
private String routingKey;
}
2:自定义生产者类:
import com.alibaba.fastjson.JSONArray;
import com.fescotech.ordercommon.model.MqInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class SendRequestParamsProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//这两个变量是用来将交换机发送消息失败异常的原因在service里抛异常用
public boolean ack1 = true;
public String reason;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setMandatory(true);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String deliveryNumber = correlationData.getId();
if (ack){
ack1=true;
//记录发送记录
log.info("向交换机发送消息成功,派单编号:{}",deliveryNumber);
} else {
ack1=false;
reason = cause;
log.error("向交换机发送消息失败,派单编号:{},原因为:{},", deliveryNumber, cause);
}
}
/**
* 推送消息
* @author fu
* @time 2023/7/18 14:28
* @param mqInfoEntity
* @param data为要推送的数据
* @param correlationDataId调用时赋值一个唯一值就行
* @return void
*/
public void sendRabbitMq(MqInfoEntity mqInfoEntity, JSONArray data ,String correlationDataId) {
// rabbitTemplate.convertAndSend(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));
rabbitTemplate.convertSendAndReceive(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String json = new String(message.getBody());
log.info("消息本身:" + json);
log.info("退回的replyCode是:" + replyCode);
log.info("退回的replyText是:" + replyText);
log.info("退回的exchange是:" + exchange);
log.info("退回的routingKey是:" + routingKey);
String errorDesc = "未匹配到队列,交换机:"+ exchange +",routingKey:"+ routingKey;
}
}
3:逻辑推送数据至mq(前三步是我个人需求业务处理,你们可以从第四步看):
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.fescotech.ft.common.model.vo.Result;
import com.fescotech.ft.common.util.FtResultUtil;
import com.fescotech.ordercommon.constants.ExchangeTypeEnum;
import com.fescotech.ordercommon.model.MqInfoEntity;
import com.fescotech.ordercommon.model.param.SendMsgParam;
import com.fescotech.orderservice.config.limit.CommonMqMsgConfig;
import com.fescotech.orderservice.mq.producer.SendRequestParamsProducer;
import com.fescotech.orderservice.service.CommonMqMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
@Slf4j
public class CommonMqMsgServiceImpl implements CommonMqMsgService {
@Autowired
private SendRequestParamsProducer sendRequestParamsProducer;
@Autowired
private CommonMqMsgConfig commonMqMsgConfig;
@Override
public Result sendMsg(SendMsgParam sendMsgParam) {
//1.拼接faceCode_sysChannel
String faceCodeSysChannel = sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel();
//2.获取nacos配置文件
Map<String,CommonMqMsgConfig.SystemMap> map = commonMqMsgConfig.getSystemMap();
//3.校验faceCode_sysChannel在nacos上是否有配置
if(!map.keySet().contains(faceCodeSysChannel)){
return FtResultUtil.result(Result.FAIL,null,"未找到"+sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel()+"配置",null);
}
//4.将要推送的数据转至json串(我要推送json串,你们推啥数据用啥数据就行)
JSONArray jsonArray = JSONArray.parseArray(sendMsgParam.getRequestParams()) ;
log.info("发送数据:{}",jsonArray);
//5.取mq队列数据(我这是从nacos上取交换机名称、队列名称、绑定关系值,你们有现成的直接在第6步赋值即可)
CommonMqMsgConfig.SystemMap systemMap = map.get(faceCodeSysChannel);
//6.构建发送信息
MqInfoEntity mqInfoEntity = MqInfoEntity.builder().exchangeName(systemMap.getExchange_name())
.queueName(systemMap.getQueue_name()).routingKey(systemMap.getRouting_key())
.exchangeType(ExchangeTypeEnum.TOPIC.getCode())
.build();
log.info("推送MQ,{},data:{}", JSONUtil.toJsonStr(mqInfoEntity),jsonArray);
//7.推送mq
sendRequestParamsProducer.sendRabbitMq(mqInfoEntity,jsonArray,sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel());
//8.捕捉交换机未收到消息异常原因
boolean flag = sendRequestParamsProducer.ack1;
if(flag){
return FtResultUtil.result(Result.SUCCESS, "推送成功", null, null);
}else {
log.error("向交换机发送消息失败,编号:{},原因为:{},", faceCodeSysChannel, sendRequestParamsProducer.reason);
return FtResultUtil.result(Result.FAIL,null,"推送失败,失败原因:"+sendRequestParamsProducer.reason,null);
}
}
}
推送成功: