Debezium日常分享系列之:定制Debezium 信号发送和通知

news2024/9/29 13:32:31

Debezium日常分享系列之:定制Debezium 信号发送和通知

  • 一、自定义信号和通知通道
  • 二、结论

Debezium 2.3 在信号和通知功能方面引入了新的改进。除了 Debezium 提供的预定义信号和通知通道之外,您还可以设置新的信号和通知通道。此功能使用户能够自定义系统以满足他们的独特需求,并将其与现有基础设施或第三方解决方案相结合。它通过精确捕获和传达信号事件并通过首选渠道触发通知,实现对数据变化的有效监控和主动响应。

Debezium日常分享系列之:Debezium 信号发送和通知 - 第 1 部分

一、自定义信号和通知通道

在 Debezium 中,可以自定义信号和通知通道以满足特定要求。例如,我们可以通过为信号和通知创建 HTTP 通道来实现自定义。此 HTTP 通道从 http 端点接收信号,并且可以在信号传送后将通知发送回端点。

让我们探索一个示例,演示如何使用 Debezium Postgres 连接器、发送信号的模拟服务器以及通过 http 端点接收通知的 Postbin 来创建和利用 HTTP 信号和通知通道。

设置 HTTP 信号通道:

  • 将 Debezium Postgres 连接器配置为在发生相关数据库更改时接收信号。
  • 设置服务以使用 HTTP 通道向 Debezium 发送信号。该服务可以是数据库、第三方应用程序或任何其他可以发送 http 请求的系统。在此示例中,我们将使用模拟服务器向 Debezium 发送信号。 Mock Server 是一个可用于模拟 http 请求和响应的服务。
  • 配置模拟服务器以使用适当的 HTTP 方法(例如 POST)通过 http 端点发送信号。
  • 根据需要自定义 HTTP 通道设置以定义 http 端点 URL、身份验证、标头和任何其他参数。

设置 HTTP 通知通道:

  • 一旦 Debezium 接收并处理信号,它就可以触发向 http 端点发布通知。在此示例中,我们将使用 HTTP 通道将通知发送到 Postbin bin。 Postbin是一个可以用来接收http请求并查看请求详细信息的服务。
  • 自定义通知的 HTTP 通道设置,在 Postbin 中创建 bin,并根据需要定义 http 端点 URL、身份验证、标头和任何其他参数。
  • 使用适当的 HTTP 方法(例如 POST)将通知事件转发到 http 端点,即 Postbin bin。可以根据需要自定义通知负载。

博客文章中此示例的完整源代码在 Debezium 示例存储库的 http-signal-notification 目录下提供。

创建一个 java 项目来构建 HTTP 信号和通知通道。运行以下命令使用 Maven 创建一个新的 java 项目:

mvn archetype:generate
    -DgroupId=io.debezium.examples
    -DartifactId=http-signaling-notification

将以下依赖项添加到 Debezium 版本(2.3 及更高版本)的 pom.xml 文件中:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>2.3.0.Final</version>
</dependency>

要使用模拟服务器接收信号,请创建定义模拟服务器服务的 Docker Compose 文件。模拟服务器服务的配置如下:

services:
  mockServer:
    image: mockserver/mockserver:latest
    ports:
      - 1080:1080
    environment:
      - MOCKSERVER_WATCH_INITIALIZATION_JSON=true
      - MOCKSERVER_INITIALIZATION_JSON_PATH=/config/initializerJson.json
    volumes:
        - ./initializerJson.json:/config/initializerJson.json

设置环境变量 MOCKSERVER_WATCH_INITIALIZATION_JSON 和 MOCKSERVER_INITIALIZATION_JSON_PATH 以使模拟服务器能够监视初始化 JSON 文件中的更改并指定其路径。包含信号的http请求和响应信息的initializerJson.json文件被安装到模拟服务器容器中。

initializerJson.json 文件定义了一个对路径 /api/signal 的模拟 http 请求,其中查询字符串参数 code=10969。当模拟服务器收到此请求时,它将使用包含 id、类型和数据的 JSON 正文进行响应。响应的状态码为200,表示响应成功。 initializerJson.json文件的定义如下:

