Debezium-MySqlConnectorTask

news2025/1/20 12:00:49

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
    • 小结

概要

MySqlConnectorTask,用于读取MySQL的二进制日志并生成对应的数据变更事件

整体架构流程

技术名词解释

数据库模式(Database Schema)
数据库模式是指数据库中数据的组织结构和定义,它描述了数据库中所有对象(如表、视图、索引、存储过程等)的结构和关系。具体来说,数据库模式包括以下几个方面:
1  表结构:定义了数据库中各个表的名称、列的名称、数据类型、约束条件(如主键、外键、唯一性约束等)。
2  关系:描述了表与表之间的关系,如一对多、多对多等。
3  索引:定义了表上的索引,用于提高查询性能。
4  视图:定义了虚拟表,这些虚拟表基于SQL查询结果,可以简化复杂的查询操作。
5  存储过程和函数:定义了数据库中的存储过程和函数,用于执行特定的业务逻辑。
6  触发器:定义了在特定事件发生时自动执行的操作。

在 DatabaseHistory 接口中的应用
在 DatabaseHistory 接口中,数据库模式的变更记录和恢复功能主要用于以下场景:
    1  记录变更:当数据库模式发生变化时(如添加新表、修改表结构、删除表等),通过 record 方法记录这些变更。
    2  恢复:当需要恢复到某个历史点的数据库模式时,通过 recover 方法恢复到指定的历史状态。
通过这些功能,可以有效地管理和追踪数据库模式的变化,确保数据的一致性和完整性。

技术细节

@Override
    public void start(Map<String, String> props) {
        if (context == null) {
            throw new ConnectException("Unexpected null context");
        }

        // Validate the configuration ...
        final Configuration config = Configuration.from(props);
        if (!config.validate(MySqlConnectorConfig.ALL_FIELDS, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }

        // Create and configure the database history ...
        this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
        if (this.dbHistory == null) {
            throw new ConnectException("Unable to instantiate the database history class " +
                    config.getString(MySqlConnectorConfig.DATABASE_HISTORY));
        }
        Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); // do not remove
                                                                                                                 // prefix
        this.dbHistory.configure(dbHistoryConfig); // validates
        this.dbHistory.start();
        this.running.set(true);

        // Read the configuration ...
        final String user = config.getString(MySqlConnectorConfig.USER);
        final String password = config.getString(MySqlConnectorConfig.PASSWORD);
        final String host = config.getString(MySqlConnectorConfig.HOSTNAME);
        final int port = config.getInteger(MySqlConnectorConfig.PORT);
        final String initialBinLogFilename = config.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);
        final long serverId = config.getLong(MySqlConnectorConfig.SERVER_ID);
        serverName = config.getString(MySqlConnectorConfig.SERVER_NAME.name(), host + ":" + port);
        final boolean keepAlive = config.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);
        final int maxQueueSize = config.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);
        final long timeoutInMilliseconds = config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
        final boolean includeSchemaChanges = config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
        final long pollIntervalMs = config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);
        maxBatchSize = config.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);
        metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);

        // Define the filter using the whitelists and blacklists for tables and database names ...
        Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
                                                        config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
                                                        config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
                                                        config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
        if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
            Predicate<TableId> isBuiltin = (id) -> {
                return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
            };
            tableFilter = tableFilter.and(isBuiltin.negate());
        }

        // Create the queue ...
        events = new LinkedBlockingDeque<>(maxQueueSize);
        batchEvents = new ArrayDeque<>(maxBatchSize);

        // Set up our handlers for specific kinds of events ...
        tables = new Tables();
        tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
        eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
        eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
        eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, tableConverters::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, tableConverters::handleDelete);

        // Set up the log reader ...
        client = new BinaryLogClient(host, port, user, password);
        client.setServerId(serverId);
        client.setKeepAlive(keepAlive);
        if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
        client.registerEventListener(this::enqueue);
        client.registerLifecycleListener(traceLifecycleListener());

        // Set up the event deserializer with additional types ...
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        client.setEventDeserializer(eventDeserializer);

        // Check if we've already processed some of the log for this database ...
        source.setServerName(serverName);
        // Get the offsets for our partition ...
        Map<String, ?> offsets = context.offsetStorageReader().offset(source.partition());
        if (offsets != null) {
            source.setOffset(offsets);
            // And set the client to start from that point ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
            // The event row number will be used when processing the first event ...
            logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}",
                        serverName, source.binlogFilename(), source.binlogPosition(), source.eventRowNumber());

            // We have to make our Tables reflect the state of the database at the above source partition (e.g., the location
            // in the MySQL log where we last stopped reading. Since the TableConverts writes out all DDL statements to the
            // TopicSelector.getTopic(serverName) topic, we can consume that topic and apply each of the DDL statements
            // to our Tables object. Each of those DDL messages is keyed by the database name, and contains a single string
            // of DDL. However, we should consume no further than offset we recovered above.
            try {
                logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", serverName, dbHistory);
                DdlParser ddlParser = new MySqlDdlParser();
                dbHistory.recover(source.partition(), source.offset(), tables, ddlParser);
                tableConverters.loadTables();
                logger.debug("Recovered MySQL connector '{}' database schemas: {}", serverName, tables.subset(tableFilter));
            } catch (Throwable t) {
                throw new ConnectException("Failure while recovering database schemas", t);
            }
        } else {
            // initializes this position, though it will be reset when we see the first event (should be a rotate event) ...
            client.setBinlogFilename(initialBinLogFilename);
            logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",
                        source.binlogFilename(), source.binlogPosition());
        }

        // Start the log reader, which starts background threads ...
        try {
            logger.debug("Connecting to MySQL server");
            client.connect(timeoutInMilliseconds);
            logger.debug("Successfully connected to MySQL server and beginning to read binlog");
        } catch (TimeoutException e) {
            double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);
            throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host
                    + ":" + port + " with user '" + user + "'", e);
        } catch (AuthenticationException e) {
            throw new ConnectException("Failed to authenticate to the MySQL database at " + host + ":" + port + " with user '" + user + "'",
                    e);
        } catch (Throwable e) {
            throw new ConnectException(
                    "Unable to connect to the MySQL database at " + host + ":" + port + " with user '" + user + "': " + e.getMessage(), e);
        }
    }

 

  1. 验证配置:从传入的属性中创建配置对象并验证其有效性。
  2. 创建数据库历史记录:根据配置实例化 DatabaseHistory 对象并启动。
  3. 读取配置参数:从配置中读取各种必要的参数,如用户名、密码、主机、端口等。
  4. 定义表过滤器:根据白名单和黑名单定义表过滤器,忽略内置表。
  5. 创建队列:初始化事件队列和批处理队列。
  6. 设置事件处理器:为不同的事件类型设置处理器。
  7. 设置日志读取器:创建并配置 BinaryLogClient,注册事件监听器和生命周期监听器。
  8. 设置事件反序列化器:配置事件反序列化器以处理特定类型的事件。
  9. 恢复数据库状态:检查是否有已处理的日志,如果有则恢复数据库模式。
  10. 连接到 MySQL 服务器:尝试连接到 MySQL 服务器并开始读取二进制日志。

