RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

news2024/7/6 20:00:23

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    // Map<消费组名称,订阅组配置>
    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
    // 内存数据版本号
    private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {
    // 消费组名称
    private String groupName;
    // 是否开启消费
    private boolean consumeEnable = true;
    // 是否允许消费最早消息
    private boolean consumeFromMinEnable = true;
    // 是否允许广播消费
    private boolean consumeBroadcastEnable = true;
    // 重试队列数
    private int retryQueueNums = 1;
    // 重试最大次数
    private int retryMaxTimes = 16;
    // brokerId
    private long brokerId = MixAll.MASTER_ID;
    // 当产生慢消费时,选择第几个broker
    private long whichBrokerWhenConsumeSlowly = 1;
    // 是否通知消费者ids变化
    private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {
    
    public SubscriptionGroupManager() {
        this.init();
    }

    public SubscriptionGroupManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.init();
    }

    private void init() {
        {
            // 初始化系统消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化过滤服务消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化自测消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化http代理消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_PULL消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_PERMISSION消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_OWNER消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
        }
    }

    // 更新订阅配置,且更新内存数据版本号
    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
        if (old != null) {
            log.info("update subscription group config, old: {} new: {}", old, config);
        } else {
            log.info("create new subscription group, {}", config);
        }

        this.dataVersion.nextVersion();

        this.persist();
    }

    // 失效消费组
    public void disableConsume(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
        if (old != null) {
            old.setConsumeEnable(false);
            this.dataVersion.nextVersion();
        }
    }

    // 查找指定消费组的订阅配置
    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
        if (null == subscriptionGroupConfig) {
            if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
                subscriptionGroupConfig = new SubscriptionGroupConfig();
                subscriptionGroupConfig.setGroupName(group);
                SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
                if (null == preConfig) {
                    log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
                }
                this.dataVersion.nextVersion();
                this.persist();
            }
        }

        return subscriptionGroupConfig;
    }

    // 将内存数据结构编码成字符串
    @Override
    public String encode() {
        return this.encode(false);
    }

    // 获取配置文件路径
    @Override
    public String configFilePath() {
        return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig()
            .getStorePathRootDir());
    }

    // 从字符串中恢复数据,写回内存数据结构
    @Override
    public void decode(String jsonString) {
        if (jsonString != null) {
            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
            if (obj != null) {
                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
                this.dataVersion.assignNewOne(obj.dataVersion);
                this.printLoadDataWhenFirstBoot(obj);
            }
        }
    }

    // 将内存数据结构编码成字符串
    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }

    // 当第一次启动时,打印加载数据时的日志
    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
        Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, SubscriptionGroupConfig> next = it.next();
            log.info("load exist subscription group, {}", next.getValue().toString());
        }
    }

    public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
        return subscriptionGroupTable;
    }

    public DataVersion getDataVersion() {
        return dataVersion;
    }

    // 删除指定消费组的订阅配置
    public void deleteSubscriptionGroupConfig(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
        if (old != null) {
            log.info("delete subscription group OK, subscription group:{}", old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1308478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java - 异常(三)- 声明异常(throws)和手动抛出异常throw

目录 6.3 方式2&#xff1a;声明异常&#xff08;throws&#xff09; 6.4 手动抛出异常throw 6.4.1 概述 6.4.2 使用格式&#xff1a; 6.4.3 实例代码 6.4.4 为什么要手动抛出异常对象&#xff1f; 6.4.5 如何理解“自动”和“手动” 抛出异常对象 6.4.6 注意点 ❓面试…

@Scheduled任务调度/定时任务-非分布式

1、功能概述 任务调度就是在规定的时间内执行的任务或者按照固定的频率执行的任务。是非常常见的功能之一。常见的有JDK原生的Timer, ScheduledThreadPoolExecutor以及springboot提供的Schduled。分布式调度框架如QuartZ、Elasticjob、XXL-JOB、SchedulerX、PowerJob等。 本文…

GPT4All 本地部署教程

省流&#xff1a;偷懒的可以直接看第二章的GPT4All部署 一. GPT4All README 根据官方网站GPT4All的描述&#xff0c;它是一个开源大型语言模型&#xff0c;可在CPU和几乎任何GPU上本地运行 github source: https://github.com/nomic-ai/gpt4all GPT4All Website and Models…

目标检测检测精度

在一个数据集检测中&#xff0c;会产生四类检测结果&#xff1a;TP、TN 、FP 、FN&#xff1a; T ——true 表示正确 F——false 表示错误 P—— positive 表示积极的&#xff0c;看成正例 N——negative 表示消极的&#xff0c;看成负例 我的理解&#xff1a;后面为预测结…

Postman-脚本自动化及定时执行脚本(7)

一.postman脚本自动化&#xff08;从postman至Newman可以一键执行脚本并生成报告&#xff1a;&#xff09; Postman Newman 是一个 CLI&#xff08;命令行界面&#xff09;工具&#xff0c;可以使用它来运行 Postman 中的集合&#xff08;Collection&#xff09;和环境&#xf…

懒惰的数独——lodash的shuffle方法实现随机打乱的效果

1.效果 2.代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title></title><script src"https://cdn.jsdelivr.net/npm/vue2.5.17/dist/vue.js"></script><script src&q…

案例055:基于微信小程序的四六级词汇

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

Matter分析与安全验证

本文作者&#xff1a;杉木涂鸦智能安全实验室 什么是matter Matter是一项智能家居的开源标准&#xff0c;由连接标准联盟制定、认证、推广&#xff0c;该标准基于互联网协议&#xff08;IP&#xff09;&#xff0c;遵循该标准的智能家居设备、移动应用程序和云服务能够进行互…

three.js(一)

文章目录 three.js环境搭建正文补充 示例效果知识点补充1:一个标准的html知识点补充2:原生的前端框架和Vue框架的区别原生的前端框架Vue框架声明式编程和响应式编程 three.js环境搭建 正文 搭建 Three.js 的环境通常包括以下几个步骤&#xff1a; 1.创建项目目录&#xff1a…

【LeetCode每日一题】1904. 你完成的完整对局数

给你两个字符串 startTime 和 finishTime &#xff0c;均符合 "HH:MM" 格式&#xff0c;分别表示你 进入 和 退出 游戏的确切时间&#xff0c;请计算在整个游戏会话期间&#xff0c;你完成的 完整对局的对局数 。 如果 finishTime 早于 startTime &#xff0c;这表示…

一个文件下png,jpg,jpeg,bmp,xml,json,txt文件名称排序命名

#utf-8 #authors 会飞的渔WZH #time:2023-12-13 import os# 要修改的图像所在的文件夹路径 root_path rD:\images\lines2\3 # 要修改的图像所在的文件夹路径filelist os.listdir(root_path) #遍历文件夹 print(len(filelist)) i0for item in filelist:if item.endswith(.…

Re58:读论文 REALM: Retrieval-Augmented Language Model Pre-Training

诸神缄默不语-个人CSDN博文目录 诸神缄默不语的论文阅读笔记和分类 论文名称&#xff1a;REALM: Retrieval-Augmented Language Model Pre-Training 模型名称&#xff1a;Retrieval-Augmented Language Model pre-training (REALM) 本文是2020年ICML论文&#xff0c;作者来自…

python 基于imageio_ffmpeg 直接操作ffmpeg,无需额外在官网下载!

python直接操作ffmpeg&#xff0c;无需在官网下载&#xff01; 一、前言 在要使用ffmpeg处理的时候&#xff0c;不想去官网下载ffmpeg然后添加到环境变量再使用。研究了一下&#xff0c;可以通过下面的方法解决 imageio_ffmpeg subprocess 二、具体步骤 1、环境配置 pip i…

Mac搭建Frida逆向开发环境

一、简介 Frida是一种基于Python+JavaScript的动态分析工具,可以用于逆向开发、应用程序的安全测试、反欺诈技术等领域,本质是一种动态插桩技术。Frida主要用于在已安装的应用程序上运行自己的JavaScript代码,从而进行动态分析、调试、修改等操作,能够绕过应用程序的安全措…

小程序开发实战案例之三 | 小程序底部导航栏如何设置

小程序中最常见的功能就是底部导航栏了&#xff0c;今天就来看一下怎么设置一个好看的导航栏&#xff5e;这里我们使用的是支付宝官方小程序 IDE 做示范。 官方提供的底部导航栏 第一步&#xff1a;页面创建 一般的小程序会有四个 tab&#xff0c;我们这次也是配置四个 tab 的…

基于单片机的火灾报警器 (论文+源码)

1.系统设计 本系统由火灾检测模块、A/D转换模块、信号处理模块、声光报警模块和灭火装置模块组成。火灾检测模块由温度检测和烟雾检测构成&#xff0c;其温度传感器选用DS18B20&#xff0c;烟雾传感器选用MQ-2烟雾传感器。A/D转换模块选用常用的模数转换芯片ADC0832。声光报警模…

Git篇---第七篇

系列文章目录 文章目录 系列文章目录前言一、如果分支是否已合并为master,你可以通过什么手段知道?二、 什么是SubGit?三、列举工作中常用的几个git命令?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文…

阅读笔记——《UTOPIA: Automatic Generation of Fuzz Driverusing Unit Tests》

【参考文献】Jeong B, Jang J, Yi H, et al. UTOPIA: automatic generation of fuzz driver using unit tests[C]//2023 IEEE Symposium on Security and Privacy (SP). IEEE, 2023: 2676-2692.【注】本文仅为作者个人学习笔记&#xff0c;如有冒犯&#xff0c;请联系作者删除。…

electron命令下载失败,手动安装教程

现象&#xff1a;pnpm i electron, 一直卡在提示错误node install.js 一 、下载需要的electron版本 地址 二、下载完毕&#xff0c;解压压缩包&#xff0c; 进入项目的node_modules/electron文件夹&#xff0c;创建dist文件夹&#xff0c;将下载的zip包里的文件复制到dist…

亚马逊鲲鹏系统:防关联技术守护您的账户安全

亚马逊买家账号注册是一项相当简便的操作&#xff0c;但当涉及到批量注册时&#xff0c;我们就需要更加注意防关联的问题。对于那些对此领域不够熟悉的朋友们&#xff0c;可以使用亚马逊鲲鹏系统&#xff0c;这款系统能够为我们提供一站式的解决方案。该系统不仅支持买家账号的…