[
  {
    "httpRequest" : {
      "method" : "GET",
      "path" : "/api/signal",
      "queryStringParameters" : {
        "code" : ["10969"]
      }
    },
    "httpResponse" : {
      "body": "{\"id\":\"924e3ff8-2245-43ca-ba77-2af9af02fa07\",\"type\":\"log\",\"data\":{\"message\": \"Signal message received from http endpoint.\"}}",
      "statusCode": 200
    }
  }
]
  • id :标识信号实例的任意唯一字符串。
  • type :要发送的信号类型。在此示例中,类型为日志,它请求连接器将条目添加到连接器的日志文件中。处理信号后,连接器会在日志中打印指定的消息。
  • data :传递给信号事件的 JSON 格式的参数。在此示例中,消息参数被传递给信号事件。

通过实现SignalChannelReader接口创建HTTP信号通道,如下所示:

public class HttpSignalChannel implements SignalChannelReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSignalChannel.class);

    public static final String CHANNEL_NAME = "http";
    private static final List<SignalRecord> SIGNALS = new ArrayList<>();
    public CommonConnectorConfig connectorConfig;

        @Override
    public String name() { (1)
        return CHANNEL_NAME;
    }

    @Override
    public void init(CommonConnectorConfig connectorConfig) { (2)
        this.connectorConfig = connectorConfig;
    }

    @Override
    public List<SignalRecord> read() { (3)
        try {
            String requestUrl = "http://mockServer:1080/api/signal?code=10969";

            // send http request to the mock server
            HttpClient httpClient = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(requestUrl))
                    .GET()
                    .header("Content-Type", "application/json")
                    .build();

            // read the response
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
           if (response.statusCode() == 200) {
               ObjectMapper mapper = new ObjectMapper();
               String responseBody = response.body();

               // parse the response body
               JsonNode signalJson = mapper.readTree(responseBody);
               Map<String, Object> additionalData = signalJson.has("additionalData") ? mapper.convertValue(signalJson.get("additionalData"), new TypeReference<>() {}) : new HashMap<>();
               String id = signalJson.get("id").asText();
               String type = signalJson.get("type").asText();
               String data = signalJson.get("data").toString();
               SignalRecord signal = new SignalRecord(id, type, data, additionalData);

               LOGGER.info("Recorded signal event '{}' ", signal);

               // process the signal
               SIGNALS.add(signal);
                } else {
                    LOGGER.warn("Error while reading signaling events from endpoint: {}", response.statusCode());
                }
            } catch (IOException | InterruptedException e) {
                LOGGER.warn("Exception while preparing to process the signal '{}' from the endpoint", e.getMessage());
                e.printStackTrace();
            }
        return SIGNALS;
        }

    @Override
    public void close() { (4)
       SIGNALS.clear();
    }
}
  1. name() 方法返回信号通道的名称。要使 Debezium 能够使用通道,请在连接器的 signal.enabled.channels 属性中指定名称 http。
  2. init() 方法可用于初始化 http 通道所需的特定配置、变量或连接。
  3. read() 方法从 http 端点读取信号并返回将由 Debezium 连接器处理的 SignalRecord 对象列表。
  4. close() 方法关闭所有分配的资源。

通过实现NotificationChannel接口创建通知通道,如下所示:

