消息推送平台有没有保证数据不丢?

news2025/1/10 12:13:30

我们在使用mq的时候,就会很自然思考一个问题:怎么保证数据不丢失

现在austin接入层是把消息发到mq,下发逻辑层从mq消费数据,随后调用对应渠道接口来下发消息。

消息推送平台🔥推送下发【邮件】【短信】【微信服务号】【微信小程序】【企业微信】【钉钉】等消息类型

  • https://gitee.com/zhongfucheng/austin/
  • https://github.com/ZhongFuCheng3y/austin

消息丢弃一般我们考虑的是消费端,于是重点看的是下发逻辑层。

(因为对于mq使用方来说:生产端只要配置mq相关的参数,在调用下发时有回调重试机制。那就足够了,生产端能做的东西确实不多)

目前为止,下发逻辑层(消费端)使用的是自动提交offset策略。只要消费端存在系统重启或者进程被kill掉,那就会有丢消息的情况。

spring.kafka.consumer.enable-auto-commit=true

当前下发逻辑层(消费端)有可能放大了这个丢弃消息的问题,因为现在是消费到mq数据后,会把消息给到线程池去处理。线程池会指定一个阻塞队列,那队列数量越大,可能由重启所丢弃的消息就越多

这里我的策略是:当应用重启的时候,系统里的线程池是优雅关闭的(尽可能等待一段时间,等阻塞队列里没有消息了,再关闭线程池)。

但回到问题的本质上,只要消费端是自动提交offset策略,就一定会有丢消息的问题。所以要做到消费端的消息不丢,我们就要设置为手动提交offset,这个是必要条件。

有没有必要保证不丢

在探讨具体的技术实现方案之前,我们来看看在业务上有没有必要保证消息不丢。我刚接触到消息推送平台的时候,当时那个交接的哥们告诉我和我学长:消息少发比多发要好

1、重要的消息用户很可能会手动重试触发

austin是一个发送各类渠道消息的平台,从我的经验来说,这里面最重要的是短信渠道。经过austin下发很可能是登陆验证码,银行卡提现验证码,这类消息从全局上看是最重要的。

而其他渠道,例如push通知栏的通知消息,微信渠道的营销消息,这种消息即便用户没收到,也不会对用户带来很大的使用体验问题。这种消息或许对绝大数用户都是无感知的(少发几条,用户可能更乐意)。

我们先假设用户的某一次银行卡提现的验证码恰好因为我们重启系统而丢弃。这时候,绝大数用户可能怀疑自己的信号问题,会继续操作,重新发送一次

(因为客服经常找我排查这种问题,每次都能看到有好几条下发记录。当然了,能到技术的,99%的问题都不是由系统重启丢失消息导致的,更多可能是用户的客户端本身确实就存在问题)

2、消息是有时效性的。比如验证码这种短信一般就5min的时效性,由于系统的问题,你超过这个时间给用户发送,对用户的体验是非常差的。

3、消息推送平台是有全链路追踪的,是可以知道下发的消息有没有到达到用户手上,至少都可以知道在我们的系统内部执行过程中有没有丢。如果这条消息真的那么重要,那可以单独为丢弃的消息单独做重发处理,这些功能在消息推送平台都是支持的。

这个问题我以前的同事也跟我探讨过,就是把上面的内容给我隔壁的老哥听的,他说:你就尽扯淡吧,到面试的时候人家可不认你,丢了就是丢了,其他都是借口

我说:没事,要是不认的话,就把我们处理订单那一套给他讲讲嘛,反正处理的思路都是一样的。

不过啊,广告订单逻辑处理又相对没那么复杂,广告订单最后是以入数据库作为标准的,又可以接受一定的延迟,只要能保证处理完就行了。

要想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

候选者:我们这边是这样实现的:

候选者:一、从Kafka拉取消息(一次批量拉取500条,这里主要看配置)时

候选者:二、为每条拉取的消息分配一个msgId(递增)

候选者:三、将msgId存入内存队列(sortSet)中

候选者:四、使用Map存储msgId与msg(有offset相关的信息)的映射关系,通过msgId用来获取相关元信息

候选者:五、当业务处理完消息后,ack时,获取当前处理的消息msgId,然后从sortSet删除该msgId(此时代表已经处理过了)

候选者:六、接着与sortSet队列(本地内存队列)的首部第一个Id比较(其实就是最小的msgId),如果当前msgId<=sort Set第一个ID,则提交当前offset

候选者:七、系统即便挂了,在下次重启时就会从sortSet队首的消息开始拉取,实现至少处理一次语义

候选者:八、会有少量的消息重复,但只要下游做好幂等就OK了。

面试官:嗯,你也提到了幂等,你们这业务怎么实现幂等性的呢?

候选者:嗯,还是以处理订单消息为例好了。

候选者:幂等Key我们由订单编号+订单状态所组成(一笔订单的状态只会处理一次)

候选者:在处理之前,我们首先会去查Redis是否存在该Key,如果存在,则说明我们已经处理过了,直接丢掉

候选者:如果Redis没处理过,则继续往下处理,最终的逻辑是将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上

