实现Kafka至少消费一次

news2024/12/25 0:52:06

实现Kafka至少消费一次

    • 默认的kafka消费者存在什么问题?
    • 实现至少消费一次
      • 加入重试队列再次消费
      • 使用seek方法再次消费

在实际重要的场景中,常常需要实现消费者至少消费一次。因为使用默认的kafka消费者存在某些问题。

默认的kafka消费者存在什么问题?

(1)需要自己实现重新消费数据

在刚开始思考时,有人认为实现重复消费还不简单?取消自动提交,确认一批消息已经消费成功就执行手动提交,否则不提交;之后重新获取未提交的数据,就可以达到重复消费的目的。

可是,事实是残酷的,试一次就知道,感觉手动不提交没有用一样😂,消费者一直在往后消费。

对于Kafka中的分区而言,它的每条消息都有唯一的 offset ,用来表示消息在分区中对应的位置(called:偏移量)。 对于消费者而言,它也有一个 offset 的概念,消费者使用offset来表示消费到分区中某个消息所在的位置(called:位移)。偏移量存储在Kafka内部的主题 __consumer_offsets 中,而位移存储在消费者端的内存中。“提交”就是将消费者端存储的位移存储到 __consumer_offsets 持久化,当消费者发生崩溃或发生消费者重平衡时,就会去读取存储在 __consumer_offsets 中的偏移量,其他正常情况下都是按内存存储的位移在顺序读取。因此,按照上述操作就会出现提交没用的效果。

(2)自动提交情况下,可能出现消息丢失情况
拉取线程 A 不断地拉取消息并存入本地缓存,比如在 BlockingQueue 中,另一个处理线程 B 从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第 y+1 次拉取,以及第 m 次位移提交的时候,也就是 x+6 之前的位移己经确认提交了,处理线程 B 却还正在消费 x+3 的消息 。 此时如果处理线程B 发生了异常,待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象 。

在这里插入图片描述

实现至少消费一次

语言:python 3.8
工具:confluent-kafka

目前有2个思路,第1个时新建一个重试队列,当遇到问题消息时将其插入到重试队列中,消费者可以再次获取到该问题消息并再次消费。第2个是通过 seek 方法设置位移到指定发生问题的位置,使得重新消费问题消息;

加入重试队列再次消费

