基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

news2025/1/10 21:47:31

基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

  • 一、下载Confluent Kafka
  • 二、配置文件connect-distributed.properties
  • 三、启动脚本connect-distributed
  • 四、启动Kafka Connect集群
  • 五、加载debezium插件
  • 六、总结和延伸

一、下载Confluent Kafka

Confluent Kafka的下载地址:

  • https://www.confluent.io/download/

下载社区免费版本:

在这里插入图片描述

二、配置文件connect-distributed.properties

核心参数如下所示:

  • /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties
bootstrap.servers=realtime-kafka-001:9092,realtime-kafka-003:9092,realtime-kafka-002:9092


group.id=datasight-confluent-test-debezium-cluster-status

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

config.storage.topic=offline_confluent_test_debezium_cluster_connect_configs
offset.storage.topic=offline_confluent_test_debezium_cluster_connect_offsets
status.storage.topic=offline_confluent_test_debezium_cluster_connect_statuses


config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

offset.storage.partitions=25
status.storage.partitions=5
config.storage.partitions=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

#rest.host.name=0.0.0.0
#rest.port=8083

#rest.advertised.host.name=0.0.0.0
#rest.advertised.port=8083

plugin.path=/data/service/debezium/connectors2

三、启动脚本connect-distributed

  • /data/src/confluent-7.3.3/bin/connect-distributed

  • connect-distributed的脚本内容如下所示,可以不需要修改

  • 如果需要导出kafka connector的jmx,则需要设置jmx导出端口和jmx导出器,详细的部署方式可以参考博主下面这篇技术博客:

    • Debezium系列之:安装jmx导出器监控debezium指标
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] connect-distributed.properties"
        exit 1
fi

base_dir=$(dirname $0)

###
### Classpath additions for Confluent Platform releases (LSB-style layout)
###
#cd -P deals with symlink from /bin to /usr/bin
java_base_dir=$( cd -P "$base_dir/../share/java" && pwd )

# confluent-common: required by kafka-serde-tools
# kafka-serde-tools (e.g. Avro serializer): bundled with confluent-schema-registry package
for library in "confluent-security/connect" "kafka" "confluent-common" "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_DIR_NORMAL_INSTALL="/etc/kafka"
  LOG4J_CONFIG_NORMAL_INSTALL="${LOG4J_CONFIG_DIR_NORMAL_INSTALL}/connect-log4j.properties"
  LOG4J_CONFIG_DIR_ZIP_INSTALL="$base_dir/../etc/kafka"
  LOG4J_CONFIG_ZIP_INSTALL="${LOG4J_CONFIG_DIR_ZIP_INSTALL}/connect-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties -Dlog4j.config.dir=$base_dir/../config"
  fi
fi
export KAFKA_LOG4J_OPTS

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

export CLASSPATH
exec $(dirname $0)/kafka-run-class $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"

四、启动Kafka Connect集群

启动命令如下所示:

/data/src/confluent-7.3.3/bin/connect-distributed /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties

正常启动Kafka Connect集群完整输出如下所示:

[2023-06-21 16:43:01,249] INFO EnrichedConnectorConfig values: 
        config.action.reload = restart
        connector.class = io.debezium.connector.mysql.MySqlConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        exactly.once.support = requested
        header.converter = null
        key.converter = null
        name = mysql-dw-valuekey-test
        offsets.storage.topic = null
        predicates = []
        tasks.max = 1
        topic.creation.default.exclude = []
        topic.creation.default.include = [.*]
        topic.creation.default.partitions = 12
        topic.creation.default.replication.factor = 3
        topic.creation.groups = []
        transaction.boundary = poll
        transaction.boundary.interval.ms = null
        transforms = [unwrap, moveFieldsToHeader, moveHeadersToValue, Reroute]
        transforms.Reroute.key.enforce.uniqueness = true
        transforms.Reroute.key.field.regex = null
        transforms.Reroute.key.field.replacement = null
        transforms.Reroute.logical.table.cache.size = 16
        transforms.Reroute.negate = false
        transforms.Reroute.predicate = 
        transforms.Reroute.topic.regex = debezium-dw-encryption-test.dw.(.*)
        transforms.Reroute.topic.replacement = debezium-test-dw-encryption-all3
        transforms.Reroute.type = class io.debezium.transforms.ByLogicalTableRouter
        transforms.moveFieldsToHeader.fields = [cdc_code, product]
        transforms.moveFieldsToHeader.headers = [product_code, productname]
        transforms.moveFieldsToHeader.negate = false
        transforms.moveFieldsToHeader.operation = copy
        transforms.moveFieldsToHeader.predicate = 
        transforms.moveFieldsToHeader.type = class org.apache.kafka.connect.transforms.HeaderFrom$Value
        transforms.moveHeadersToValue.fields = [product_code2, productname2]
        transforms.moveHeadersToValue.headers = [product_code, productname]
        transforms.moveHeadersToValue.negate = false
        transforms.moveHeadersToValue.operation = copy
        transforms.moveHeadersToValue.predicate = 
        transforms.moveHeadersToValue.type = class io.debezium.transforms.HeaderToValue
        transforms.unwrap.add.fields = []
        transforms.unwrap.add.headers = []
        transforms.unwrap.delete.handling.mode = drop
        transforms.unwrap.drop.tombstones = true
        transforms.unwrap.negate = false
        transforms.unwrap.predicate = 
        transforms.unwrap.route.by.field = 
        transforms.unwrap.type = class io.debezium.transforms.ExtractNewRecordState
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2023-06-21 16:43:01,253] INFO [mysql-dw-valuekey-test|task-0] Loading the custom topic naming strategy plugin: io.debezium.schema.DefaultTopicNamingStrategy (io.debezium.config.CommonConnectorConfig:849)
Jun 21, 2023 4:43:01 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2023-06-21 16:43:01,482] INFO Started o.e.j.s.ServletContextHandler@2b80497f{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
[2023-06-21 16:43:01,482] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:324)
[2023-06-21 16:43:01,482] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)

