如何存储队列位置信息

news2024/12/30 2:17:11

         实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset有关。本节主要分析Offset的存储位置,以及如何根据需要调整Offset的值。

        首先来明确一下Offset的含义,RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个Message Queue(也可以设置成一个),Offset是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。

如图1所示是Offset的类结构,主要分为本地文件类型和Broker代存的类型两种。对于DefaultMQPushConsumer来说,默认是CLUSTERING模式,也就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。


 

图1 OffsetStore的类结构

在DefaultMQPushConsumer里的BROADCASTING模式下,每个Consumer都收到这个Topic的全部消息,各个Consumer间相互没有干扰,RocketMQ使用LocalFileOffsetStore,把Offset存到本地。

OffsetStore使用Json格式存储,简洁明了,下面是个例子:

代码清单1 Offsetstore的内容示例

{"OffsetTable":{{"brokerName":"localhost", "QueueId":1,"Topic":"broker1" }: 1,{ "brokerName":"localhost", "QueueId":2,"Topic":"broker1" }:2, { "brokerName":"localhost", "QueueId":0, "Topic":"broker1" }:3 } }
 

在使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的事,但是如果PullConsumer,我们就要自己处理OffsetStore了。在前面文章中PullConsumer示例中,代码里把Offset存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失Offset,在实际应用中不推荐这样做。接下来给出在磁盘存储Offset的示例程序,参照LocalFileOffsetStore的源码编写,如代码清单2所示。

代码清单2 自定义持久存储OffsetStore