class KafkaAtLeastOnceConsumer(object):
    """
    注意:
    1. 用户方法须返回Boolean类型数据,False将可能重新消费该数据
    2. 用户消息内容不得包含 try_count、old_topic 关键字

    """

    run_flag = True
    if config.DEBUG:
        total_set = set()

    def __init__(
            self,
            group_id: str,
            topic_list: List,
            user_function: Callable,
            servers: List = config.KAFKA_HOST,
            consumer_count: int = 5,
            reset_type: str = 'latest',
            concurrency: int = 5,
            batch_size: int = 500,
            timeout: int = 1,
            base_mode: bool = False,
            retry_count: int = 3,
    ):
        assert consumer_count > 0, '消费者数目须大于0'
        assert concurrency > 0, '消费者并发度须大于0'
        assert batch_size > concurrency, '单批消息数须大于并发度'
        assert timeout > 0, '获取消息超时时间须大于0'
        assert retry_count > -2, '重试次数应大于等于-1'

        self._consumer_count = consumer_count
        self._pool = ThreadPoolExecutor(max_workers=self._consumer_count)
        self._servers = ','.join(servers)
        self._group_id = group_id
        self._user_function = user_function
        self._reset_type = reset_type
        self._topic_list = topic_list
        self._concurrency = concurrency
        self._batch_size = batch_size
        self._timeout = timeout
        self._retry_count = retry_count

        self._process_num_per_thread = 100
        self._retry_topic_name = 'kfk_retry_queue'
        self._inner_producer = None

        self._topic_list.append(self._retry_topic_name)
        if base_mode:
            self._retry_count = 0

    def start(self) -> None:
        for i in range(self._consumer_count):
            self._pool.submit(self._core)

    def shutdown(self) -> None:
        KafkaAtLeastOnceConsumer.run_flag = False

    def _split_msgs(self, msgs: List) -> List:
        msg_num = len(msgs)
        if msg_num <= self._process_num_per_thread * self._concurrency:
            process_num_per_thread = self._process_num_per_thread
        else:
            process_num_per_thread = int(msg_num / self._concurrency)
        return list(chunked(msgs, process_num_per_thread))

    def _core(self) -> None:
        try:
            consumer = self._init_consumer()
            batch_pool = ThreadPoolExecutor(max_workers=self._concurrency)
            while KafkaAtLeastOnceConsumer.run_flag:
                msgs = consumer.consume(num_messages=self._batch_size, timeout=self._timeout)
                if not msgs:
                    continue
                if config.DEBUG:
                    log.info(f'开始处理一批消息')
                msg_lists = self._split_msgs(list(msgs))
                threads = []
                for msg_list in msg_lists:
                    t = batch_pool.submit(self._thread_run, msg_list)
                    threads.append(t)
                wait(threads)
                consumer.commit()
                if config.DEBUG:
                    log.info(f'完成处理一批消息')
                    log.info(f'total_set:{len(KafkaAtLeastOnceConsumer.total_set)}')
                    time.sleep(1)
        except Exception as e:
            log.exception(e)
        finally:
            try:
                if consumer:
                    consumer.close()
                if batch_pool:
                    batch_pool.shutdown()
            except Exception as e:
                log.exception(e)

    def _thread_run(self, msg_list) -> None:
        for msg in msg_list:
            msg_map = json.loads(msg.value().decode('utf-8'))
            if 'old_topic' in msg_map and msg_map['old_topic'] not in self._topic_list:
                continue

            try:
                func_is_success = self._user_function(msg)
            except Exception:
                func_is_success = False

            if not func_is_success:
                if msg.topic() == self._retry_topic_name:
                    try_count = msg_map.get('try_count', 0)
                else:
                    try_count = 0
                if self._retry_count == 0 or (0 < self._retry_count <= try_count):
                    continue
                else:
                    # 重试操作
                    if not self._inner_producer:
                        self._inner_producer = self._init_producer()
                    msg_map['try_count'] = try_count + 1
                    msg_map['old_topic'] = self._topic_list[0]
                    self._inner_producer.produce(self._retry_topic_name, json.dumps(msg_map))
            elif config.DEBUG:
                KafkaAtLeastOnceConsumer.total_set.add(json.loads(msg.value().decode('utf-8'))['t'])

    def _init_consumer(self) -> Consumer:
        _consumer = Consumer(
            {
                'bootstrap.servers': self._servers,
                'group.id': self._group_id,
                'auto.offset.reset': self._reset_type,
                'enable.auto.commit': False,
            }
        )
        _consumer.subscribe(self._topic_list)
        return _consumer

    def _init_producer(self) -> Producer:
        _producer = Producer(
            {
                'bootstrap.servers': ','.join(config.KAFKA_HOST),
            }
        )
        return _producer
  1. 同时开启 consumer_count 个消费者并处于同一分组中,为了提高吞吐量,每个消费者又会开启 concurrency 个线程去消费数据
  2. 当消费消费出现异常或返回False,并且重试次数没有使用完毕,就会将初始消息以及重试次数发送到“重试队列”
  3. 关闭自动提交,开启手动提交,当消费者端崩溃或再平衡时再次消费未提交数据。

测试代码:

def _get_msg_data(msg):
    p = msg.partition()
    o = msg.offset()
    t = msg.topic()
    value = msg.value().decode('utf-8')
    return p, o, t, value


def my_function(msg):
    if msg.error():
        log.error('fetch msg is error. error:%s' % msg.error())
        return False
    # 处理业务逻辑,单次问题进行重试
    p, o, t, value = _get_msg_data(msg)
    if random.randint(1, 100) == 9:
        log.info(f"发生业务异常返回False, topic:{t}, partition:{p},  offset {o}, value:{value['t']} ")
        return False
    else:
        log.info(f'业务处理消息,topic:{t}, partition:{p}, offset:{o}, content:{value}')
        return True


def success_function(msg):
    # 处理业务逻辑,完全正常
    if msg.error():
        log.error('fetch msg is error. error:%s' % msg.error())
        return False
    p, o, t, value = _get_msg_data(msg)
    log.info(f'业务处理消息,topic:{t}, partition:{p}, offset:{o}, content:{value}')
    return True


def exception_function(msg):
    # 处理业务逻辑,抛出异常重试
    if msg.error():
        log.error('fetch msg is error. error:%s' % msg.error())
        return False
    p, o, t, value = _get_msg_data(msg)
    if random.randint(1, 100) == 9:
        log.info(f"发生业务异常返回False, topic:{t}, partition:{p},  offset {o}, value:{value['t']} ")
        raise Exception('业务异常')
    else:
        log.info(f'业务处理消息,topic:{t}, partition:{p}, offset:{o}, content:{value}')
        return True


def test_normal_try_limit(retry_count):
    # 用户函数返回false重试(-1无限/数字为重试次数)
    consumer = KafkaAtLeastOnceConsumer(group_id, [topic], my_function, retry_count=retry_count)
    consumer.start()
    while True:
        time.sleep(10)


