总概
A、技术栈
- 开发语言:Java 1.8
- 数据库:MySQL、Redis、MongoDB、Elasticsearch
- 微服务框架:Spring Cloud Alibaba
- 微服务网关:Spring Cloud Gateway
- 服务注册和配置中心:Nacos
- 分布式事务:Seata
- 链路追踪框架:Sleuth
- 服务降级与熔断:Sentinel
- ORM框架:MyBatis-Plus
- 分布式任务调度平台:XXL-JOB
- 消息中间件:RocketMQ
- 分布式锁:Redisson
- 权限:OAuth2
- DevOps:Jenkins、Docker、K8S
B、本节实现目标
- [mall-order]下单,用RocketMQ消息中间件发送消息,[mall-member]监听消费给用户加积分
一、RocketMQ安装
供参考:
-
保姆级教程 Windows11下安装RocketMQ
-
RocketMQ基础入门
二、功能描述
用户下单(mall-order服务)后,发送下单事件MQ消息;mall-member服务监听并消费该消息,为用户增加积分。MQ在此处的作用是解耦两个服务。
三、代码实现
3.1 maven加RocketMQ依赖包
在项目[mall-pom]的pom.xml里加入RocketMQ依赖包
<rocketmq.version>2.2.3</rocketmq.version>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
3.2 common.yml配置RocketMQ参数
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group-${spring.profiles.active}
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
common.yml完整配置
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
password: 123abc
jedis:
pool:
max-active: 500 #连接池的最大连接数。设为负值表示无限制
max-idle: 20 #最大空闲数
max-wait: -1
min-idle: 5
timeout: 1000
redisson:
password: 123abc
cluster:
nodeAddresses: ["redis://127.0.0.1:6379"]
single:
address: "redis://127.0.0.1:6379"
database: 0
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
username: ac_u
password: ac_PWD_123
#hikari数据库连接池
hikari:
pool-name: YH_HikariCP
minimum-idle: 10 #最小空闲连接数量
idle-timeout: 600000 #空闲连接存活最大时间,默认600000(10分钟)
maximum-pool-size: 100 #连接池最大连接数,默认是10
auto-commit: true #此属性控制从池返回的连接的默认自动提交行为,默认值:true
max-lifetime: 1800000 #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
connection-timeout: 30000 #数据库连接超时时间,默认30秒,即30000
connection-test-query: SELECT 1
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group-${spring.profiles.active}
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
3.3 [mall-order]生产者
生产者OrderSender
package com.ac.order.mq.send;
import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Lazy
@Component
public class OrderSender {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void asyncSend(MqOrderMsg mqMsg) {
String payload = JSONObject.toJSONString(mqMsg);
//Topic+Tag更精准接收消息
String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();
rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(OrderSender.class.getSimpleName() + ",消息发送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(OrderSender.class.getSimpleName() + ",消息发送失败");
e.printStackTrace();
}
});
}
}
下单发送MQ
/**
 * Order service: queries orders and creates new ones, publishing an order
 * event to MQ after an order has been stored.
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDaoImpl;

    @Resource
    private MemberFeignApi memberFeignApi;

    @Resource
    private OrderItemService orderItemServiceImpl;

    @Resource
    private OrderSender orderSender;

    /**
     * Not implemented yet.
     *
     * @param id the order id
     * @return always {@code null}
     */
    @Override
    public OrderDetailDTO findOrderDetail(Long id) {
        return null;
    }

    /**
     * Returns one page of orders matching the query; delegates to the DAO.
     *
     * @param qry paging and filter criteria
     * @return the requested page of orders
     */
    @Override
    public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
        return orderDaoImpl.pageOrder(qry);
    }

    /**
     * Creates an order with its items, computes the amounts and publishes a
     * {@code PAID} order event.
     *
     * @param addVO the order to create, including member id and order items
     * @return the id of the newly created order
     * @throws IllegalStateException if the member lookup returns no member
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Long createOrder(OrderAddVO addVO) {
        Order order = new Order();
        order.setOrderNo(RandomUtil.randomNumbers(8));
        // The payment flow is omitted; the order is stored as already paid.
        order.setOrderState(OrderStateEnum.PAYED);
        order.setOrderTime(LocalDateTime.now());

        // Fetch member information through Feign.
        MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
        if (member == null) {
            // Fail with a descriptive error instead of an NPE on the next lines.
            throw new IllegalStateException("Member not found, memberId=" + addVO.getMemberId());
        }
        order.setMemberId(addVO.getMemberId());
        order.setMemberName(member.getMemberName());
        order.setMobile(member.getMobile());
        orderDaoImpl.save(order);

        // BigDecimal.ZERO avoids the double-based BigDecimal constructor.
        BigDecimal discountAmount = BigDecimal.ZERO;
        BigDecimal productAmount = BigDecimal.ZERO;

        // Store the order items and accumulate price * quantity.
        for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
            OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
            BigDecimal lineAmount = orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum()));
            productAmount = productAmount.add(lineAmount);
        }

        // Update the order with the computed amounts.
        order.setDiscountAmount(discountAmount);
        order.setProductAmount(productAmount);
        BigDecimal payAmount = productAmount.subtract(discountAmount);
        order.setPayAmount(payAmount);
        orderDaoImpl.updateById(order);

        // Publish the order event.
        // NOTE(review): this is sent from inside the transactional method, i.e.
        // before the commit; if the commit fails the event may already be out.
        // Consider sending after commit - confirm against requirements.
        MqOrderMsg mqMsg = MqOrderMsg.builder()
                .action(MqOrderAction.PAID)
                .orderId(order.getId())
                .memberId(order.getMemberId())
                .payAmount(order.getPayAmount())
                .build();
        orderSender.asyncSend(mqMsg);
        return order.getId();
    }
}
3.4 [mall-member]消费者
MemberOrderListener消费者
package com.ac.member.mq.listener;
import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
@RocketMQMessageListener(
consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
topic = MqTopicConstant.TOPIC_ORDER,
selectorExpression = "PAID||REFUND",
messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {
@Resource
private MemberIntegralComponent memberIntegralComponent;
@Override
public void onMessage(MessageExt message) {
MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
try {
//Topic+Tag更精准接收消息
MqOrderAction action = mqMsg.getAction();
if (MqOrderAction.PAID == action) {
dealPaid(mqMsg);
} else if (MqOrderAction.REFUND == action) {
dealRefund(mqMsg);
}
} catch (Exception e) {
log.error(MemberOrderListener.class.getSimpleName() + ",消费失败,mqMsg={},e={}", mqMsg, e.getMessage());
}
}
/**
* 处理订单付款事件
*
* @param mqMsg
*/
private void dealPaid(MqOrderMsg mqMsg) {
IntegralLogEditVO integralVO = new IntegralLogEditVO();
integralVO.setMemberId(mqMsg.getMemberId());
integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
integralVO.setSourceRemark("下单获得积分");
integralVO.setIntegral(mqMsg.getPayAmount().longValue());
memberIntegralComponent.recordIntegral(integralVO);
}
private void dealRefund(MqOrderMsg mqMsg) {
log.info("处理退单事件");
}
}
四、测试
4.1 下单
下单
4.2 控制台日志
[mall-order]控制台MQ发送日志:
2023-04-04 15:58:37.052 INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender : OrderSender,消息发送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]
[mall-member]控制台MQ接收日志:
2023-04-04 15:58:37.243 INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)
4.3 数据库记录
t_order t_member_integral
t_member_integral_log
4.4 RocketMQ Dashboard
Dashboard列表
Dashboard消息内容