基于电商场景的高并发RocketMQ实战-促销活动推送至用户完整流程、Spring结合RocketMQ的生产者消费者使用

news2024/11/19 17:32:05

🌈🌈🌈🌈🌈🌈🌈🌈
欢迎关注公众号(通过文章导读关注),发送【资料】可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景中间件系列笔记编程高频电子书
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

促销活动推送至用户完整流程

那么至此就通过 分片 + 多线程 生成了多个批次的消息推送到 MQ 中去了,接下来只需要在推送系统中订阅这个消息,这个消息中包含了多个分片用户的 startUserIdendUserId,这里将这个范围中的所有用户信息都查询出来,之后遍历每个用户信息,为每个用户都生成一条推送的消息放入到 List 集合中,之后再使用上边我们封装好的分片组件 ListSplitter,将 List 集合中的多个消息合并成为一条消息(保证每条消息不超过 1MB),之后再通过 MQ 发送出去,监听到这个消息之后,消费者根据用户的具体信息以及推送方式来真正去真正调用第三方推送平台的 API 给用户推送促销活动的信息,这里还使用到了 设计模式:抽象工厂模式,因为第三方推送平台有多个,这里需要根据用户信息中的推送类型来判断使用哪一种推送平台进行推送,那么就是通过抽象工厂创建 具体工厂,再通过具体工厂发送到用户的 邮箱、微信、App 中去,这里在推送每一个用户的消息时,可以通过 Redis 进行 幂等性控制,如果一个用户的信息已经推送过了,就在 Redis 中存储一下,避免重复推送,整体的促销活动创建到第三方平台进行推送的流程如下:

在这里插入图片描述

Spring 如何结合 RocketMQ 创建生产者和消费者

在 Spring 的项目中,使用生产者的时候如果一个一个去创建肯定不合适,浪费了 Spring 自动帮助我们管理 Bean 的特性,对于生产者的使用,可以直接在 Spring 中创建一份 Bean,在需要用到的时候直接通过 @Resource 注入即可,创建 Producer 如下:

@Slf4j
@Component
public class DefaultProducer {

    private final TransactionMQProducer producer;

    
    @Autowired
    public DefaultProducer(RocketMQProperties rocketMQProperties) {
        producer = new TransactionMQProducer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP);
        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
        producer.setVipChannelEnabled(true);
        producer.setNamesrvAddr(rocketMQProperties.getNameServer());
        start();
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            log.error("producer start error", e);
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }

    /**
     * 发送消息
     *
     * @param topic   topic
     * @param message 消息
     */
    public void sendMessage(String topic, String message, String type) {
        sendMessage(topic, message, -1, type);
    }

    /**
     * 发送消息
     *
     * @param topic   topic
     * @param message 消息
     */
    public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {
        Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
        try {
            if (delayTimeLevel > 0) {
                msg.setDelayTimeLevel(delayTimeLevel);
            }
            SendResult send = producer.send(msg);
            if (SendStatus.SEND_OK == send.getSendStatus()) {
                log.info("发送MQ消息成功, type:{}, message:{}", type, message);
            } else {
                throw new BaseBizException(send.getSendStatus().toString());
            }
        } catch (Exception e) {
            log.error("发送MQ消息失败:type:{}",type, e);
            throw new BaseBizException("消息发送失败");
        }
    }

    /**
     * 批量发送消息
     *
     * @param topic   topic
     * @param messages 多个消息
     */
    public void sendMessages(String topic, List<String> messages, String type) {
        sendMessages(topic, messages, -1, type);
    }

    /**
     * 批量发送消息
     *
     * @param topic   topic
     * @param messages 多个消息
     */
    public void sendMessages(String topic, List<String> messages, String type, Integer timeoutMills) {
        sendMessages(topic, messages, -1, timeoutMills, type);
    }


    /**
     * 批量发送消息
     *
     * @param topic   topic
     * @param messages 多个消息
     */
    public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, String type) {
        List<Message> list = new ArrayList<>();
        for (String message : messages) {
            Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
            if (delayTimeLevel > 0) {
                msg.setDelayTimeLevel(delayTimeLevel);
            }
            list.add(msg);
        }
        try {
            SendResult send = producer.send(list);
            if (SendStatus.SEND_OK == send.getSendStatus()) {
                log.info("发送MQ消息成功, type:{}", type);
            } else {
                throw new BaseBizException(send.getSendStatus().toString());
            }
        } catch (Exception e) {
            log.error("发送MQ消息失败:type:{}",type, e);
            throw new BaseBizException("消息发送失败");
        }
    }

    /**
     * 批量发送消息
     *
     * @param topic   topic
     * @param messages 多个消息
     */
    public void sendMessages(String topic, List<String> messages, Integer delayTimeLevel, Integer timeoutMills, String type) {
        List<Message> list = new ArrayList<>();
        for (String message : messages) {
            Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
            if (delayTimeLevel > 0) {
                msg.setDelayTimeLevel(delayTimeLevel);
            }
            list.add(msg);
        }
        try {
            SendResult send = producer.send(list, timeoutMills);
            if (SendStatus.SEND_OK == send.getSendStatus()) {
                log.info("发送MQ消息成功, type:{}", type);
            } else {
                throw new BaseBizException(send.getSendStatus().toString());
            }
        } catch (Exception e) {
            log.error("发送MQ消息失败:type:{}",type, e);
            throw new BaseBizException("消息发送失败");
        }
    }
    public TransactionMQProducer getProducer() {
        return producer;
    }
}

