实时从TDengine数据库采集数据到Kafka Topic

news2025/1/9 3:13:15

实时从TDengine数据库采集数据到Kafka Topic

  • 一、认识TDengine
  • 二、TDengine Kafka Connector
  • 三、什么是 Kafka Connect?
  • 四、前置条件
  • 五、安装 TDengine Connector 插件
  • 六、启动 Kafka
  • 七、验证 kafka Connect 是否启动成功
  • 八、TDengine Source Connector 的使用
  • 九、添加 Source Connector 配置文件
  • 十、准备测试数据
  • 十一、创建 Source Connector 实例
  • 十二、查看 topic 数据
  • 十三、unload 插件
  • 十四、性能调优
  • 十五、配置参考
    • 通用配置
    • TDengine Source Connector 特有的配置
  • 十六、更多内容

一、认识TDengine

TDengine是一款高性能、高稳定性的开源时间序列数据库。它是由中国的PingCAP团队开发并开源的,旨在为大规模数据存储和实时分析提供解决方案。TDengine具有以下特点:

  • 高性能:TDengine使用了多种优化技术,如数据压缩、索引优化和并行计算,以实现高性能的数据写入和查询。它能够处理大规模的数据,并且在毫秒级的响应时间内提供查询结果。
  • 高稳定性:TDengine具有良好的容错和恢复机制,能够保证数据的持久性和可靠性。它支持数据的多副本备份和自动故障转移,以及数据一致性和完整性的检查。
  • 时间序列支持:TDengine专注于时间序列数据的存储和分析,能够高效地处理时间序列数据的写入、查询和聚合操作。它支持多种数据类型和数据模型,如数字、文本、地理位置和时间等。
  • 开源:TDengine是一个开源项目,遵循Apache 2.0许可证。用户可以自由地使用、修改和分发该软件,同时也可以参与到开发和改进过程中。
  • 总之,TDengine是一款专注于时间序列数据存储和分析的高性能、高稳定性的开源数据库,适用于大规模数据存储和实时分析的场景。

二、TDengine Kafka Connector

  • TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。

三、什么是 Kafka Connect?

  • Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。

在这里插入图片描述

TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。

在这里插入图片描述

四、前置条件

运行本教程中示例的前提条件。

  • Linux 操作系统
  • 已安装 Java 8 和 Maven
  • 已安装 Git、curl、vi
  • 已安装并启动 TDengine。

五、安装 TDengine Connector 插件

编译插件

git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skip=true
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip

以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 target/components/packages/ 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径: $KAFKA_HOME/components/。

配置插件

将 kafka-connect-tdengine 插件加入 $KAFKA_HOME/config/connect-distributed.properties 配置文件 plugin.path 中

plugin.path=/usr/share/java,/opt/kafka/components

六、启动 Kafka

zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

七、验证 kafka Connect 是否启动成功

输入命令:

curl http://localhost:8083/connectors

如果各组件都启动成功,会得到如下输出:

[]

八、TDengine Source Connector 的使用

TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。

TDengine Source Connector 会将 TDengine 数据表中的数据转换成 InfluxDB Line 协议格式 或 OpenTSDB JSON 协议格式然后写入 Kafka。

下面的示例程序同步数据库 test 中的数据到主题 tdengine-test-meters。

九、添加 Source Connector 配置文件

vi source-demo.json

输入以下内容:

source-demo.json

{
  "name":"TDengineSourceConnector",
    "config":{
    "connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
    "tasks.max": 1,
    "subscription.group.id": "source-demo",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connection.password": "taosdata",
    "connection.database": "test",
    "connection.attempts": 3,
    "connection.backoff.ms": 5000,
    "topic.prefix": "tdengine",
    "topic.delimiter": "-",
    "poll.interval.ms": 1000,
    "fetch.max.rows": 100,
    "topic.per.stable": true,
    "topic.ignore.db": false,
    "out.format": "line",
    "data.precision": "ms",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

十、准备测试数据

准备生成测试数据的 SQL 文件。

prepare-source-data.sql

DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);

INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) \
            d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) \
            d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) \
            d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) \
            d1003 USING meters TAGS('California.LosAngeles', 2)   VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) \
            d1003 USING meters TAGS('California.LosAngeles', 2)   VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) \
            d1004 USING meters TAGS('California.LosAngeles', 3)   VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) \
            d1004 USING meters TAGS('California.LosAngeles', 3)   VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);