def test_exception_try_limit(retry_count):
    # 异常指定重试次数(-1无限/数字为重试次数)
    consumer = KafkaAtLeastOnceConsumer(group_id, [topic], exception_function, retry_count=retry_count)
    consumer.start()
    while True:
        time.sleep(10)


def test_normal_no_try():
    # 基本模式
    consumer = KafkaAtLeastOnceConsumer(group_id, [topic], my_function, base_mode=True)
    consumer.start()
    while True:
        time.sleep(10)


def test_crash_consume():
    # 模拟消费者关停/崩溃,消费者继续消费
    consumer = KafkaAtLeastOnceConsumer(group_id, [topic], success_function, base_mode=True)
    consumer.start()
    time.sleep(20)
    consumer.shutdown()


if __name__ == '__main__':
    test_channel = sys.argv[1]
    if test_channel == "1":
        test_normal_try_limit(-1)
    elif test_channel == "2":
        test_normal_try_limit(3)
    elif test_channel == "3":
        test_exception_try_limit(-1)
    elif test_channel == "4":
        test_exception_try_limit(3)
    elif test_channel == "5":
        test_normal_no_try()
    elif test_channel == "6":
        test_crash_consume()
    else:
        test_normal_try_limit(-1)

使用seek方法再次消费

理论上来讲,这种方法是能行的通的。因为seek能将一个活跃分区的消费位移设置到消费失败的位置,然后下一次拉取时可以重新获取该数据,并且相比使用“重试队列”,seek方式还可以保证消息部分顺序。
但是seek在批量处理下存在未知问题,后面再研究。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/108637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django+DRF+Vue+Mysql+Redis OUC软件工程作业

交作业啦 前端&#xff1a;htmlcssjsVueElement-ui 后端&#xff1a;DjangoDRFceleryhaystackdjango_crontab 数据库&#xff1a;MysqlRedis 一些技术和功能&#xff1a; 为session、短信验证码、用户浏览记录、购物车、异步任务队列 创建缓存whoosh搜索引擎异步任务队列 用…

谷歌Recorder实现说话人自动标注,功能性与iOS语音备忘录再度拉大

在今年的 Made By Google 大会上&#xff0c;谷歌公布了 Recorder 应用的自动说话人标注功能。该功能将实时地为语音识别的文本加上匿名的说话人标签&#xff08;例如 “说话人 1” 或“说话人 2”&#xff09;。这项功能将极大地提升录音文本的可读性与实用性。 谷歌于 2019 …

Spring Cloud Alibaba Sentinel - - >流控规则初体验

源码地址&#xff1a;https://github.com/alibaba/Sentinel 新手指南&#xff1a;https://github.com/alibaba/Sentinel/wiki/新手指南#公网-demo 官方文档&#xff1a;https://sentinelguard.io/zh-cn/docs/introduction.html 注解支持文档&#xff1a;https://github.com/ali…

Android常用布局总结之(FrameLayout、ConstraintLayout)

一、FrameLayout 帧布局 这种布局类似叠加的图片&#xff0c;没有任何的定位方式&#xff0c;当我们往里面添加组件的时候&#xff0c;会默认把他们放到容器的左上角。 上面的组件显示在底层&#xff0c;下面的组件显示在上层。 如下代码&#xff0c;视图1显示在最底层&#…

虹科案例 | 光纤传感器实现了新的核磁共振应用!

背景介绍 光纤传感器已成为推动MRI最新功能套件升级和新MRI设备设计背后的关键技术。将患者的某些活动与MRI成像系统同步是越来越受重视的需求。磁场强度随着每一代的发展而增大&#xff08;3.0T是当今最高的标准&#xff09;&#xff0c;因此&#xff0c;组件的电磁透明度在每…

python---数据库操作

在python中&#xff0c;使用第三方库pymysql来执行数据库操作 命令行窗口输入 &#xff1a;pip install pymysql&#xff0c;下载第三方库 数据库查询操作 Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。 fetchone(): 该方法获取下一…

OpManager 网络管理软件

随着网络在有线、无线和虚拟 IT 环境中的扩展&#xff0c;网络管理只会变得越来越复杂&#xff0c;使网络管理员需要他们可以获得的所有帮助。市场上有无数的网络管理解决方案&#xff0c;因此将注意力集中在正确的解决方案上非常重要。网络管理工具通常可以帮助您将网络的各种…

一行python命令让手机读取电脑文件

本文讲解python的一个内置文件传输下载器&#xff0c;可以用来在局域网内进行文件传输&#xff0c;当然可能有人会问&#xff0c;我用微信QQ也能传&#xff0c;为什么还要用python来传输下载&#xff1f;在此&#xff0c;其实我个人感觉的是&#xff0c;这种操作更简单&#xf…

