需求描述
在挂号界面选择完需要挂号的医生和排版后,添加就诊人,确认挂号
附上业务流程图
技术分析
我们今天主要来看看这块 mq 的运用,也是一个思考,我还是挑着重要的来讲,这里讲讲我们这里怎么使用 mq 的
这里会用到两个队列
- order: 记录库存(剩余可挂号数)扣除信息,由order管理模块调用,hosp模块监听,库存(剩余可挂号数)扣除成功后,发送一条短信发送消息给msm队列
- msm:用来记录需要发送短信的信息,msm模块监听
mq 相关依赖和工具类搭建
项目模块划分如图
我们创建 rabbit_utils 模块单独存储 mq 相关操作的封装
依赖
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.example</groupId>
<artifactId>model</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
public class MqConst {
/**
* 预约下单
*/
public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
public static final String ROUTING_ORDER = "order";
//队列
public static final String QUEUE_ORDER = "queue.order";
/**
* 短信
*/
public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
public static final String ROUTING_MSM_ITEM = "msm.item";
//队列
public static final String QUEUE_MSM_ITEM = "queue.msm.item";
}
@Service
@Slf4j
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param message 消息
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
log.info("发送消息: " + exchange + routingKey + message);
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
在 order, hosp, msm 这三个需要使用 mq 服务的模块中引入依赖
<dependency>
<groupId>com.example</groupId>
<artifactId>rabbit_utils</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
具体方法
order 保存订单
@Override
public Long saveOrder(String scheduleId, Long patientId) {
// 获取就诊人信息
Patient patient = patientFeignClient.getPatient(patientId);
// 获取排班信息
ScheduleOrderVo scheduleOrderVo = hospFeignClient.getScheduleOrderVo(scheduleId);
//判断当前时间是否还可以预约
if (new DateTime(scheduleOrderVo.getStartTime()).isAfterNow()
|| new DateTime(scheduleOrderVo.getEndTime()).isBeforeNow()) {
throw new HospitalException(ResultCodeEnum.TIME_NO);
}
//获取签名信息
SignInfoVo signInfoVo = hospFeignClient.getSignInfoVo(scheduleOrderVo.getHoscode());
//把数据添加到订单表
OrderInfo orderInfo = new OrderInfo();
//scheduleOrderVo数据复制到orderInfo
BeanUtils.copyProperties(scheduleOrderVo, orderInfo);
//设置其他数据
String outTradeNo = System.currentTimeMillis() + "" + new Random().nextInt(100);
orderInfo.setOutTradeNo(outTradeNo);
orderInfo.setScheduleId(scheduleId);
orderInfo.setUserId(patient.getUserId());
orderInfo.setPatientId(patientId);
orderInfo.setPatientName(patient.getName());
orderInfo.setPatientPhone(patient.getPhone());
orderInfo.setOrderStatus(OrderStatusEnum.UNPAID.getStatus());
baseMapper.insert(orderInfo);
// 调用医院接口,实现预约挂号操作
//设置调用医院接口所需参数,参数放到map集合中
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("hoscode", orderInfo.getHoscode());
paramMap.put("depcode", orderInfo.getDepcode());
paramMap.put("hosScheduleId", orderInfo.getScheduleId());
paramMap.put("reserveDate", new DateTime(orderInfo.getReserveDate()).toString("yyyy-MM-dd"));
paramMap.put("reserveTime", orderInfo.getReserveTime().toString());
paramMap.put("amount", orderInfo.getAmount().toString());
paramMap.put("name", patient.getName());
paramMap.put("certificatesType", patient.getCertificatesType());
paramMap.put("certificatesNo", patient.getCertificatesNo());
paramMap.put("sex", patient.getSex().toString());
paramMap.put("birthdate", patient.getBirthdate().toString());
paramMap.put("phone", patient.getPhone());
paramMap.put("isMarry", patient.getIsMarry().toString());
paramMap.put("provinceCode", patient.getProvinceCode());
paramMap.put("cityCode", patient.getCityCode());
paramMap.put("districtCode", patient.getDistrictCode());
paramMap.put("address", patient.getAddress());
paramMap.put("contactsName", patient.getContactsName()); //联系人
paramMap.put("contactsCertificatesType", patient.getContactsCertificatesType());
paramMap.put("contactsCertificatesNo", patient.getContactsCertificatesNo());
paramMap.put("contactsPhone", patient.getContactsPhone());
paramMap.put("timestamp", String.valueOf(HttpRequestHelper.getTimestamp()));
String sign = HttpRequestHelper.getSign(paramMap, signInfoVo.getSignKey());
paramMap.put("sign", sign);
// 请求医院系统接口
JSONObject result = HttpRequestHelper.sendRequest(paramMap, signInfoVo.getApiUrl() + "/order/submitOrder");
if (result.getInteger("code") == 200) {
JSONObject jsonObject = result.getJSONObject("data");
//预约记录唯一标识(医院预约记录主键)
String hosRecordId = jsonObject.getString("hosRecordId");
//预约序号
Integer number = jsonObject.getInteger("number");
//取号时间
String fetchTime = jsonObject.getString("fetchTime");
//取号地址
String fetchAddress = jsonObject.getString("fetchAddress");
//更新订单
orderInfo.setHosRecordId(hosRecordId);
orderInfo.setNumber(number);
orderInfo.setFetchTime(fetchTime);
orderInfo.setFetchAddress(fetchAddress);
baseMapper.updateById(orderInfo);
//排班可预约数
Integer reservedNumber = jsonObject.getInteger("reservedNumber");
//排班剩余预约数
Integer availableNumber = jsonObject.getInteger("availableNumber");
// TODO 发送 mq 消息
//发送mq,号源更新
OrderMqVo orderMqVo = new OrderMqVo();
orderMqVo.setScheduleId(scheduleId);
orderMqVo.setReservedNumber(reservedNumber);
orderMqVo.setAvailableNumber(availableNumber);
//短信提示
MsmVo msmVo = new MsmVo();
msmVo.setPhone(orderInfo.getPatientPhone());
String reserveDate =
new DateTime(orderInfo.getReserveDate()).toString("yyyy-MM-dd")
+ (orderInfo.getReserveTime() == 0 ? "上午" : "下午");
Map<String, Object> param = new HashMap<String, Object>() {{
put("title", orderInfo.getHosname() + "|" + orderInfo.getDepname() + "|" + orderInfo.getTitle());
put("amount", orderInfo.getAmount());
put("reserveDate", reserveDate);
put("name", orderInfo.getPatientName());
put("quitTime", new DateTime(orderInfo.getQuitTime()).toString("yyyy-MM-dd HH:mm"));
}};
msmVo.setParam(param);
orderMqVo.setMsmVo(msmVo);
// TODO 消息没发过去
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_ORDER, MqConst.ROUTING_ORDER, orderMqVo);
} else {
throw new HospitalException(result.getString("message"), ResultCodeEnum.FAIL.getCode());
}
return orderInfo.getId();
}
hosp 中监听方法的实现
@Component
public class HospitalReceiver {
@Autowired
private ScheduleService scheduleService;
@Autowired
private RabbitService rabbitService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_ORDER, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
key = {MqConst.ROUTING_ORDER}
))
@RabbitHandler
public void receiver(OrderMqVo orderMqVo) throws IOException {
//下单成功更新预约数
Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
schedule.setReservedNumber(orderMqVo.getReservedNumber());
schedule.setAvailableNumber(orderMqVo.getAvailableNumber());
scheduleService.update(schedule);
//发送短信
MsmVo msmVo = orderMqVo.getMsmVo();
if (null != msmVo) {
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_MSM, MqConst.ROUTING_MSM_ITEM, msmVo);
}
}
}
Msm 中监听
@Component
@Slf4j
public class MsmReceiver {
@Autowired
private MsmService msmService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
key = {MqConst.ROUTING_MSM_ITEM}
))
public void send(MsmVo msmVo) {
log.info("接收到消息: " + msmVo);
msmService.send(msmVo);
}
}
反思
做这块的功能的时候其实出现了不少问题,发现自己对于 mq 的掌握其实是不够的,虽然之前做过一个尚硅谷讲rabbitmq的笔记,但还是停留在了表面,很多问题还是需要实操,在业务场景中使用效果才是更好的,后面的学习也要注意这一点
这里其实有很多可以优化的点
-
消息可靠性投递
-
消费的幂等性
-
rabbitmq集群部署
-
消息的异常处理
-
消息的堆积问题
-
替换成rocketmq来实现这块功能该怎么做
-
…
需要提高的地方还不少,继续努力!