使用 TDengine CLI, 执行 SQL 文件。

taos -f prepare-source-data.sql

十一、创建 Source Connector 实例

curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"

十二、查看 topic 数据

使用 kafka-console-consumer 命令行工具监控主题 tdengine-test-meters 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 输出数据 InfluxDB line protocol 的格式。

kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-test-meters

输出:

......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......

此时会显示所有历史数据。切换到 TDengine CLI, 插入两条新的数据:

USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);

再切换回 kafka-console-consumer, 此时命令行窗口已经打印出刚插入的 2 条数据。

十三、unload 插件

测试完毕之后,用 unload 命令停止已加载的 connector。

查看当前活跃的 connector:

curl http://localhost:8083/connectors

如果按照前述操作,此时应有两个活跃的 connector。使用下面的命令 unload:

curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector

十四、性能调优

如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期,可以尝试使用如下参数提升 Kafka 的写入吞吐量。

  • 打开 KAFKA_HOME/config/producer.properties 配置文件。
  • 参数说明及配置建议如下:
参数参数说明设置建议
producer.type此参数用于设置消息的发送方式,默认值为 sync 表示同步发送,async 表示异步发送。采用异步发送能够提升消息发送的吞吐量。async
request.required.acks参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。1
max.request.size该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。104857600
batch.size此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。524288
buffer.memory此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。1073741824

十五、配置参考

通用配置

以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。

  • name: connector 名称。
  • connector.class: connector 的完整类名, 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。
  • tasks.max: 最大任务数, 默认 1。
  • topics: 需要同步的 topic 列表, 多个用逗号分隔, 如 topic1,topic2。
  • connection.url: TDengine JDBC 连接字符串, 如 jdbc:TAOS://127.0.0.1:6030。
  • connection.user: TDengine 用户名, 默认 root。
  • connection.password :TDengine 用户密码, 默认 taosdata。
  • connection.attempts :最大尝试连接次数。默认 3。
  • connection.backoff.ms : 创建连接失败重试时间隔时间,单位为 ms。 默认 5000。

TDengine Source Connector 特有的配置

  • connection.database: 源数据库名称,无缺省值。
  • topic.prefix: 数据导入 kafka 时使用的 topic 名称的前缀。默认为空字符串 “”。
  • timestamp.initial: 数据同步起始时间。格式为’yyyy-MM-dd HH:mm:ss’,若未指定则从指定 DB 中最早的一条记录开始。
  • poll.interval.ms: 检查是否有新建或删除的表的时间间隔,单位为 ms。默认为 1000。
  • fetch.max.rows : 检索数据库时最大检索条数。 默认为 100。
  • query.interval.ms: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。
  • out.format : 结果集输出格式。line 表示输出格式为 InfluxDB Line 协议格式,json 表示输出格式是 json。默认为 line。
  • data.precision: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
    • ms : 表示毫秒,
    • us : 表示微秒
    • ns : 表示纳秒。
  • topic.per.stable: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 <topic.prefix><topic.delimiter><connection.database><topic.delimiter>
    <stable.name>;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 <topic.prefix><topic.delimiter><connection.database>
  • topic.ignore.db: topic 命名规则是否包含 database 名称,true 表示规则为 <topic.prefix><topic.delimiter><stable.name>,false 表示规则为 <topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>,默认 false。此配置项在 topic.per.stable 设置为 false 时不生效。
  • topic.delimiter: topic 名称分割符,默认为 -。
  • read.method: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。
  • subscription.group.id: 指定 TDengine 数据订阅的组 id,当 read.method 为 subscription 时,此项为必填项。
  • subscription.from: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。

十六、更多内容

更多内容请参阅官网:

  • https://docs.taosdata.com/third-party/collection/kafka/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2209686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【更新】A股上市公司企业网络安全治理数据集(2007-2023年)