public class HttpNotificationChannel implements NotificationChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationChannel.class);

    public static final String CHANNEL_NAME = "http";
    private static final String NOTIFICATION_PREFIX = "[HTTP NOTIFICATION SERVICE]";

    @Override
    public String name() { (1)
        return CHANNEL_NAME;
    }

    @Override
    public void init(CommonConnectorConfig config) { (2)
        // custom configuration
    }

    @Override
    public void send(Notification notification) { (3)
        LOGGER.info(String.format("%s Sending notification to http channel", NOTIFICATION_PREFIX));
        String binId = createBin();
        sendNotification(binId, notification);
    }

    private static String createBin()  {
        // Create a bin on the server
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/api/bin"))
                    .POST(HttpRequest.BodyPublishers.ofString(" "))
                    .build();

            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

            if (response.statusCode() == HTTP_CREATED) {
                String binId = response.body().replaceAll(".*\"binId\":\"([^\"]+)\".*", "$1");
                LOGGER.info("Bin created: " + response.body());
                return binId;
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    private static void sendNotification (String binId, Notification notification) {
        // Get notification from the bin
        try {
            ObjectMapper mapper = new ObjectMapper();
            String notificationString = mapper.writeValueAsString(notification);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI("https://www.toptal.com/developers/postbin/" + binId))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(notificationString))
                    .build();

            HttpClient httpClient = HttpClient.newHttpClient();
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

            if (response.statusCode() == HTTP_OK) {
                LOGGER.info("Notification received : " + response.body());
            }
        } catch (URISyntaxException | InterruptedException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() { (4)
    }
}
  1. name() 方法返回通知通道的名称。要使 Debezium 能够使用通道,请在连接器的 notification.enabled.channels 属性中指定 http。
  2. init() 方法可用于初始化通道所需的特定配置、变量或连接。
  3. send() 方法将通知发送到通道。该通知包含由 Debezium 连接器处理的 SignalRecord 对象。
  4. close() 方法关闭所有分配的资源。

分别在 META-INF/services 目录下的 io.debezium.pipeline.signal.SignalChannelReader 和 io.debezium.pipeline.notification.channels.NotificationChannel 文件下声明 HTTP 信号和通知通道。

编译 Java 项目并将其导出为 JAR 文件。这可以使用 Maven 或您喜欢的构建工具来完成。将 JAR 文件复制到包含要使用的 Debezium 连接器的 JAR 文件的目录。例如,如果您想要将自定义信号和通知通道与 Debezium Postgres 连接器一起使用,请将 JAR 文件复制到 /kafka/connect/debezium-connector-postgres 目录。

此示例提供了一个 Docker Compose 文件,其中定义了必要的服务,包括 Mock Server、Zookeeper、Kafka Connect 和 Postgres 数据库。

要启动服务,请运行以下命令:

export DEBEZIUM_VERSION=2.3
docker-compose up -d

确保服务已启动并正在运行,并且 Postgres 数据库已准备好接受连接后,下一步是注册连接器。这涉及创建连接器配置文件。让我们创建一个名为 register-postgres.json 的文件,其中包含以下属性:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": 1,
    "database.hostname": "postgres",
    "database.port": 5432,
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory",
    "signal.enabled.channels": "http", 1
    "notification.enabled.channels": "http" 2
  }
}
  1. signal.enabled.channels 属性指定连接器要使用的信号通道。在这种情况下,连接器使用 http 信号通道。
  2. notification.enabled.channels 属性指定连接器要使用的通知通道。在这种情况下,连接器使用 http 通知通道。

现在我们已经准备好了连接器配置文件,我们可以通过执行以下命令来向 Kafka Connect 注册连接器:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @register-postgres.json

连接器成功注册后,您可以查看连接器日志以观察信号事件。这些日志提供了有关连接器的处理和进度的见解,包括任何与信号相关的信息。您将遇到类似于以下内容的日志消息:

Recorded signal event 'SignalRecord{id='924e3ff8-2245-43ca-ba77-2af9af02fa07', type='log', data='{"message":"Signal message received from http endpoint."}', additionalData={}}'    [io.debezium.examples.signal.HttpSignalChannel]

此外,您可能会注意到与发送到邮筒的通知事件相关的日志消息。例如:

[HTTP NOTIFICATION SERVICE] Sending notification to http channel   [io.debezium.examples.notification.HttpNotificationChannel]
Bin created: {"binId":"1688742588469-1816775151528","now":1688742588470,"expires":1688744388470}   [io.debezium.examples.notification.HttpNotificationChannel]

它提供有关通知事件的信息,例如创建具有唯一标识符 (binId) 的 bin 以及其他相关详细信息。要从 Postbin 检索通知事件,请从日志消息中获取 binId 并使用它从 Postbin 请求相应的通知事件。要查看通知事件,您可以使用以下 URL 访问 Postbin:https://www.toptal.com/developers/postbin/b/:binId。将 URL 中的 :binId 替换为从连接器日志中获取的实际 binId。

发送到 Postbin 的通知事件如下所示:

在这里插入图片描述

二、结论

在本教程中,我们探讨了如何为 Debezium 连接器创建自定义信号和通知通道。我们创建了一个自定义信号通道,用于从 HTTP 端点接收信号事件。我们还创建了一个自定义通知通道,用于将通知事件发送到 HTTP 端点。

