Debezium-EmbeddedEngine

news2025/1/19 23:20:49

提示:一个嵌入式的Kafka Connect源连接器的工作机制

文章目录

  • 前言
  • 一、控制流图
  • 二、代码分析
    • 1.构造函数
    • 2.完成回调
    • 3.连接器回调
    • 4.RUN
  • 总结


前言

工作机制:

* 独立运行:嵌入式连接器在应用程序进程中独立运行,不需要Kafka、Kafka Connect或  Zookeeper进程

* 数据传递:应用程序设置连接器并提供一个消费者函数,连接器将所有包含数据库变更事件的SourceRecord传递给该函数。

* 责任转移:应用程序负责故障恢复、可扩展性和持久性。

* 默认存储:连接器的数据库模式历史和偏移量默认存储在内存中,重启后会丢失。

* 执行与停止:连接器设计为提交给Executor或ExecutorService单线程执行,可以通过调用stop()方法或中断线程来停止。


提示:以下是本篇文章正文内容

一、控制流图

控制流图

二、代码分析

1.构造函数

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,
                           CompletionCallback completionCallback, ConnectorCallback connectorCallback) {
        this.config = config;
        this.consumer = consumer;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
            if (!success) logger.error(msg, error);
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        assert this.config != null;
        assert this.consumer != null;
        assert this.classLoader != null;
        assert this.clock != null;
        keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        Configuration valueConverterConfig = config;
        if (valueConverter instanceof JsonConverter) {
            // Make sure that the JSON converter is configured to NOT enable schemas ...
            valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();
        }
        valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);

        // Create the worker config, adding extra fields that are required for validation of a worker config
        // but that are not used within the embedded engine (since the source records are never serialized) ...
        Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);
        embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        workerConfig = new EmbeddedConfig(embeddedConfig);
    }

构造函数

2.完成回调

/**
 * A callback function to be notified when the connector completes.
 */
public interface CompletionCallback {
    /**
     * Handle the completion of the embedded connector engine.
     * 
     * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error
     *            that prevented startup or premature termination.
     * @param message the completion message; never null
     * @param error the error, or null if there was no exception
     */
    void handle(boolean success, String message, Throwable error);
}

这段代码定义了一个接口 CompletionCallback,其中包含一个方法 handle。该方法用于处理嵌入式连接器引擎的完成状态。
参数:
success: 布尔值,表示连接器是否正常完成。如果为 true,表示连接器正常完成;如果为 false,表示连接器启动失败或提前终止。
message: 完成消息,不能为空。
error: 异常对象,如果没有异常则为 null。 

完成回调

3.连接器回调

    /**
     * Callback function which informs users about the various stages a connector goes through during startup
     */
    public interface ConnectorCallback {

        /**
         * Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has
         * completed successfully
         */
        default void connectorStarted() {
            // nothing by default
        }

        /**
         * Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has
         * completed successfully
         */
        default void connectorStopped() {
            // nothing by default
        }

        /**
         * Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has
         * completed successfully
         */
        default void taskStarted() {
            // nothing by default
        }

        /**
         * Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has
         * completed successfully
         */
        default void taskStopped() {
            // nothing by default
        }
    }

这段代码定义了一个接口 ConnectorCallback,其中包含了四个默认方法:connectorStarted、connectorStopped、taskStarted 和 taskStopped。这些方法用于在连接器和任务的不同生命周期阶段通知用户。
connectorStarted:当连接器成功启动时调用。
connectorStopped:当连接器成功停止时调用。
taskStarted:当连接器任务成功启动时调用。
taskStopped:当连接器任务成功停止时调用。
这些方法的默认实现为空,子类可以根据需要重写这些方法来添加自定义的回调逻辑。


连接回调

4 RUN方法核心流程

run()

核心流程参考上图

注:debezium-0.6版本 

总结

EmbeddedEngine方法和成员变量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

阿里斑马智行 2025届秋招 NLP算法工程师