候选者:显然,单纯通过Redis是无法保证幂等的(:

候选者:所以,Redis其实只是一个「前置」处理,最终的幂等性是依赖数据库的唯一Key来保证的(唯一Key实际上也是订单编号+状态)

候选者:总的来说,就是通过Redis做前置处理,DB唯一索引做最终保证来实现幂等性的

保证austin数据不丢需要做什么?

保证数据不丢简单来说,就是我们要在消费端手动ack offset,不能再用自动提交策略了。这样当我们系统重启时,kafka会自动从未ackoffset中拉取。

如果要实现消息推送平台不丢消息的话,有几个问题是需要考虑的:

1、消息少发比多发要好,那么要实现消息不丢,就必须要在系统内实现幂等。因为现在的消息不丢,一般都是基于【至少一次]消费语义去做的。

2、那实现幂等的逻辑是在调用渠道下发接口前,还是渠道下发接口后?

如果做在下发接口前,那是不是会有可能第一次下发记录写入了,但实际调用下发接口却失败了,后面的重试都被幂等处理掉了。

如果做在下发接口后,那是不是会有可能调用调用下发接口成功了,但写入幂等处理的消息失败了,后面的重试就会导致消息多发

3、消息是有时效性的,那如果重试的处理时间过长,那是不是要考虑把这条消息给丢弃掉,不再重试了。

4、重试的消息不应该影响到正常消息的下发,他得作为一种补偿的机制,而非主流程

稍微细想下技术实现,应该不太好搞,还有很多细节的地方得关注到。比如业务上的:应该是不需要所有的渠道的所有类型消息都得实现消息不丢吧?现在的设计是追求高性能的,能在短时间内下发批量的消息。而如果做到所有消息不丢,肯定会影响到下发的速率

什么时候动手?

1、对于这个功能吧,有用肯定是有用,但这功能又没那么急

2、我估摸对现有代码改动还是蛮大的,现在我还没想好该怎么实现比较好,也一直没下手。

3、最近工作的事挺多的,没那么有空

结论:先看看想要这个功能的人多不多,不多就鸽一会。

都看到这了,如果按上面的理由,我不实现这个功能,你认不认可?

消息推送平台🔥推送下发【邮件】【短信】【微信服务号】【微信小程序】【企业微信】【钉钉】等消息类型

  • https://gitee.com/zhongfucheng/austin/
  • https://github.com/ZhongFuCheng3y/austin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/530029.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

001 - STM32固件库编程

STM32固件库编程 一、新建工程文件夹 1、工程文件夹内添加&#xff1a;Project、Libraries、User、Doc&#xff0c;之后在Project目录内新建工程&#xff0c;并将标准库中CMSIS、STM32F4xx_StdPeriph_Driver复制到Libraries中。 2、将\STM32F4xx_DSP_StdPeriph_Lib_V1.8.0\Pr…

Hbase基础介绍-1 概述

Hbase基础介绍-1 概述 一、概述1、定义&#xff1a;2、特点&#xff1a;3、HBase在Hadoop生态中的地位4、HBase与HDFS5、Hbase与Hive6、关系型数据库与列式数据库7、结构化数据和非结构化数据8、HBase使用场景9、CAP定理10、Hbase与传统关系型数据库的区别 一、概述 1、定义&am…

飞腾D2000 如何修改boot启动项

开机后,按着F2进入如下界面 选择Enter Setup 选择 Boot Maintenance Manager 进入Boot Options 选择Change Boot Order 在这个界面按 键盘上的 Enter 键 在弹出得蓝色小方框里,按键盘的上下键,比如我要把硬盘盘符放在boot 启动项的第一首选项,则移动下键到硬盘盘符上,再…

嵌入式系统入门基础知识分析(二)

目录 ​编辑 1、GPIO原理与结构 2、A/D接口 3、D/A接口基本 4、键盘接口

解密网站401错误:了解发生原因和修复方法

​  每个网站都会有不同的错误码&#xff0c;其中&#xff0c;401错误被认为是相对常见的错误码。那么&#xff0c;什么是网站401错误呢?在摸清了这一点之后&#xff0c;我们也需要学习一下如何解决它。 什么是 401 状态码? 401 状态代码是 Web 服务器发送给浏览器的 HTTP …

【小菜鸡刷题记】--字符串篇

【小菜鸡刷题记】&#xff1a;字符串 剑指 Offer 05. 替换空格剑指 Offer 58 - II.左旋转字符串剑指 Offer 20.表示数值的字符串剑指 Offer 67. 把字符串转换成整数 特此声明&#xff1a;题目均来自于力扣 剑指 Offer 05. 替换空格 题目链接 请实现一个函数&#xff0c;把字符…

【JUC基础】06. 生产者和消费者问题

1、前言 学习JUC&#xff0c;就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型&#xff0c;用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中&#xff0c;生产者生产数据放入共享的缓冲区中&#xff0c;消费者从缓冲区中取出数据进行消费。在…

MySQL同时In俩个字段,In多个字段,Mybatis多个In查询问题,Mysql多个IN查询多出数据问题,Mysql多个IN查询 数据准确问题