小结

/**
 * 该类负责配置和初始化MySQL连接器,包括设置数据库和表的过滤条件、创建事件队列、注册事件处理器、设置二进制日志客户端、恢复数据库模式等。
 * 主要功能包括:
 * - 应用数据库和表的黑白名单过滤条件。
 * - 配置是否忽略内置表。
 * - 创建事件队列和批处理事件队列。
 * - 注册不同类型的事件处理器。
 * - 初始化二进制日志客户端并设置相关参数。
 * - 检查并恢复已处理的日志位置。
 * - 连接到MySQL服务器并开始读取二进制日志。
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243184.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剧本杀门店预约小程序,解锁沉浸式推理体验

一、开发背景 剧本杀作为一种热门娱乐游戏&#xff0c;深受大众的欢迎&#xff0c;但随着市场的快速发展&#xff0c;竞争也在不断加大&#xff0c;对于剧本杀线下商家来说面临着发展创新。 剧本杀线下门店数量目前正在逐渐增加&#xff0c;竞争激烈&#xff0c;而门店的获客…

今天你学C++了吗——C++启航之入门知识

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…

【element-tiptap】Tiptap编辑器核心概念----结构篇

core-concepts 前言&#xff1a;这篇文章来介绍一下 Tiptap 编辑器的一些核心概念 &#xff08;一&#xff09;结构 1、 Schemas 定义文档组成方式。一个文档就是标题、段落以及其他的节点组成的一棵树。 每一个 ProseMirror 的文档都有一个与之相关联的 schema&#xff0c;…

LC69---219存在重复元素(滑动窗口)---Java版

1.题目描述 2.思路 3.代码实现 public class Solution { public boolean containsNearbyDuplicate(int[] nums, int k) {Map<Integer,Integer> m1new HashMap<>();// 1:0, 2:1,3:2,1:3 key存数组的值&#xff0c;value存索引&#xff0c;为getnum[i]做准备&am…

【C++】了解map和set及平衡二叉树和红黑树的原理

目录 ​编辑 一、关联式容器 二、 键值对 三、pair介绍 四、树形结构的关联式容器 4.1 set 4.2 map 4.3 multiset 4.4 multimaps 五、底层结构&#xff08;重点&#xff09; 5.1 AVL 树 5.1.1 AVL树的概念 5.1.2 AVL树节点的定义 5.1.3 AVL树的旋转 5.1.4 AVL树的…

LeetCode 力扣 热题 100道(五)最长回文子串(C++)

最长回文子串 给你一个字符串 s&#xff0c;找到 s 中最长的 回文子串。 回文性 如果字符串向前和向后读都相同&#xff0c;则它满足 回文性 子字符串子字符串 是字符串中连续的 非空 字符序列。 动态规划法 class Solution { public:string longestPalindrome(string s) {i…

Spring Boot汽车资讯:科技与汽车的新融合

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了汽车资讯网站的开发全过程。通过分析汽车资讯网站管理的不足&#xff0c;创建了一个计算机管理汽车资讯网站的方案。文章介绍了汽车资讯网站的系统分析部分&…

逆向攻防世界CTF系列41-EASYHOOK

逆向攻防世界CTF系列41-EASYHOOK 看题目是一个Hook类型的&#xff0c;第一次接触&#xff0c;虽然学过相关理论&#xff0c;可以看我的文章 Hook入门(逆向)-CSDN博客 题解参考&#xff1a;https://www.cnblogs.com/c10udlnk/p/14214057.html和攻防世界逆向高手题之EASYHOOK-…

【网络】HTTP 协议

目录 基本概念基于 HTTP 的系统组成HTTP 的基本性质 HTTP 请求头 & 响应头HTTP 的请求方法HTTP 的返回码HTTP 的 CookieHTTP 缓存 Cache-Control会话HTTP/1.x 的连接管理 基本概念 HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一…

执行flink sql连接clickhouse库

手把手教学&#xff0c;flink connector打通clickhouse大数据库&#xff0c;通过下发flink sql&#xff0c;来使用ck。 组件版本jdk1.8flink1.17.2clickhouse23.12.2.59 1.背景 flink官方不支持clickhouse连接器&#xff0c;工作中难免会用到。 2.方案 利用GitHub大佬提供…

笔记|M芯片MAC (arm64) docker上使用 export / import / commit 构建amd64镜像

很简单的起因&#xff0c;我的东西最终需要跑在amd64上&#xff0c;但是因为mac的架构师arm64&#xff0c;所以直接构建好的代码是没办法跨平台运行的。直接在arm64上pull下来的docker镜像也都是arm64架构。 检查镜像架构&#xff1a; docker inspect 8135f475e221 | grep Arc…

免费送源码:Java+Springboot+MySQL Springboot多租户博客网站的设计 计算机毕业设计原创定制

Springboot多租户博客网站的设计 摘 要 博客网站是当今网络的热点&#xff0c;博客技术的出现使得每个人可以零成本、零维护地创建自己的网络媒体&#xff0c;Blog站点所形成的网状结构促成了不同于以往社区的Blog文化&#xff0c;Blog技术缔造了“博客”文化。本文课题研究的“…

代码随想录第46期 单调栈

这道题主要是单调栈的简单应用 class Solution { public:vector<int> dailyTemperatures(vector<int>& T) {vector<int> result(T.size(),0);stack<int> st;st.push(0);for(int i1;i<T.size();i){if(T[i]<T[st.top()]){st.push(i);}else{wh…

【Three.js基础学习】24. shader patterns

前言 课程回顾: ShaderMaterial 这里用的是着色器材质 所以顶点和片段着色器就不需要像原始着色器那样添加需要的属性 然后写 片段着色器需要属性 &#xff1a; 顶点 属性 -》变化 -》 片段中 顶点中的属性不需要声明 只需要声明传送的变量 例如 varying vec vUv; vUv uv; 补充…

力扣整理版七:二叉树(待更新)

满二叉树&#xff1a;如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。深度为k&#xff0c;有2^k-1个节点的二叉树。 完全二叉树&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&am…

173. 二叉搜索树迭代器【 力扣(LeetCode) 】

文章目录 零、原题链接一、题目描述二、测试用例三、解题思路四、参考代码 零、原题链接 173. 二叉搜索树迭代器 一、题目描述 实现一个二叉搜索树迭代器类BSTIterator &#xff0c;表示一个按中序遍历二叉搜索树&#xff08;BST&#xff09;的迭代器&#xff1a; BSTIterato…

自动驾驶系列—深入解析自动驾驶车联网技术及其应用场景

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

STL序列式容器之list

相较于vector的连续性空间&#xff0c;list相对比较复杂&#xff1b;list内部使用了双向环形链表的方式对数据进行存储&#xff1b;list在增加元素时&#xff0c;采用了精准的方式分配一片空间对数据及附加指针等信息进行存储&#xff1b; list节点定义如下 template<clas…

Element-ui Select选择器自定义搜索方法

效果图 具体实现 <template><div class"home"><el-selectref"currencySelect"v-model"currency"filterable:spellcheck"false"placeholder"请选择":filter-method"handleCurrencyFilter"change&q…

Linux debian系统安装ClamTk开源图形用户界面(GUI)杀毒软件

一、ClamTk简介 ClamTk 是一个基于 ClamAV 的开源图形用户界面&#xff08;GUI&#xff09;杀毒软件。它使用 GTK2-Perl 脚本构建而成&#xff0c;支持32位与64位操作系统。ClamTk 提供了一个直观的用户界面&#xff0c;使得用户无需深入了解命令行即可完成大部分操作。它具备…