文章目录 个人情况一面/技术面 1h二面/技术面 1h三面/HR面 20min 个人情况 先说一下个人情况&#xff1a; 学校情况&#xff1a;211本中9硕&#xff0c;本硕学校都一般&#xff0c;本硕都是计算机科班&#xff0c;但研究方向并不是NLP&#xff0c;而是图表示学习论文情况&…

社交电商的优势及其与 AI 智能名片小程序、S2B2C 商城系统的融合发展

摘要&#xff1a;本文深入分析了社交电商相较于传统电商的优势&#xff0c;包括门槛低、易操作、更生活化和可团队化运作等特点。同时&#xff0c;探讨了 AI 智能名片小程序和 S2B2C 商城系统在社交电商发展中的作用&#xff0c;以及它们与社交电商融合所带来的新机遇和发展前景…

自动化运维-检测Linux服务器CPU、内存、负载、IO读写、机房带宽和服务器类型等信息脚本

前言&#xff1a;以上脚本为今年8月1号发布的&#xff0c;当时是没有任何问题&#xff0c;但现在脚本里网络速度测试py文件获取不了了&#xff0c;测速这块功能目前无法实现&#xff0c;后面我会抽时间来研究&#xff0c;大家如果有建议也可以分享下。 脚本内容&#xff1a; #…

3D Streaming 在线互动展示系统:NVIDIA RTX 4090 加速实时渲染行业数字化转型

随着科技的飞速发展&#xff0c;实时渲染正逐步成为游戏与实时交互领域的重要驱动力。与离线渲染不同&#xff0c;实时渲染需要极高的计算性能&#xff0c;对硬件设备尤其是GPU的性能要求极高。随着 RTX 4090 显卡的问世&#xff0c;其强大的算力和创新技术&#xff0c;为实时渲…

南京邮电大学《智能控制技术》课后作业

一、问题一 复现二输入单输出模糊控制系统&#xff0c;改动其中一到两个环节&#xff08;隶属度设置、规则等&#xff09;&#xff0c;对比修改前后控制效果。 定义模糊 %Fuzzy Control for water tank clear all; close all;anewfis(fuzz_tank);%Fuzzy Inference System stru…

2.5D视觉——Aruco码定位检测

目录 1.什么是Aruco标记2.Aruco码解码说明2.1 Original ArUco2.2 预设的二维码字典2.3 大小Aruco二维码叠加 3.函数说明3.1 cv::aruco::detectMarkers3.2 cv::solvePnP 4.代码注解4.1 Landmark图说明4.2 算法源码注解 1.什么是Aruco标记 ArUco标记最初由S.Garrido-Jurado等人在…

栈Stack和队列Queue

目录 一、栈 &#xff08;1&#xff09;用数组实现 &#xff08;2&#xff09;用单链表实现 &#xff08;3&#xff09;用标注尾结点的单链表实现 &#xff08;4&#xff09;用双向链表实现 2、栈的实际应用 &#xff08;1&#xff09;改变元素的序列 &#xff08;2&am…

Tailscale 自建 Derp 中转服务器

文章目录 为什么要建立 Derp 中转服务器&#xff1f;安装 Go 环境通过 Go 安装 Derp处理证书文件自签一个域名启动 DerpIPV6 的支持防止 Derp 被白嫖以上的操作命令合集自建 Headscale 添加 Derp参考 为什么要建立 Derp 中转服务器&#xff1f; Tailscale 使用的算法很有趣: 所…

RPC安全可靠的异常重试

当调用方调用服务提供方&#xff0c;由于网络抖动导致的请求失败&#xff0c;这个请求调用方希望执行成功。 调用方应该如何操作&#xff1f;catch异常再发起一次调用&#xff1f;显然不够优雅。这时可以考虑使用RPC框架的重试机制。 RPC框架的重试机制 RPC重试机制&#xff1…

AutoDL部署视觉大模型llama3.2-vision,从视频中寻找特定目标

