🌈🌈🌈🌈🌈🌈🌈🌈
欢迎关注公众号(通过文章导读关注),发送【资料】可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景
、中间件系列笔记
和编程高频电子书
!
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁
促销活动推送至用户完整流程
那么至此就通过 分片 + 多线程
生成了多个批次的消息推送到 MQ 中去了,接下来只需要在推送系统中订阅这个消息,这个消息中包含了多个分片用户的 startUserId
和 endUserId
,这里将这个范围中的所有用户信息都查询出来,之后遍历每个用户信息,为每个用户都生成一条推送的消息放入到 List 集合中,之后再使用上边我们封装好的分片组件 ListSplitter
,将 List 集合中的多个消息合并成为一条消息(保证每条消息不超过 1MB),之后再通过 MQ 发送出去,监听到这个消息之后,消费者根据用户的具体信息以及推送方式来真正去真正调用第三方推送平台的 API 给用户推送促销活动的信息,这里还使用到了 设计模式:抽象工厂模式
,因为第三方推送平台有多个,这里需要根据用户信息中的推送类型来判断使用哪一种推送平台进行推送,那么就是通过抽象工厂创建 具体工厂
,再通过具体工厂发送到用户的 邮箱、微信、App
中去,这里在推送每一个用户的消息时,可以通过 Redis 进行 幂等性控制
,如果一个用户的信息已经推送过了,就在 Redis 中存储一下,避免重复推送,整体的促销活动创建到第三方平台进行推送的流程如下:
Spring 如何结合 RocketMQ 创建生产者和消费者
在 Spring 的项目中,使用生产者的时候如果一个一个去创建肯定不合适,浪费了 Spring 自动帮助我们管理 Bean 的特性,对于生产者的使用,可以直接在 Spring 中创建一份 Bean,在需要用到的时候直接通过 @Resource 注入即可,创建 Producer 如下:
@Slf4j
@Component
public class DefaultProducer {
private final TransactionMQProducer producer;
@Autowired
public DefaultProducer(RocketMQProperties rocketMQProperties) {
producer = new TransactionMQProducer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP);
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
producer.setVipChannelEnabled(true);
producer.setNamesrvAddr(rocketMQProperties.getNameServer());
start();
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
log.error("producer start error", e);
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown() {
this.producer.shutdown();
}
/**
* 发送消息
*
* @param topic topic
* @param message 消息
*/
public void sendMessage(String topic, String message, String type) {
sendMessage(topic, message, -1, type);
}
/**
* 发送消息
*
* @param topic topic
* @param message 消息
*/
public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {
Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
try {
if (delayTimeLevel > 0) {
msg.setDelayTimeLevel(delayTimeLevel);
}
SendResult send = producer.send(msg);
if (SendStatus.SEND_OK == send.getSendStatus()) {
log.info("发送MQ消息成功, type:{}, message:{}", type, message);
} else {
throw new BaseBizException(send.getSendStatus().toString());
}
} catch (Exception e) {
log.error("发送MQ消息失败:type:{}",type, e);
throw new BaseBizException("消息发送失败");
}
}
/**
* 批量发送消息
*
* @param topic topic
* @param messages 多个消息
*/
public void sendMessages(String topic, List<String> messages, String type) {
sendMessages(topic, messages, -1, type);
}
/**
* 批量发送消息
*
* @param topic topic
* @param messages 多个消息
*/
public void sendMessages(String topic, List<String> messages, String type, Integer timeoutMills) {
sendMessages(topic, messages, -1, timeoutMills, type);
}
/**
* 批量发送消息
*
* @param topic topic
* @param messages 多个消息
*/
public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, String type) {
List<Message> list = new ArrayList<>();
for (String message : messages) {
Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
if (delayTimeLevel > 0) {
msg.setDelayTimeLevel(delayTimeLevel);
}
list.add(msg);
}
try {
SendResult send = producer.send(list);
if (SendStatus.SEND_OK == send.getSendStatus()) {
log.info("发送MQ消息成功, type:{}", type);
} else {
throw new BaseBizException(send.getSendStatus().toString());
}
} catch (Exception e) {
log.error("发送MQ消息失败:type:{}",type, e);
throw new BaseBizException("消息发送失败");
}
}
/**
* 批量发送消息
*
* @param topic topic
* @param messages 多个消息
*/
public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, Integer timeoutMills, String type) {
List<Message> list = new ArrayList<>();
for (String message : messages) {
Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
if (delayTimeLevel > 0) {
msg.setDelayTimeLevel(delayTimeLevel);
}
list.add(msg);
}
try {
SendResult send = producer.send(list, timeoutMills);
if (SendStatus.SEND_OK == send.getSendStatus()) {
log.info("发送MQ消息成功, type:{}", type);
} else {
throw new BaseBizException(send.getSendStatus().toString());
}
} catch (Exception e) {
log.error("发送MQ消息失败:type:{}",type, e);
throw new BaseBizException("消息发送失败");
}
}
public TransactionMQProducer getProducer() {
return producer;
}
}
对于消费者来说,可以通过一个配置类,在配置类中创建一个个的消费者 Bean,监听不同的 Topic 并注册监听器,代码如下:
@Configuration
public class ConsumerBeanConfig {
/**
* 注入 MQ 配置
*/
@Autowired
private RocketMQProperties rocketMQProperties;
/**
* 消费者1
*/
@Bean("consumer1")
public DefaultMQPushConsumer consumer1(ConsumerListener1 consumerListener1) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_1");
// 设置 nameserver 地址
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
// 订阅 topic
consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
// 注册监听器
consumer.registerMessageListener(consumerListener1);
consumer.start();
return consumer;
}
}