对于消费者来说,可以通过一个配置类,在配置类中创建一个个的消费者 Bean,监听不同的 Topic 并注册监听器,代码如下:

@Configuration
public class ConsumerBeanConfig {

    /**
     * 注入 MQ 配置
     */
    @Autowired
    private RocketMQProperties rocketMQProperties;

    /**
     * 消费者1
     */
    @Bean("consumer1")
    public DefaultMQPushConsumer consumer1(ConsumerListener1 consumerListener1) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_1");
        // 设置 nameserver 地址
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        // 订阅 topic
        consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*");
        // 注册监听器
        consumer.registerMessageListener(consumerListener1);
        consumer.start();
        return consumer;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1351634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Apache-2.0】springboot-openai-chatgpt超级AI大脑产品架构图

springboot-openai-chatgpt: 一个基于SpringCloud的Chatgpt机器人&#xff0c;已对接GPT-3.5、GPT-4.0、百度文心一言、stable diffusion AI绘图、Midjourney绘图。用户可以在界面上与聊天机器人进行对话&#xff0c;聊天机器人会根据用户的输入自动生成回复。同时也支持画图&a…

【微服务架构】Spring Cloud入门概念讲解

目录 一、单体架构VS微服务架构 1.1 单体应用 单体架构的优点 单体应用的缺点 1.2 微服务“定义” 微服务的特性 微服务的缺点 微服务的适用场景 二、微服务常见概念与核心模块 三、Spring Cloud 工作流程 一、单体架构VS微服务架构 1.1 单体应用 一个归档包&#x…

数据损毁!250 亿美金的 Pinterest,在数据库选型上的翻车经历

原文链接 Pinterest 是一个以图片为主的社交网络&#xff0c;用户可以将图片保存或 "钉 / pin" 在自己的图板上。Pinterest 在 2019 年上市&#xff0c;目前市值 250 亿美金。本文内容主要根据 2012 年 Scaling Pinterest 的分享。 2012 年 1 月&#xff0c;Pinteres…

ARM NEON 指令

NEON指令 按照操作数类型可以分为正常指令、宽指令、窄指令、饱和指令、长指令。 正常指令&#xff1a;生成大小相同且类型通常与操作数向量相同到结果向量。长指令&#xff1a;对双字向量操作数执行运算&#xff0c;生产四字向量到结果。所生成的元素一般是操作数元素宽度到…

从0搭建github.io网页

点击跳转到&#x1f517;我的博客文章目录 从0搭建github.io网页 文章目录 从0搭建github.io网页1.成果展示1.1 网址和源码1.2 页面展示 2.new对象2.1 创建仓库 3.github.io仓库的初始化3.1 千里之行&#xff0c;始于足下3.2 _config.yml3.3 一点杂活 4.PerCheung.github.io.p…

Qt(三):udp组播的发送与接收

1. 创建UDP套接字 使用QUdpSocket类创建一个UDP套接字。 udpSendnew QUdpSocket(this);udpRecenew QUdpSocket(this); 2. 绑定套接字 绑定套接字到一个本地地址和端口。可以使用bind()函数来完成。 如果要在组播中发送数据&#xff0c;可以将套接字绑定到一个通配符地址&#…

【CASS精品教程】CASS11计算城镇建筑密度

CASS中可以很方便计算建筑密度。 文章目录 一、建筑密度介绍二、CASS计算建筑密度1. 绘制宗地范围2. 绘制建筑物3. 计算建筑密度三、注意事项一、建筑密度介绍 建筑密度(building density;building coverage ratio),指在一定范围内,建筑物的基底面积总和与占用地面积的比…

计算机网络-以太网交换基础

一、网络设备的演变 最初的网络在两台设备间使用传输介质如网线等进行连接就可以进行通信。但是随着数据的传输需求&#xff0c;多个设备需要进行数据通信时就需要另外的设备进行网络互联&#xff0c;并且随着网络传输的需求不断更新升级。从一开始的两台设备互联到企业部门内部…

【OpenCV】在MacOS上源码编译OpenCV

在MacOS上源码编译OpenCV 1. 下载项目源码2. 创建CMake编译文件3. 编译安装4. 案例测试5. 总结 前言 在做视觉任务时&#xff0c;我们经常会用到开源视觉库OpenCV&#xff0c;OpenCV是一个基于Apache2.0许可&#xff08;开源&#xff09;发行的跨平台计算机视觉和机器学习软件…

Spring技术内幕笔记之IOC的实现

IOC容器的实现 依赖反转&#xff1a; 依赖对象的获得被反转了&#xff0c;于是依赖反转更名为&#xff1a;依赖注入。许多应用都是由两个或者多个类通过彼此的合作来实现业务逻辑的&#xff0c;这使得每个对象都需要与其合作的对象的引用&#xff0c;如果这个获取过程需要自身…

Java学习苦旅(十六)——List

本篇博客将详细讲解Java中的List。 文章目录 预备知识——初识泛型泛型的引入泛型小结 预备知识——包装类基本数据类型和包装类直接对应关系装包与拆包 ArrayList简介ArrayList使用ArrayList的构造ArrayList常见操作ArrayList遍历 结尾 预备知识——初识泛型 泛型的引入 我…

WebStorm 创建一个Vue项目(1)

一、下载并安装WebStorm 步骤一 步骤二 选择激活方式 激活码&#xff1a; I2A0QUY8VU-eyJsaWNlbnNlSWQiOiJJMkEwUVVZOFZVIiwibGljZW5zZWVOYW1lIjoiVU5JVkVSU0lEQURFIEVTVEFEVUFMIERFIENBTVBJTkFTIiwiYXNzaWduZWVOYW1lIjoiVGFvYmFv77yaSkVU5YWo5a625qG25rAIOa0uW3peS9nOWup…

json解析本地数据,使用JSONObject和JsonUtility两种方法。

json解析丨网址、数据、其他信息 文章目录 json解析丨网址、数据、其他信息介绍一、文中使用了两种方法作为配置二、第一种准备2.代码块 二、第二种总结 介绍 本文可直接解析本地json信息的功能示例&#xff0c;使用JSONObject和JsonUtility两种方法。 一、文中使用了两种方法…

R语言——R函数、选项参数、数学统计函数(六)

目录 一、R函数 二、选项参数 三、数学统计函数 四、参考 一、R函数 1.lm() lm()是R语言中经常用到的函数&#xff0c;用来拟合回归模型。它是拟合线性模型最基本的函数 lm()格式如下&#xff1a; fit<-lm(formula,data) 其中&#xff0c;formula指要拟合的模型形式…

【QT 自研上位机 与 ESP32下位机联调>>>串口控制GPIO-基础样例-联合文章】

【QT 自研上位机 与 ESP32下位机联调&#xff1e;&#xff1e;&#xff1e;串口控制GPIO-基础样例-联合文章】 1、概述2、实验环境3、 自我总结4、 实验过程1、验证上位机QT程序1、下载样例代码2、修改qt程序3、运行测试验证 2、验证下位机ESP32程序1、下载样例代码2、更改ESP3…

Ubuntu18 安装chatglm2-6b

记了下Ubuntu18 上安装chatglm2-6遇到的问题。 环境&#xff1a;Ubuntu18.04 V100(显卡) nvcc 11.6 显卡驱动cudacudnnaniconda chatglm6b 的安装 网上有很多&#xff0c; 不记录 了。 chatglm2-6b 我从别的地方拷贝的&#xff0c; 模型也包含了。 遇到的问题&#xf…

【MMC子系统】三、MMC子系统框架

我的圈子&#xff1a; 高级工程师聚集地 我是董哥&#xff0c;高级嵌入式软件开发工程师&#xff0c;从事嵌入式Linux驱动开发和系统开发&#xff0c;曾就职于世界500强企业&#xff01; 创作理念&#xff1a;专注分享高质量嵌入式文章&#xff0c;让大家读有所得&#xff01; …

【SpringBoot开发】之商城项目案例(购物车相关操作)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是君易--鑨&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的博客专栏《SpringBoot开发之商城项目系列》。&#x1f3af…

GZ075 云计算应用赛题第4套

2023年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项赛卷4 某企业根据自身业务需求&#xff0c;实施数字化转型&#xff0c;规划和建设数字化平台&#xff0c;平台聚焦“DevOps开发运维一体化”和“数据驱动产品开发”&#xff0c;拟采用开源OpenSt…

Pytorch上采样

文章目录 Upsample特殊上采样 Upsample 所谓上采样&#xff0c;实则是一个插值过程。所以上采样对象在初始化时&#xff0c;需要指定一个插值类型&#xff0c;Upsample是torch.nn中最基础的上采样类&#xff0c;初始化参数如下 Upsample(sizeNone, scale_factorNone, modenea…