public class LocalOffsetStoreExt {
    private final String groupName;
    private final String storePath;
    private ConcurrentMap<MessageQueue, AtomicLong> OffsetTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>();
    public LocalOffsetStoreExt(String storePath, String groupName) {
        this.groupName = groupName;
        this.storePath = storePath;
    }
    public void load() {
        OffsetSerializeWrapper OffsetSerializeWrapper = this.readLocal-Offset();
        if (OffsetSerializeWrapper != null && OffsetSerializeWrapper.getOffsetTable() != null) {
            OffsetTable.putAll(OffsetSerializeWrapper.getOffsetTable());
            for (MessageQueue mq : OffsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong Offset = OffsetSerializeWrapper.getOffset-Table().get(mq);
                System.out.printf("load Consumer's Offset, {} {} {} \n", this.groupName, mq, Offset.get());
            }
        }
    }
    public void updateOffset(MessageQueue mq, long Offset) {
        if (mq != null) {
            AtomicLong OffsetOld = this.OffsetTable.get(mq);
            if (null == OffsetOld) {
                this.OffsetTable.putIfAbsent(mq, new AtomicLong(Offset));
            } else {
                OffsetOld.set(Offset);
            }
        }
    }
    public long readOffset(final MessageQueue mq) {
        if (mq != null) {
            AtomicLong Offset = this.OffsetTable.get(mq);
            if (Offset != null) {
                return Offset.get();
            }
        }
        return 0;
    }
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
        OffsetSerializeWrapper OffsetSerializeWrapper = new Offset-SerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.OffsetTable.
            entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong Offset = entry.getValue();
                OffsetSerializeWrapper.getOffsetTable().put(entry.getKey(), Offset);
            }
        }
        String jsonString = OffsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private OffsetSerializeWrapper readLocalOffset() {
        String content = null;
        try {
            content = MixAll.file2String(this.storePath);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (null == content || content.length() == 0) {
            return null;
        } else {
            OffsetSerializeWrapper OffsetSerializeWrapper = null;
            try {
                OffsetSerializeWrapper =
                        OffsetSerializeWrapper.fromJson(content, Offset-SerializeWrapper.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return OffsetSerializeWrapper;
        }
    }
}

 

了解OffsetStore的存储机制以后,我们看看如何设置Consumer读取消息的初始位置。DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP),Consumer.setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。注意设置读取位置不是每次都有效,它的优先级默认在Offset Store后面,比如在DefaultMQPushConsumer的BROADCASTING方式下,默认是从Broker里读取某个Topic对应ConsumerGroup的Offset,当读取不到Offset的时候,ConsumeFromWhere的设置才生效。大部分情况下这个设置在Consumer Group初次启动时有效。如果Consumer正常运行后被停止,然后再启动,会接着上次的Offset开始消费,ConsumeFromWhere的设置无效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1191418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进口猫罐头在排行榜中是否靠前?排行榜中靠前的猫罐头测评

养猫这6年&#xff0c;我对猫咪的日常饮食把关一直很严格。这些年我给我家猫们购买过很多不同品牌、不同口味的罐头&#xff0c;在猫罐头的挑选、分析上还是有一些经验的。今天&#xff0c;我将和大家一起探讨进口猫罐头在排行榜中是否靠前&#xff1f;同时&#xff0c;我将为大…

[量化投资-学习笔记008]Python+TDengine从零开始搭建量化分析平台-CCI和ATR

目录 1. 指标简介CCIATR 2. 程序编写题外话 1. 指标简介 将这两个指标放在一起&#xff0c;一方面是因为这两个指标都属于摆动指数&#xff0c;可以反应市场的活跃度。 另一方面是因为CCI和ATR与之前提到的EMA,MACD,布林带的三个指标的计算基础不同。之前的三个指标都是以收盘…

UltraEdit2024免费版文本编辑器

我们必须承认软件员使用的编辑器或代码编辑器是一款强大 IDE 的重要组成部分&#xff0c;它是任何 IDE 的核心基础。用户量向我们证明了UEStudio 基于著名的 UltraEdit 进行构建&#xff0c;同样&#xff0c;软件的主干非常成熟和稳定&#xff0c;并且已经被证实成为文本和软件…

【Unity ShaderGraph】| 物体靠近时局部溶解,根据坐标控制溶解的位置【文末送书】

前言 【Unity ShaderGraph】| 物体靠近时局部溶解&#xff0c;根据坐标控制溶解的位置一、效果展示二、根据坐标控制溶解的位置&#xff0c;物体靠近局部溶解三、应用实例&#x1f451;评论区抽奖送书 前言 本文将使用ShaderGraph制作一个根据坐标控制溶解的位置&#xff0c;物…

winform开发小技巧

如果我们不知道怎么在代码中new 一个控件&#xff0c;我们可以先在窗体中拉一个然后看Form1.Designer.cs 里面生成的代码就是我们要的 我们会在下面看到 还有泛型的使用&#xff0c;马上更新

ProPainter——实现视频消除特定对象、去水印、视频修复

ProPainter视频修复 1. 安装1.1 克隆项目1.2 创建虚拟环境和安装依赖库1.3 下载权重文件 2. 自带示例2.1 消除物体2.2 视频修复 3. 实际应用案例4.训练自己的数据集 &#x1f4dd;github&#xff1a;https://github.com/sczhou/ProPainter &#x1f4d6;paper&#xff1a;ICCV…

AI 时代的企业级安全合规策略

目录 漏洞分类管理的流程 安全策略管理 在扫描结果策略中定义细粒度的规则 有效考虑整个组织中的关键漏洞 确保职责分离 尝试组合拳 本文来源&#xff1a;about.gitlab.com 作者&#xff1a;Grant Hickman 在应用程序敏捷研发、敏捷交付的今天&#xff0c;让安全人员跟上…

SNP应邀参加2023中国企业数字化转型峰会暨赛意用户大会

创新驱动科技&#xff0c;数智驱动未来。如今&#xff0c;我国产业数字化进程提速升级&#xff0c;数字产业化规模持续壮大。数据显示&#xff0c;2022年&#xff0c;我国数字经济规模达50.2万亿元&#xff0c;总量稳居世界第二。数字经济已经成为推动传统产业转型升级、促进高…

MathWorks Matlab R2023b ARM Mac报错 License Manager Error -8

MathWorks Matlab R2023b 23.2.0.2365128 ARM 版本安装激活后出现报错&#xff1a; License Manager Error -8 License checkout failed. License Manager Error -8 Make sure the HostID of the license file matches this machine, and that the HostID on the SERVER line m…

python实现FINS协议的TCP服务端(篇一)

python实现FINS协议的TCP服务端是一件稍微麻烦点的事情。它不像modbusTCP那样&#xff0c;可以使用现成的pymodbus模块去实现。但是&#xff0c;我们可以根据协议帧进行组包&#xff0c;自己去实现帧的格式&#xff0c;而这一切可以基于socket模块。本文为第一篇。 一、了解FI…

深度学习之基于Python+OpenCV(DNN)性别和年龄识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 基于Python和OpenCV的深度学习性别和年龄识别系统是一种利用深度学习模型来自动识别人脸照片中的性别和年龄的技术。…

95 课程表

课程表 题解1 BFS&#xff08;拓扑图模板&#xff09;题解2 DFS 你这个学期必须选修 numCourses 门课程&#xff0c;记为 0 到 numCourses - 1 。 在选修某些课程之前需要一些先修课程。 先修课程按数组 prerequisites 给出&#xff0c;其中 prerequisites[i] [ai, bi] &am…

PostgreSQL 技术内幕(十一)位图扫描

扫描算子在上层计算和底层存储之间&#xff0c;向下扫描底层存储的数据&#xff0c;向上作为计算的输入源&#xff0c;在SQL的执行层中&#xff0c;起着关键的作用。顺序、索引、位图等不同类型的扫描算子适配不同的数据分布场景。然而&#xff0c;扫描算子背后的实现原理是怎样…

Linux nohup后台启动/ 后台启动命令中nohup 、、重定向的使用

文章目录 一、前言二、nohup&#xff08;不挂断&#xff09;简介三、nohup使用3.1、nohup启动3.2、nohup与&&#xff0c;后台运行3.3、nohup与>&#xff0c;日志重定向3.4、nohup后台启动-综合使用(推荐)2>&1 3.5、nohup后台启动(不生成日志) 四、查看进程五、知…

输出所有最长公共子序列

输出所有最长公共子序列 什么是最长公共子序列过程讲解完整程序代码&#xff08;python&#xff09; 什么是最长公共子序列 在力扣题库中的1143题有一道最长公共子序列&#xff0c;但是那个只是返回最长子序列的长度&#xff0c;而没有输出所有的最长子序列 通过上图中的举例…

VR全景技术,为养老院宣传推广带来全新变革

现如今&#xff0c;人口老龄化的现象加剧&#xff0c;养老服务行业也如雨后春笋般不断冒头&#xff0c;但是市面上各式的养老院被包装的五花八门&#xff0c;用户实际参访后却差强人意&#xff0c;如何更好的给父母挑选更为舒心的养老环境呢&#xff1f;可以利用720度VR全景技术…

响应式艺术作品展示前端html网站模板源码

响应式艺术作品展示网站模板是一款适合各种艺术作品在线展示的响应式网站模板下载。提示&#xff1a;本模板调用到谷歌字体库&#xff0c;可能会出现页面打开比较缓慢。 转载自 https://www.qnziyw.cn/wysc/qdmb/23778.html

文献管理软件Zotero之同步篇(2)

文章目录 0、前言1、官方自带同步1.1、Zotero的同步逻辑【必须掌握】1.2、自带同步设置 2、官网进行数据同步&#xff0c;同步网盘进行文件同步的方案2.1、对1.1同步逻辑的补充说明2.2、同步设置2.2.1、同步网盘的选择&#xff1a;2.2.2、同步设置2.2.2.1、分别在每一台计算机中…

JMeter参数化方式:三招让你的性能测试更灵活!

科技新势力&#xff0c;引领向未来 今天我们将探讨关于JMeter&#xff08;Apache JMeter&#xff09;性能测试工具中的参数化方式&#xff0c;这些方法可以帮助你更灵活地进行性能测试&#xff0c;并更好地模拟实际用户行为。JMeter是一款强大的开源工具&#xff0c;可用于测试…

R-install_miniconda()卸载 | conda命令行报错及解决方法

运行以下代码&#xff0c;突然报错&#xff1a; C:\Users\hp>conda info-e >>>>>>>>>>>>>>>>>>>>>> ERROR REPORT <<<<<<<<<<<<<<<<<<<<&…