确保消息不会丢失

news2024/10/5 14:27:14

现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检查消息丢失的方法

利用消息队列的有序性来验证是否有消息丢失

​ 在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。

​ 大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

确保消息的可靠传递

在这里插入图片描述

  • 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

一、生产阶段

​ 当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了

​ 只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

​ 正确处理返回值或者捕获异常,就可以保证这个阶段的的消息不对丢失

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json

import json

from kafka import KafkaConsumer, KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic, acks):
        """
        kafka 生产者
        :param bootstrap_servers: 地址
        :param topic:  topic
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            key_serializer=lambda k: json.dumps(k).encode('ascii'),
            value_serializer=lambda k: json.dumps(k).encode('ascii'),
            acks=acks
        )  # json 格式化发送的内容
        self.topic = topic

    def sync_producer(self, data_li: list):
        """
        同步发送 数据
        :param data_li:  发送数据
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            record_metadata = future.get(timeout=10)  # 同步确认消费
            partition = record_metadata.partition  # 数据所在的分区
            offset = record_metadata.offset  # 数据所在分区的位置
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    # 异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。
    def async_producer(self, data_li: list):
        """
        异步发送数据
        :param data_li:发送数据
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # 批量提交

    def async_producer_callback(self, data_li: list):
        """
        异步发送数据 + 发送状态处理
        :param data_li:发送数据
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # 批量提交

    def send_success(self, *args, **kwargs):
        """异步发送成功回调函数"""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """异步发送错误回调函数"""
        print('save error')
        return

    def close_producer(self):
        try:
            self.producer.close()
        except:
            pass


if __name__ == '__main__':
    send_data_li = ["Egg", "西瓜", "呼伦贝尔"]
    kp = KProducer(topic='gourd', bootstrap_servers=['127.0.0.1:9092'], acks="all")

    # 同步发送
    # kp.sync_producer(send_data_li)

    # 异步发送
    # kp.async_producer(send_data_li)

    # 异步+回调
    kp.async_producer_callback(send_data_li)

这里由于已经做了是否发送成功,在实例化producer阶段的acks是可以不用的

二、存储阶段

​ 在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

三、消费阶段

​ 消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

from kafka import KafkaConsumer
import json