Debezium 的综合信号和通知系统可与第三方解决方案无缝集成,使用户能够随时了解 Debezium 连接器的状态和进度。该系统的可扩展性使用户能够自定义信号和通知渠道,以满足他们的定制需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/803538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数学建模学习(5):数学建模各类题型及解题方案

一、数学建模常见的题型 总体来说&#xff0c;数学建模赛题类型主要分为&#xff1a;评价类、预测类和优化类三种&#xff0c;其中优化类是最常见的赛题类 型&#xff0c;几乎每年的地区赛或国赛美赛等均有出题&#xff0c;必须要掌握并且熟悉。 二、评价类赛题 综合评价是数学…

【线程安全的讨论(一)】CPU多核缓存架构和JMM

CPU多核缓存架构 一、CPU多核缓存架构可见性问题乱序执行&#xff08;指令重排&#xff09; 二、JMM——Java内存模型 一、CPU多核缓存架构 计算机的基本组成图 CPU 缓存为了提高程序运行的性能&#xff0c;现代 CPU 在很多方面会对程序进行优化。CPU 的处理速度很快&#xf…

@monaco-editor/react组件CDN加载失败解决办法

monaco-editor/react引入这个cdn资源会load失败 网上很多例子都是这样写的&#xff0c;我这样写monaco会报错 import * as monaco from monaco-editor; import { loader } from monaco-editor/react;loader.config({ monaco });改成这样 import * as monaco from monaco-edi…

2023年第三届控制理论与应用国际会议 | IET独立出版 | EI检索

会议简介 Brief Introduction 2023年第三届控制理论与应用国际会议(ICoCTA 2023) 会议时间&#xff1a;2023年10月20 -22日 召开地点&#xff1a;中国厦门 大会官网&#xff1a;www.icocta.org 控制理论作为一门科学技术&#xff0c;已经广泛地运用于我们社会生活方方面面。随着…

Java【Spring】 核心概念: 什么是 IoC, 什么是 DI?

文章目录 前言一、什么是Spring1, 什么是容器2, 什么是IoC 二、如何理解IoC1, 传统方式的代码编写2, 控制反转的代码编写3, 再谈IoC 三、什么是DI总结 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象,…

西安电子科技大学

前言 本篇文章投稿与以下活动 【西安城市开发者社区】探索西安高校&#xff1a;展现历史与创新的魅力 资料参考与百度百科 学校简介 西安电子科技大学&#xff08;Xidian University&#xff09;&#xff0c;简称“西电”&#xff0c;位于陕西省西安市&#xff0c;是中央部…

聚焦东南亚五国 | 7月TikTok达人数据洞察

或许中国直播的盛况&#xff0c;就会在东南亚重演。 据超店有数洞察&#xff0c;2023年7月东南亚TikTok主销五国(印度尼西亚、泰国、越南、马来西亚、菲律宾)带货达人数量环比上涨14.7%&#xff0c;其中越南、泰国市场达人数增长幅度远超均值&#xff0c;分别为25.46%、17.16%。…

代码随想录算法训练营第三天| 203.移除链表元素 707.设计链表 206.反转链表

链表是一种通过指针串联在一起的线性结构&#xff0c; 单链表 单链表每一个节点由两部分组成&#xff0c;一个是数据域一个是指针域&#xff08;存放指向下一个节点的指针&#xff09;&#xff0c;最后一个节点的指针域指向null&#xff08;空指针的意思&#xff09;。 双链…

搭建JMeter+Jenkins+Ant持续化

在D盘下新建文件夹Test,在其中填充文件夹 在 ant中解压 “apache-ant-1.9.14-bin.zip” 在TestAudo中新建文件夹 进入pc文件夹继续新建文件夹并把准备好的build.xml拖入 在resultlog中新建两个空文件夹 名称分为 html jtl 把准备好的baidu.jmx放入script 配置环境变量 在环…

2023跨境电商有哪些热门的夏季选品类目?纯干货!快戳进来

1、厨房用品 夏季食材新鲜多样&#xff0c;所以厨房用品非常受欢迎。 冷水壶、冰桶、冰箱收纳盒等可帮助保持食材新鲜&#xff1b;烤肉架、烤盘、烤架、烤盘套装等是户外烧烤时的好帮手刨冰机、果汁机、冰淇淋机等可以制作各种冰凉饮品和甜品。 2、客厅用品 夏季在家放松休息&a…