【Web开发】Python实现Web服务器(Ubuntu下打包Flask)

&#x1f37a;基于Python的Web服务器系列相关文章编写如下&#x1f37a;&#xff1a; &#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask快速入门&#xff09;&#x1f388;&#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask案例测试&#xff09;&a…

安科瑞红外测温方案助力滁州某新能源光伏产业工厂安全用电

安科瑞 李亚俊 壹捌柒贰壹零玖捌柒伍柒 摘要&#xff1a; 近年来&#xff0c;在国家政策引导与技术革新驱动的双重作用下&#xff0c;光伏产业保持快速增长态势&#xff0c;产业规模持续扩大&#xff0c;技术迭代更新不断&#xff0c;目前已在全球市场取得优势。据统计&#…

数据结构C语言版——链式二叉树的基本操作实现

文章目录链式二叉树1. 概念2. 链式二叉树的基本操作前序遍历中序遍历后续遍历根据前序遍历构建二叉树层序遍历在二叉树中查找指定值获取二叉树节点个数获取叶子节点个数求二叉树的高度链式二叉树 1. 概念 设计不同的节点结构可构成不同形式的链式存储结构。由二叉树的定义可知…

用简单伪随机数发生器实现随机中点位移分形(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 随机分形(random fractal)采用随机生成机制而得到的分形集.分形体不具有特征尺度(亦即大小尺度跨好几个量级)&#xff0c;却有…

5G无线技术基础自学系列 | 5G接入类KPI

素材来源&#xff1a;《5G无线网络规划与优化》 一边学习一边整理内容&#xff0c;并与大家分享&#xff0c;侵权即删&#xff0c;谢谢支持&#xff01; 附上汇总贴&#xff1a;5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 接入类KPI反映了用户成功接入到网络中并…

李沐精读论文:Swin transformer: Hierarchical vision transformer using shifted windows

论文地址&#xff1a;Swin transformer: Hierarchical vision transformer using shifted windows 代码&#xff1a;官方源码 pytorch实现 SwinTransformerAPI 视频&#xff1a;Swin Transformer论文精读【论文精读】_哔哩哔哩_bilibili 本文注意参考&#xff1a;Swin Transfor…

MySql性能优化(四)索引

Index索引相关概念数据结构B树优点及用处优点用处分类技术名词回表覆盖索引最左匹配索引下推索引的匹配方式哈希索引特点代价案例组合索引案例聚簇索引与非聚簇索引聚簇索引非聚簇索引覆盖索引基本介绍优点判断参考索引相关概念 数据结构 B树 推荐一篇讲的很不错的文章&…

【小程序】wxss与rpx单位以及全局样式和局部样式

目录 WXSS 1. 什么是 WXSS 2. WXSS 和 CSS 的关系 rpx 1. 什么是 rpx 尺寸单位 2. rpx 的实现原理 3. rpx 与 px 之间的单位换算* 样式导入 1. 什么是样式导入 2. import 的语法格式 全局样式和局部样式 1. 全局样式 2. 局部样式 WXSS 1. 什么是 WXSS WXSS (We…

Linux网络与数据封装

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起探讨和分享Linux C/C/Python/Shell编程、机器人技术、机器学习、机器视觉、嵌入式AI相关领域的知识和技术。 Linux网络与数据封装1. 网络应用程序的设计模式&#xff08;1&#xff09;C/S架构&#xff08;2&#…

VRTK4 入门指南

VRTK4 说明文档VRTK Farm Yard 示例 - Virtual Reality Toolkit要求使用 Unity 2020.3.24f1.Beta 免责声明简介入门下载项目在 Unity 中打开下载的项目使用 Unity Hub在 Unity 中打开项目运行示例场景Made With VRTK贡献第三方包许可证VRTK Farm Yard 示例 - Virtual Reality T…

家居建材行业数字化重构,依靠CRM打通全流程

国家十四五规划提出大力推进产业数字化转型&#xff0c;如今各行各业数字化进程如火如荼&#xff0c;传统行业将数字化转型视为重塑产业竞争力的重要途径。因此&#xff0c;即便是数字化率平均只有10%的家具建材业&#xff0c;也在积极进行全生命周期的产品数字化、全域营销数字…

加载速度提升 15%,关于 Python 启动加速探索与实践的解析 | 龙蜥技术

编者按&#xff1a;在刚刚结束的 PyCon China 2022 大会上&#xff0c;龙蜥社区开发者严懿宸分享了主题为《Python 启动加速的探索与实践》的技术演讲。本次演讲&#xff0c;作者将从 CPython 社区相关工作、本方案的设计及实现&#xff0c;以及业务层面的集成等方面进行介绍。…