五、加载debezium插件

  • 下载debezium插件到plugin.path=/data/service/debezium/connectors2设置的目录下
  • 然后重新启动Kafka Connect集群就能够成功加载debezium插件

重启Kafka Connect集群查看debezium插件是否加载成功,如下所示:成功加载到了debezium 插件

[{
"class":"io.debezium.connector.mysql.MySqlConnector",
"type":"source",
"version":"2.2.1.Final"},

{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.3.3-ce"},

{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.3.3-ce"},

{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ce"}]

六、总结和延伸

总结:

  • 至此成功部署了具有一个节点的Kafka Connect集群,如果需要更多节点,需要在多台服务器上启动Kafka Connect,从而组成一个多节点的Kafka Connect集群

基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客或者Debezium 专栏:

  • Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl
  • Debezium系列之:打通Debezium2.0以上版本的使用技术
  • Debezium系列之:安装部署debezium2.0以上版本的详细步骤
  • Debezium系列之:实现接入上千Mysql、Sqlserver、MongoDB、Postgresql数据库的Debezium集群从Debezium1.X版本升级到Debezium2.X版本
  • Debezium系列之:安装jmx导出器监控debezium指标
  • Debezium系列之:Debezium UI部署详细步骤
  • Debezium 专栏地址

延伸:

  • 组成一个Kafka Connect集群后,需要启动多个connector进行Kafka Connect集群稳定性、可靠性测试。
  • 可以进一步部署Kafka Connect集群UI

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/670989.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode39. 组合总和(回溯算法-java)

组合总和 leetcode39. 组合总和题目描述解题思路代码演示 回溯算法专题 leetcode39. 组合总和 来源:力扣(LeetCode) 链接:https://leetcode.cn/problems/combination-sum 题目描述 给你一个 无重复元素 的整数数组 candidates 和一…

浏览器跨域限制:为什么浏览器不能跨域发送Ajax请求?

💂 个人网站:【海拥】【游戏大全】【神级源码资源网】🤟 前端学习课程:👉【28个案例趣学前端】【400个JS面试题】💅 寻找学习交流、摸鱼划水的小伙伴,请点击【摸鱼学习交流群】 目录 前言什么是跨域请求&am…

什么是虚拟展厅?教您快速打造一个3D元宇宙虚拟展厅

引言: 在如今的数字化时代,虚拟展厅和3D元宇宙成为了展示和推广产品、品牌以及创意的新兴方式。虚拟展厅为企业带来了无限的可能性,如何快速打造一个3D元宇宙虚拟展厅成了许多企业想了解的。 一.虚拟展厅的魅力 1.什么是虚拟展厅…

BUUCTF刷题十一道(07)

文章目录 [Zer0pts2020]Can you guess it?[CISCN2019 华北赛区 Day1 Web2]ikun[GWCTF 2019]枯燥的抽奖[WUSTCTF2020]CV Maker[NCTF2019]True XML cookbook[RCTF2015]EasySQL[CISCN2019 华北赛区 Day1 Web1]Dropbox[CISCN2019 华北赛区 Day1 Web5]CyberPunk[红明谷CTF 2021]wri…

防汛四级应急响应启动,尾矿库如何安全度过汛期?

国家防总办公室向上海、江苏、浙江、安徽、江西、河南、湖北、湖南、广西、重庆、四川、贵州、云南等省份防指下发通知,要求全面压实以地方行政首长负责制为核心的各项防汛责任,加强精准监测预报和会商研判,落实好“叫应”机制,确…

我把一句话需求交给AI,它竟然给我返回了……

👉腾小云导读 也许你经历过这种情况:产品和设计同学用一句话就把需求说完了,你抓破脑袋做出来的版本又达不到他们的要求。不如尝试让 AI 承担痛苦,让它理解、拆解并实现一句话需求?本篇作者尝试提出一个自动配置可视化…

踩坑:Vue3 中的watch监视属性

文章目录 一、问题一:reactive 定义的响应式数据无 oldValue问题分析解决 二、问题二:watch默认开启了深度监视且无法关闭问题分析解决 一、问题一:reactive 定义的响应式数据无 oldValue 问题 监视 reactive 所定义的一个响应式数据&#…

linux安装git步骤;基于yum、dnf、源码安装【非常详细】

这里写目录标题 一 dnf安装二 yum安装三 源码安装1 基于 RPM 的发行版(Fedora/RHEL/RHEL衍生版)2 基于 Debian 的发行版(Debian/Ubuntu/Ubuntu-derivatives)3 yum软件包管理器来安装,这个一般是CnetOS Stream 8以前的版…

POLARDB IMCI 白皮书 云原生HTAP 数据库系统 一 与其他的商业数据库在HTAP的异同点(译)...

开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共…

​关于 O2OA (翱途) 软件商用许可证授权形式的调整​

尊敬的小伙伴们: 非常感谢您对 O2OA 开发平台的关注与支持! 兰德网络 O2OA 平台软件商用许证授权形式正式由 “按年授权” 改为 “按版本买断” 的永久许可形式。 这意味着,合作伙伴在持有特定版本的软件商用许可后,将能够永久…

《Python精选300题》

专栏简介 Python 是一门功能强大的编程语言,已经成为了数据科学、机器学习、Web 开发等领域的首选语言之一。因此,掌握 Python 的相关知识点对于学习和使用这门语言至关重要。 本专栏中,精选了 300 道题目,囊括了 Python 入门阶段…

什么是链表?

链表 什么是链表? 链表是有序的数据结构,链表中的每个部分称为节点。可以首、尾、中间进行数据存取,链表的元素在内存中不必是连续的空间,每个节点通过 next 指针指向下一个节点。 优点 链表的添加和删除不会导致其余元素位移。…

java中synchronized和ReentrantLock的加锁和解锁能在不同线程吗?如果能,如何实现?

java中synchronized和ReentrantLock的加锁和解锁能在不同线程吗?如果能,如何实现? 答案2023-06-21: java的: 这个问题,我问了一些人,部分人是回答得有问题的。synchronized这是个关键字&…

23---WPF数据库ORM框架

一、仓库--存放货物---数据库--存放数据--关系型数据/非关系型数据库 1.关系型数据:保存数据保存关系--SqlServer,MySql,Oracle 2.非关系型数据:保存数据---Redis,Mongo,Memecahe 二、关系型数据和非关系型数据的区别: 1.关系…

高效底座模型LLaMA

论文标题:LLaMA: Open and Efficient Foundation Language Models 论文链接:https://arxiv.org/abs/2302.13971 论文来源:Meta AI 一、概述 大型语言模型(Large Languages Models,LLMs)通过大规模文本数据的…

Selenium 环境配置

如果你做过 Web 测试的工作,那么你应该明白 Web 测试中最重要的一部分工作就是自动化测试。自动化测试,顾名思义就是让浏览器自动运行,而无需手动操作。这和我们爬虫工作原理有些相似,我们爬虫也需要让浏览器运行网址来获取我们需…

HTTPS加密

目录 HTTPS加密1.加密和解密2.对称加密3.非对称加密4.中间人攻击5.证书 HTTPS加密 1.加密和解密 1.明文: 要传递的原始信息。 2.密文: 经过加密后的信息。 3.加密就是指将明文(要传输的信息)按照指定的方式进行变换,生成密文。 4.解密…

Pytest+selenium+allure+Jenkins自动化测试框架搭建及使用

一、 环境搭建 1. Python下载及安装 Python可应用于多平台包括windows, Linux 和 Mac OS X, 本文主要介绍windows环境下。你可以通过终端窗口输入 "python" 命令来查看本地是否已经安装Python以及Python的安装版本。 如未安装python, 推荐下载python 3.8.3以…

Android——事务处理(续)(十三)

1. 长按事件 1.1 知识点 &#xff08;1&#xff09;掌握长按事件的操作形式&#xff1b; &#xff08;2&#xff09;可以设置手机的桌面背景&#xff1b; 1.2 具体内容 范例&#xff1a;长按一张图片之后&#xff0c;此图片设置为手机桌面背景。 <LinearLayout xmlns:a…

LocalDateTime 和 LocalDate 与 date 有什么区别;LocalDateTime 示例,LocalDate 示例

目录 1 LocalDateTime 和 LocalDate 与 date 有什么区别2 LocalDateTime 示例&#xff1a;2 LocalDate 示例&#xff1a; 1 LocalDateTime 和 LocalDate 与 date 有什么区别 LocalDateTime、LocalDate和Date是 Java中不同的类库中用于表示日期和时间的类&#xff0c; 它们在功…