def consumer_demo():
    consumer = KafkaConsumer(
        'gourd',
        bootstrap_servers='47.116.26.72:9092',
        group_id='test',
        # earliest 从上一次未消费的位置开始读
        # latest 当前时刻开始读之后产生的,之前产生的数据不再消费
        auto_offset_reset="earliest",
        # 是否自动commit,当前消费者消费完该数据后,需要commit,才可以将消费完的信息传回消息队列的控制中心。
        # enable_auto_commit设置为True后,消费者将自动commit,并且两次commit的时间间隔为auto_commit_interval_ms。
        enable_auto_commit=False,  # 取消自动提交
        consumer_timeout_ms="10"
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        # 存入数据库
        mysql.clinet("insert xxxxx json.loads(message.key.decode()) json.loads(message.value.decode()")
		consumer.commit()

if __name__ == '__main__':
    consumer_demo()

建议 一个进程一个consumer

正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。

  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
    息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。

  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。

  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/666414.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第一章:软件工程师必备的硬件基础

目录 1、软件工程师需要具备的知识 2、计算机的组成 3、操作系统 4、BIOS的相关知识 1、软件工程师需要具备的知识 问题一:运维工程师、实施工程师是啥? 运维工程师负责服务的稳定性,确保服务可以不间断地为用户提供服务。 实施工程师负…

CmakeList使用笔记

cmake是一个跨平台、开源的构建系统。它是一个集软件构建、测试、打包于一身的软件。它使用与平台和编译器独立的配置文件来对软件编译过程进行控制。 Cmake的所有语句都写在一个CMakeLists.txt的文件中,Cmake运行之后就会产生我们想要的makefile文件,然…

【Jquery】Jquery实现页面嵌套到客户项目框架里面,不需要登录,获取cookie并直接展示首页:

文章目录 一、效果图:二、实现思路:三、实现代码: 一、效果图: 二、实现思路: 需求&#xff1a;嵌套到别的客户项目框架里面&#xff0c;不需要登录直接展示首页 实现&#xff1a;在打开页面前&#xff0c;获取登录cookie&#xff0c;然后再打开页面 三、实现代码: <!DOCTYP…

63、基于51单片机数字频率计NE555数码管显示系统设计(程序+原理图+Proteus仿真+参考论文+开题报告+任务书+元器件清单等)

摘 要 近年来随着计算机在社会领域的渗透和大规模集成电路的发展&#xff0c;单片机的应用正在不断地走向深入&#xff0c;由于它具有功能强&#xff0c;体积小&#xff0c;功耗低&#xff0c;价格便宜&#xff0c;工作可靠&#xff0c;使用方便等特点&#xff0c;因此越来越…

世界名酒商城元宇宙 中国4大“惨败酒

大家平时买白酒都怎么选择呢&#xff1f;一般都选择平时广告做得多&#xff0c;耳熟能详的大品牌&#xff0c;或者是直接听导购的买酒&#xff0c;毕竟那么贵的价格酒不可能不好&#xff0c;实际上这种买酒方式虽然不能说错吧&#xff0c;但是极容易错过很多好酒。 白酒市场上就…

自动化测试,UI测试和接口测试的基本概念以及指令

今天跟大家介绍UI测试、接口测试、单元测试主要内容 UI测试【Selenium】 UI测试是最接近软件真实用户使用行为的测试类型。通常是模拟真实用户使用软件的行为&#xff0c;即模拟用户在软件界面上的各种操作&#xff0c;并验证这些操作对应的结果是否正确。 这是最基本的一些S…

清华发布 KoLA 评测集,分4个认知层级评测LLM,GPT-4竟不是第一?

作者 | Python 预训练语言模型&#xff08;PLM&#xff09;刷GLUE&#xff0c;SuperGLUE&#xff0c;甚是常见&#xff1b;那ChatGPT等大语言模型&#xff08;LLM&#xff09;刷什么榜呢&#xff1f;现在常用的榜单&#xff0c;例如MMLU评测了57个学科知识&#xff0c;Big-Benc…

CodeForces..一条绳上的蚂蚱.[简单].[ifelse]

题目描述&#xff1a; 题目解读&#xff1a; 给定整数x和k&#xff0c;从0开始到达x&#xff0c;且每次移动的值&#xff0c;不能被k整除。 输出到达目标点x的最小移动次数和每次移动的值。 解题思路&#xff1a; 相当于在数轴上移动到目标点&#xff0c;且每次移动的数值不…

WPS数据清洗+R语言读取文件画频数分布直方图

R语言是一门好语言&#xff0c;但很多人在读取文件中数据时会遇到问题。比如我遇到的问题就是从文件中读取数据后&#xff0c;数据无法用于画图。 检索了N篇博文&#xff08;抱歉我实在无法一一列举30篇博文&#xff09;后&#xff0c;终于看到曙光&#xff0c;事实告诉我学任…

最新版CleanMyMacX4.13.6发布了,它值得买吗?

Clean My Mac X是Mac上一款美观易用的系统优化清理工具&#xff0c;也是小编刚开始用Mac时的装机必备。垃圾需要时时清&#xff0c;电脑才能常年新。Windows的垃圾清理工具选择有很多&#xff0c;但是Mac的清理工具可选择的就很少。 最新版CleanMyMacX4.13.6发布此版本有哪些亮…

2023年衣物洗护市场行业分析(京东天猫数据分析)

近年来&#xff0c;受消费者习惯的推动&#xff0c;衣物洗护用品市场不断发展&#xff0c;洗护用品行业的市场规模也不断增长。 根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;今年1月份至4月份&#xff0c;天猫平台上衣物洗护相关产品的销量为7300万&#xff0c;产品销…

TC8:SOMEIPSRV_OPTIONS_05-07

SOMEIPSRV_OPTIONS_05: Reserved field of the IPv4 Endpoint Option 目的 IPv4 Endpoint Option的Reserved字段应静态设置为0x00 这是第二个Reserved字段 测试步骤 DUT CONFIGURE:启动具有下列信息的服务Service ID:SERVICE-ID-1Instance数量:1Tester:客户端-1发送SOME/I…

数字广东:共建区块链开源生态,实现高水平科技自立自强

近日&#xff0c;在2023年第1季社区Task挑战赛中&#xff0c;众多开发者为FISCO BCOS开源项目及周边组件贡献了丰富的代码和教程。其中&#xff0c;作为金链盟开源工作组成员&#xff0c;数字广东网络建设有限公司科技发展部的数字信任中心团队参与了共建。 数字广东网络建设…

【深度学习】5-1 与学习相关的技巧 - 参数的更新(Momentum,AdaGrad, Adam )

神经网络的学习的目的是找到使损失函数的值尽可能小的参数。这是寻找最优参数的问题&#xff0c;解决这个问题的过程称为最优化。 但是神经网络的最优化问题非常难。这是因为参数空间非常复杂&#xff0c;无法轻易找到最优解。而且&#xff0c;在深度神经网络中&#xff0c;参…

直击面试现场:你对MySQL的数据类型了解有多少?

前言 隔着玻璃门&#xff0c;看着面试官缓缓走来&#xff0c;头上飘着几根白发&#xff0c;在行走中随风摇曳&#xff0c;看的让人有一种想帮他薅下来的冲动。 这次面试的岗位是数据库数据类型&#xff0c;面试官坐下来冲着面试者沐风晓月呵呵一笑&#xff0c; “来啦”&…

广工击败清华,CGTN Sports 是这样说的

6 月 18 日晚上&#xff0c;被很多人不看好的弱旅广东工业大学&#xff0c;击败了豪门清华大学&#xff0c;拿下 CUBAL 的总冠军。 CGTN Sports Scene 是这样报道的&#xff1a; &#x1f3c6; 1st ever CUBAL championship in school history 校史上第一个 CUBAL 冠军 CUBA…

uniapp——Android 异常: failed to connect to localhost/127.0.0.1

bug解决——携带出现&#xff1a; Waiting to navigate to: /pages/……, do not operate continuously: /pages/…… failed to connect to localhost/127.0.0.1 解决方法&#xff1a; 我的报错&#xff1a;主要是failed to connect to localhost/127.0.0.1引发的Waiting …

【MSP430单片机】MSP430G2553程序,MSP430G2553单片机教程,MSP430G2553实战演练

文章目录 开发环境板子介绍官网示例代码下载MSP430普通IO口控制IO口外部中断MSP430时钟系统MSP430不精确延时之delay_msMSP430定时器_CCR0溢出中断MSP430定时器_定时器计数溢出中断MSP430定时器_PWM波形产生MSP430串口_收发9600波特率115200 波特率 MSP430ADCMSP430 Flash读写 …

数据结构顺序表(C语言实现)

绪论 从本章开始就是开始数据结构的开端&#xff0c;本章将会写出数据结构中的顺序表的代码实现&#xff0c;多会以注释的方法来描述一些细节&#xff08;注释是我们程序员必须常用的工具&#xff09;。 话不多说安全带系好&#xff0c;发车啦&#xff08;建议电脑观看&#xf…

Linux多线程之生产者消费者模型1

目录 &#x1f34a;一、什么是生产者消费者模型 &#x1f34a;二、基于BlockingQueue的生产者消费者模型 &#x1f34a;三、生产消费模型的upgrade版本 &#x1f34a; 四、三线程实现生产消费和存储 &#x1f34a;一、什么是生产者消费者模型 生产者消费者模式就是通过一…