数据结构—数组和广义表

4.2数组 数组&#xff1a;按一定格式排列起来的&#xff0c;具有相同类型的数据元素的集合。 **一维数组&#xff1a;**若线性表中的数据元素为非结果的简单元素&#xff0c;则称为一维数组。 **一维数组的逻辑结构&#xff1a;**线性结构&#xff0c;定长的线性表。 **声明…

婴儿床出口欧盟CE认证EN 1130办理指南

随着人们生活水平的不断提高&#xff0c;妈妈们对婴儿产品的要求也越来越高。作为新生儿最常用的家具之一&#xff0c;婴儿床的质量安全自然成为了消费者关注的焦点。而欧盟CE认证EN 1130就是婴儿床出口欧盟市场的必备证书&#xff0c;下面我们就来详细介绍一下该认证的办理流程…

C++网络编程 TCP套接字基础知识,利用TCP套接字实现客户端-服务端通信

1. TCP 套接字编程流程 1.1 概念 流式套接字编程针对TCP协议通信&#xff0c;即是面向对象的通信&#xff0c;分为服务端和客户端两部分。 1.2 服务端编程流程&#xff1a; 1&#xff09;加载套接字库&#xff08;使用函数WSAStartup()&#xff09;&#xff0c;创建套接字&…

ORB特征笔记

简介 ORB Oriented FAST Rotated BRIEF 前面的Oriented FAST说明的是它的关键点的选取是一种改良过的FAST&#xff0c;在FAST的基础上加了方向信息&#xff1b;后面的Rotated BRIEF是指特征描述符使用BRIEF描述子&#xff08;Binary Robust Independent Elementary Featur…

赋能医院数字化转型,医院拍摄VR全景很有必要

医院有没有必要拍摄制作VR全景呢&#xff1f;近期也有合作商问我们这个问题&#xff0c;其实VR智慧医院是趋势、也是机遇。现在外面很多的口腔医院、医美机构等都开始引入VR全景技术了&#xff0c;力求打造沉浸式、交互式的VR智慧医院新体验&#xff0c;通过VR全景展示技术来助…

c# 此程序集中已使用了资源标识符

严重性 代码 说明 项目 文件 行 禁止显示状态 错误 CS1508 此程序集中已使用了资源标识符“BMap.NET.WindowsForm.BMapControl.resources” BMap.NET.WindowsForm D:\MySource\Decompile\BMap.NET.WindowsForm\CSC 1 活动 运行程序时&a…

javascript 模板引擎

使用场景 在实际开发中&#xff0c;一般都是使用动态请求数据来更新页面&#xff0c;服务器端通常返回json格式的数据&#xff0c;正常操作是我们手动的去拼装HTML&#xff0c;但麻烦且容易出错&#xff0c;因此出现了一些用模版生成HTML的的框架叫js模板引擎如&#xff1a;jq…

成本中心修改或者创建保存时增强的实现

成本中心修改或者创建保存时增强 用户出口程序为&#xff1a;EXIT_SAPLKMA1_003。 可以通过SMOD 或者 CMOD来添加增强代码。 CMOD->COOMKS02 ->EXIT_SAPLKMA1_003 按公司要求&#xff0c;写了段代码检查 创建利成本中心时&#xff0c;业务范围要规范输入。 data PRCTR…

对Windows应用程序进行代码签名

Windows上发布桌面应用程序&#xff0c;您可以看到带有“发布者&#xff1a;未知”的未正确签名的应用程序。 没有被签名的软件 因此&#xff0c;这里有一个有关经验教训的快速指南&#xff0c;可能会帮助您从一开始就正确签署安装程序。请注意&#xff0c;这些说明适用于对已…

环境搭建-Ubuntu18.04.6系统TensorFlow BenchMark的GPU测试

1. 下载Ubuntu18.04.6镜像 登录阿里云官方镜像站&#xff1a;阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区 2. 测试环境 Server OS&#xff1a;Ubuntu 20.04.6 LTS Kernel: Linux 5.4.0-155-generic x86-64 Docker Version&#xff1a;24.0.5, build ced0996 docker-com…