一、测算方式&#xff1a;参考C刊《金融评论》王辉&#xff08;2024&#xff09;老师的做法&#xff0c;安全治理种子词的选取主要依托于《中华人民共和国网络安全法》、《中华人民共和国数据安全法》、《关键信息基础设施安全保护条例》等法律法规文件与《网络安全审查办法》、…

蓝桥杯刷题--幸运数字

幸运数字 题目: 解析: 我们由题目可以知道,某个进制的哈沙德数就是该数和各个位的和取整为0.然后一个幸运数字就是满足所有进制的哈沙德数之和.然后具体就是分为以下几个步骤 1. 我们先写一个方法,里面主要是用来判断,这个数在该进制下是否是哈沙德数 2. 我们在main方法里面调用…

量化之一:均值回归策略

文章目录 均值回归策略理论基础数学公式 关键指标简单移动平均线&#xff08;SMA&#xff09;标准差Z-Score 交易信号实际应用优缺点分析优点缺点 结论 实践backtrader参数&#xff1a;正常情况&#xff1a;异常情况&#xff1a; 均值回归策略 均值回归&#xff08;Mean Rever…

华为公有云实战

1.申请一台ECS云主机&#xff0c;并且可以提供web服务 1.1访问云主机-华为特有技术novnc&#xff0c;KVM中提到vnc技术&#xff0c;novnc是不用安装vnc客户端用浏览器html语言实现。 1.2cloudshell 1.3小工具 ssh 弹性ip 1.4.安装httpd服务 建立索引文件 浏览器上输入弹性ip可…

网络资源模板--Android Studio 实现简易记事本App

目录 一、项目演示 二、项目测试环境 三、项目详情 四、完整的项目源码 一、项目演示 网络资源模板--基于Android studio 实现的简易记事本App 二、项目测试环境 三、项目详情 首页 创建一个空的笔记本列表 mNotebookList。使用该列表和指定的布局资源 item_notebook 创建…

前端开发笔记--html 黑马程序员1

文章目录 前端开发工具--VsCode前端开发基础语法VsCode优秀插件Chinese --中文插件Auto Rename Tag --自动重命名插件open in browserOpen in Default BrowserOpen in Other Browser Live Server -- 实时预览 前端开发工具–VsCode 轻量级与快速启动 快速加载&#xff1a;VSCo…

WordPress添加meta标签做seo优化

一、使用function.php文件添加钩子函数添加 方法1、使用is_page()判断不同页面的page_id进行辨别添加不同页面keyword和description &#xff08;1&#xff09;通过页面前台源码查看对应页面的id &#xff08;2&#xff09;或者通过wordpress后台&#xff0c;点击页面列表&…

云计算ftp 服务器实验

创建VLAN 10 划分端口 创建VLAN 10 的地址 10.1.1.1 服务器的地址是 10.1.1.2 这是服务上的配置 服务器上选择ftp 启动 &#xff0c;文件目录选择一下 在 交换机上 ftp 10.1.1.2 服务器的地址 把刚才创建的shenyq txt 文件下载下到本地交换机 我们能看到交换…

有关安科瑞Acrel-1000DP分布式光伏监控系统在某公司分布式光伏发电项目中的应用探讨-安科瑞 蒋静

摘要&#xff1a;分布式光伏作为可再生能源的一种重要形式&#xff0c;能够根据不同场地的实际情况进行定制&#xff0c;尽可能地利用可用空间&#xff0c;减少对传统化石燃料的依赖&#xff0c;也能降低温室气体排放、改善环境质量。在政策支持和市场需求的双重推动下&#xf…

电脑查不到IP地址是什么原因?怎么解决

在日常使用电脑的过程中&#xff0c;有时会遇到无法查询到电脑IP地址的情况&#xff0c;这可能会影响到网络的正常使用。本文将探讨电脑查不到IP地址的可能原因&#xff0c;并提供相应的解决方案。 一、原因分析 ‌网络连接问题‌&#xff1a;首先&#xff0c;网络连接不稳定或…

MySQL(B站CodeWithMosh)——2024.10.11(14)