背景&#xff1a; 今天产品验收的时候&#xff0c;导入了大量数据&#xff1b;发现造价项目某个查询列表数据多出了几条数据&#xff1b;看了Mybatis查询&#xff0c;才发现是同时使用了多个IN查询导致的问题&#xff1b;入参是对象列表&#xff0c;In值是分开循环赋值…

【SpringBoot整合RabbitMQ(上)】

一、简单的生产者-消费者 1.1、创建连接工具类获取信道 public class RabbitMqUtils {public static Channel getChannel() throws IOException, TimeoutException {//创建一个链接工厂ConnectionFactory factory new ConnectionFactory();//工厂IP 链接RabbitMQ的队列facto…

google_breakpad库的基本使用

参考链接&#xff1a; windows下捕获dump之Google breakpad_client的理解Google Breakpad&#xff1a;基本介绍和操作方法Breakpad 入门linux下用QT捕获程序异常 简介 github 地址 三大组件 client:读取当前线程的状态、加载的可执行文件、共享库等信息&#xff0c;写入到…

Azure深层防御

深层防御的目的是保护信息&#xff0c;防止未经授权访问信息的人员窃取信息。 深层防御策略使用一系列机制来减缓攻击进度&#xff0c;这些攻击旨在获取对数据的未经授权的访问权限。 深层防御层 可以将深度防御可视化为一组层&#xff0c;并将要保护的数据放在中心&#xf…

一篇文章搞定ftp、dns服务器

一篇文章搞定ftp、dns服务器 1、ftp 安装ftp 挂载centos镜像cd /media/CentOS_6.8_Final/Packages安装命令&#xff1a;[rootlocalhost Packages]# rpm -ivh vsftpd-2.2.2-21.el6.x86_64.rpm Vsftpd配置目录为/etc/vsftpd&#xff0c;其中包含下面几个文件 /var/ftp/&#xf…

awk命令编辑

awk工作原理 逐行读取文本&#xff0c;默认以空格或tab键分隔符进行分隔&#xff0c;将分隔所得的各个字段保存到内建变量中&#xff0c;并按模式或者条件执行编辑命令。 sed命令常用于一整行的处理&#xff0c;而awk比较倾向于将一行分成多个“字段”然后再进行处理。awk信息…

做网工10年,没人在30岁前和我讲这些(一)

晚上好&#xff0c;我是老杨。 23年才刚过几天&#xff0c;我就感觉自己又上了点年纪&#xff0c;时常面对年纪比较小的粉丝&#xff0c;无意识的面露慈爱的笑容。 还是每次小冬提醒我&#xff0c;我才发现我的表情不对劲。 我对年轻人的包容度是很强的&#xff0c;尤其是一…

VMware、CentOS、XShell、Xftp的安装

第 1 章 VMware 1.1 VMware 安装 一台电脑本身是可以装多个操作系统的&#xff0c;但是做不到多个操作系统切换自如&#xff0c;所以我们 需要一款软件帮助我们达到这个目的&#xff0c;不然数仓项目搭建不起来。 推荐的软件为 VMware&#xff0c;VMware 可以使用户在一台计…

DNS正反向解析

正向解析 1.准备工作 关闭Selinux服务和firewalld服务 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld 修改服务器与客户端的IP为静态IP地址 [rootserver ~]# nmcli connection modify ens160 ipv4.method manual ipv4.address …

剑指offer 19. 正则表达式匹配

文章目录 1. 题目描述2. 解题思想3. 设置dp初始值4.代码实现 1. 题目描述 2. 解题思想 定义dp数组 dp[i][j]&#xff1a;表示当字符串长度i&#xff0c;j是&#xff0c;s与p是否匹配 确定递推公式 核心是s[i]要与p[j]进行比较&#xff0c;比较的结果来确定 dp数组的值&#xf…

STM32-ADC多通道输入实验

之前已经介绍了几个ADC的笔记和实验了&#xff0c;链接如下&#xff1a; 关于ADC的笔记1_Mr_rustylake的博客-CSDN博客 STM32-ADC单通道采集实验_Mr_rustylake的博客-CSDN博客 STM32-单通道ADC采集&#xff08;DMA读取&#xff09;实验_Mr_rustylake的博客-CSDN博客 接下来…

NodeJs基础之NRM与NPM

nrm nrm can help you easy and fast switch between different npm registries, now include: npm, cnpm, taobao, nj(nodejitsu). 译文&#xff1a;nrm可以帮助您在不同的npm注册表之间轻松快速地切换&#xff0c;现在包括&#xff1a;npm、cnpm、taobao、nj&#xff08;no…

编译安装及yum安装

一、编译安装 源码包&#xff1a;是由程序员按照特定格式和语法编写的包 二进制包:源码包经过成功编译之后产生的包 1.tar -xf httpd-2.4.29.tar.bz #解压源码包 2.安装依赖环境 3.配置安装路径 4.编译make并安装 5.关闭防火墙&#xff0c;和安全机制 6.开启服务器 7.…