注&#xff1a; windows11系统。示例为此项目&#xff1a;https://github.com/win4r/VideoFinder-Llama3.2-vision-Ollama 在当今的人工智能领域&#xff0c;深度学习模型的计算需求日益增长&#xff0c;特别是在处理复杂的视觉任务时&#xff0c;强大的算力往往是实现高效应用…

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法

【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法 目录 文章目录 【大语言模型】ACL2024论文-16 基于地图制图的罗马尼亚自然语言推理语料库的新型课程学习方法目录摘要&#xff1a;研究背景&#xff1a;问题与挑战&#xff1a;如何解…

golang调用模组程序实现交互输入自动化,获取imei及iccid

应用场景&#xff1a;在openwrt下调用移远的测试程序&#xff0c;并实现输入自动话&#xff0c;获取imei rootOpenWrt:~# ql-api-test Test groups:0: ql_dsi1: ql_nw2: ql_sim3: ql_dev4: ql_voice5: ql_sms6: ql_adc7: ql_i2c8: …

【数据分享】2022年我国10米分辨率茶树种植分布栅格数据

小麦、玉米、水稻、茶树等各类农作物的种植分布数据在农业、环境、国土等很多专业都经常用到&#xff01; 本次给大家分享的是我国2022年10米分辨率茶树种植分布栅格数据&#xff01;数据格式为TIFF格式。数据坐标为GCS_WGS_1984。数据格式为TIFF格式。数据坐标为GCS_WGS_1984…

【弱监督视频异常检测】2024-ESWA-基于扩散的弱监督视频异常检测常态预训练

2024-ESWA-Diffusion-based normality pre-training for weakly supervised video anomaly detection 基于扩散的弱监督视频异常检测常态预训练摘要1. 引言2. 相关工作3. 方法论3.1. 使用扩散自动编码器进行常态学习3.2. 全局-局部特征编码器3.2.1 局部块3.2.2 全局块3.2.3 协同…

vue实现展示并下载后端返回的图片流

// 点击下载 downLoadCode() {const image new Image();image.setAttribute("crossOrigin", "anonymous");image.onload () > {const canvas document.createElement("canvas");canvas.width image.width;canvas.height image.height;c…

STL关联式容器之平衡二叉搜索树

平衡二叉搜索树 在STL关联式容器介绍-CSDN博客中对二叉搜索树做了简要的描述&#xff1b;但是因为没有对二叉搜索树对数的深度及插入后树的结构进行调整&#xff0c;二叉搜索树可能失去平衡&#xff0c;造成搜寻效率低落的情况。如下所示&#xff1a; 所谓树形平衡与否&#xf…

Django启用国际化支持(2)—实现界面内切换语言:activate()

文章目录 ⭐注意⭐1. 配置项目全局设置&#xff1a;启用国际化2. 编写视图函数3. 配置路由4. 界面演示5、扩展自动识别并切换到当前语言设置语言并保存到Session设置语言并保存到 Cookie ⭐注意⭐ 以下操作依赖于 Django 项目的国际化支持。如果你不清楚如何启用国际化功能&am…

Java基础——继承和多态

目录 一、继承 继承的定义&#xff1a; 继承的基本用法&#xff1a; 如何调用父类的方法&#xff1f; 二、多态 多态性的好处 多态中的强制类型转换&#xff1a; 包的命名规则——域名倒叙 一、继承 继承的定义&#xff1a; 继承是面向对象编程中的一种机制&#xff0c…

2024-11-17 -MATLAB三维绘图简单实例

1. x -1:0.05:1; y x; [X, Y] meshgrid(x, y); f (X, Y) (sin(pi * X) .* sin(pi * Y)) .^ 2.*sin(2.*X2.*Y); mesh(X, Y, f(X, Y)); % 调用函数f并传递X和Y xlabel(X-axis); ylabel(Y-axis); zlabel(Z-axis); title(Surface Plot of (sin(pi * X) .* sin(pi * Y)) .^ 2.*…

resnet50,clip,Faiss+Flask简易图文搜索服务

一、实现 文件夹目录结构&#xff1a; templates -----upload.html faiss_app.py 前端代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widt…