ZZZZZZ目的ZZZZZZ代码ZZZZZZ重点ZZZZZZ操作&#xff08;非代码&#xff0c;需要自己手动&#xff09; 8- CASE运算符The CASE Operator_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1UE41147KC?p62&vd_sourceeaeec77dfceb13d96cce76cc299fdd08 在sql_store中&am…

智能网联汽车安全隐患,如何化解?

0. 智能网联汽车安全问题如何才能解决&#xff1f;1. TARA 威胁分析与风险评估平台2. CSTP 智能网联汽车网络安全测试平台3. 智能网联汽车安全解决方案4. 车联网测试认证与培训解决方案5. 车联网网络安全实验室建设方案 0. 智能网联汽车安全问题如何才能解决&#xff1f; 智能…

FFmpeg的简单使用【Windows】--- 简单的视频混合拼接

实现功能 点击【选择文件】按钮在弹出的对话框中选择多个视频&#xff0c;这些视频就是一会将要混剪的视频素材&#xff0c;点击【开始处理】按钮之后就会开始对视频进行处理&#xff0c;处理完毕之后会将处理后的文件路径返回&#xff0c;并在页面展示处理后的视频。 视频所…

【数据结构】排序算法系列——桶排序(附源码+图解)

桶排序 算法思想 桶排序&#xff08;BucketSort)&#xff0c;也被叫做箱排序&#xff0c;它将整个数据组分为n个相同大小的子区间&#xff0c;这类子区间或称为桶。输入数据是均匀、独立分布的&#xff0c;所以一般不会出现一个桶中装有过多数据的情况。作为一种排序算法&…

160页PPT | 埃森哲-制造业变革转型八大领域:痛点剖析与改进策略

PT下载链接见文末~ 引言&#xff1a;制造业数字化转型规划 制造业正处于数字化转型的关键时期&#xff0c;旨在通过技术革新和流程优化&#xff0c;灵活应对市场波动&#xff0c;强化竞争优势&#xff0c;并紧跟技术进步的步伐。此规划围绕三大核心要素展开&#xff1a; 1、…

Pytest中fixture的scope详解

pytest作为Python技术栈下最主流的测试框架&#xff0c;功能极为强大和灵活。其中Fixture夹具是它的核心。而且pytest中对Fixture的作用范围也做了不同区分&#xff0c;能为我们利用fixture带来很好地灵活性。 下面我们就来了解下这里不同scope的作用 fixture的scope定义 首…

8.优化存储过程的性能(8/10)

优化存储过程的性能 1.引言 存储过程是数据库系统中预先编写好的SQL语句集合&#xff0c;它们被保存在数据库服务器上&#xff0c;可以在需要时被调用执行。存储过程的使用可以提高数据库操作的效率&#xff0c;减少网络通信&#xff0c;并且可以封装复杂的逻辑&#xff0c;使…

中科星图GVE(案例)——AI实现建筑用地变化前后对比情况

目录 简介 函数 gve.Services.AI.ConstructionLandChangeExtraction(image1,image2) 代码 结果 知识星球 机器学习 简介 AI可以通过分析卫星图像、航拍影像或其他地理信息数据&#xff0c;实现建筑用地变化前后对比。以下是一种可能的实现方法&#xff1a; 数据获取&am…

全能PDF工具集 | PDF Shaper Ultimate v14.6 便携版

软件简介 PDF Shaper是一款功能强大的PDF工具集&#xff0c;它提供了一系列用于处理PDF文档的工具。这款软件使用户能够轻松地转换、分割、合并、提取页面以及旋转和加密PDF文件。PDF Shaper的界面简洁直观&#xff0c;使得即使是新手用户也能快速上手。它支持广泛的功能&…

牛客一>DP34 【模板】前缀和

1.题目&#xff1a; 【模板】前缀和_牛客题霸_牛客网 2.解析&#xff1a;这里可以看成一个缩小版动态规划 代码&#xff1a; import java.util.Scanner;// 注意类名必须为 Main, 不要有任何 package xxx 信息 public class Main {public static void main(String[] args) {Scan…