文章目录
- 依赖
- yaml配置
- 生产端发送消息
- 消费端
- 异步下单
- Business生产端
- 消费端
依赖
<!--整合的依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!--starter-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
yaml配置
rocketmq:
# namesrv地址
name-server: localhost:9876
# 当前应用程序的默认生产者和消费者分组
# 代码中可以覆盖
producer:
group: my-rocket-group-prod
consumer:
group: my-rocket-group-consume
生产端发送消息
@RestController
public class HelloController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/hello")
public String sayHi(String name){
//发送消息 Message是rocketMQTemplate支持发送的消息参数
//和底层api方法的Message不是同一个类,相当于将底层Message包装了一次.
Message message=
//payLoad在和,就是body
MessageBuilder.withPayload(name)
.setHeader("age",18)
.build();
SendResult sendResult = rocketMQTemplate.syncSend("rocket-topic-a:tagA", message);
//rocketMQTemplate.receive();
//发送消息
return "success";
}
}
补充
无论RocketMQTemplate 用哪种发送send消息,最终都会调用doSend实现方法.其他所有方法syncSend send都是重载+外部调用.
doSend方法,将Message对象中的payLoad做了序列化,存储到rocketmq message的body中. 将header存储到header头中. 方便消费的时候做反序列化.
消费端
/**
topic:消费端绑定主题
consumerGroup:消费者分组
selectorExpression: 顾虑的标签
*/
@Component
@RocketMQMessageListener(
topic = "rocket-topic-a",
consumerGroup = "${rocketmq.consumer.group}",
selectorExpression = "*")
public class MyConsumerListener implements RocketMQListener<String> {
/**
* 每个listener自定义的对象,底层都会开启一个消费进程 绑定这个listerner
* 在底层消费者中,监听consumerMessage方法里,调用这个类的onMessage;
* 调用之前,已经实现了对象消息数据的转化
* 接口有泛型,底层方法逻辑,会根据泛型,将消息message进行反序列化和数据封装
* @param name 根据泛型反序列化的body对象
* 对于消费成功还是失败,spring整合rocketmq: 只要抛异常,就返回失败,不抛异常就是正常
*/
@Override
public void onMessage(String name) {
System.out.println("消费端接收到消息:"+name);
}
}
注意
如果使用pull消费,继续使用RocketMQTemplate调用receive方法.每调用一次
就从对应目标队列中拿到一条消息.
push的消费端,处理步骤
- 准备消费端component bean对象
- 实现消费端push的接口,定义消息的泛型(涉及到spring框架如何将消息反序列化解析).实现方法
- 配置注解,提供消费属性(监听绑定队列,过滤tag,消费者组);
异步下单
将同步的调用关系,转化成异步调用关系,可以引入rocketmq消息中间件.
BusinessService–>IOrderService 同步关系
OrderService–>ICartService OrderService–>IStockService 同步关系
考虑: 是不是所有的同步,都有必要转化成异步.
- Business–>调用OrderService
可以将同步转化成异步.这样做的好处,提升请求并发qps. 缺点是不知道订单到底是成功还是失败.(业务处理落地方案选型在这里是需要平衡的,并发和业务用户体验)
- Order–>ICartService
可以异步.只要订单新增成功,说明库存够用.删除购物车,可以不在当前业务同步执行,降低订单处理时长,提升RT效率.
- Order–>IStockService
不可以异步,必须同步.(银行账号,支付平台账号划款,转账到当前系统的用户账户金额中)
将Business调用Order的过程实现异步下单:
Producer: BusinessServiceImpl生产消息 发送到队列
Consumer: Order的web应用实现消息的接收,调用OrderServiceImpl实现消费逻辑.
Business生产端
- 依赖和yaml同上
@Service
@Slf4j
public class BusinessServiceImpl implements IBusinessService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void buy() {
// 模拟触发购买业务
// 先实例化一个用于新增订单的DTO
OrderAddDTO orderAddDTO=new OrderAddDTO();
orderAddDTO.setUserId("UU100");
orderAddDTO.setCommodityCode("PC100");
//订单 快照
orderAddDTO.setMoney(100);
orderAddDTO.setCount(2);
// 暂时只能进行输出,后期有微服务支持可以调用其他模块
log.info("新增订单信息为:{}",orderAddDTO);
// dubbo调用order模块新增订单的方法
// 将上面实例化的orderAddDTO当做参数,让它在数据库中生效
/*dubboOrderService.orderAdd(orderAddDTO);*/
//替换成异步生单逻辑 发送订单新增的消息
//消息的携带信息,消息的封装特点. 消息一定要精简(足够小的占用空间)准确(足够用处理业务逻辑)
Message message= MessageBuilder.withPayload(orderAddDTO).build();
SendResult sendResult = rocketMQTemplate.syncSend("business-order-topic:orderAdd", message);
if (!sendResult.getSendStatus().toString().equals("SEND_OK")){
throw new CoolSharkServiceException(ResponseCode.BAD_REQUEST,"订单消息发送失败");
}
}
}
消费端
@Component
@RocketMQMessageListener(
topic = "business-order-topic",
consumerGroup = "${rocketmq.consumer.group}",
selectorExpression = "orderAdd")
@Slf4j
public class OrderAddConsumerListener implements RocketMQListener<OrderAddDTO> {
@Autowired
private IOrderService orderService;
@Override
public void onMessage(OrderAddDTO orderAddDTO) {
//调用service,执行orderAdd方法
//异常处理逻辑,消息消费失败的处理逻辑
try{
orderService.orderAdd(orderAddDTO);
}catch (CoolSharkServiceException e){
//业务异常,说明订单新增业务性失败,比如库存没了
log.error("库存减少失败,库存触底了:{},异常信息:{}",orderAddDTO,e.getMessage());
}
}
}