使用 bend-ingest-kafka 将数据流实时导入到 Databend

news2025/1/21 18:37:43

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

Databend是一个开源、高性能、低成本易于扩展的新一代云数据仓库。bend-ingest-kafka 是一个专为 Databend 设计的实时数据导入工具,它允许用户从 Apache Kafka 直接将数据流导入到 Databend 中,实现数据的实时分析和处理。

为什么选择bend-ingest-kafka?

  • 实时性: 能够实时地从 Kafka 中读取数据并导入到 Databend。
  • 高吞吐量: 支持高并发的数据导入,满足大规模数据处理的需求。
  • 易用性: 提供了简单直观的配置方式,便于用户快速上手。
  • 灵活性: 可二次开发支持多种数据格式和自定义转换逻辑。

环境准备

在使用 bend-ingest-kafka 之前,需要确保以下环境已经搭建好:

  • 一个运行中的 Databend 实例或者在 Databend Cloud 中创建一个 warehouse(推荐)。
  • 一个配置好的 Apache Kafka 集群。
  • 已经安装的 bend-ingest-kafka。

快速开始

Step 1: 安装 bend-ingest-kafka

可以从 Databend 的官方 GitHub 仓库 release 页面 下载对应 OS 架构的 bend-ingest-kafka 的可执行二进制文件,或者直接执行命令安装最新版本。

go install  github.com/databendcloud/bend-ingest-kafka@latest

Step 2: 配置 bend-ingest-kafka

配置文件通常包括 Kafka 的连接以及配置信息、Databend 的连接信息以及数据转换的逻辑。以下是一个简单的配置示例:

{
  "kafkaBootstrapServers": "localhost:9092",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "mockData": "",
  "isJsonTransform": false,
  "databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443",
  "databendTable": "default.kfk_test",
  "batchSize": 10,
  "batchMaxInterval": 5,
  "dataFormat": "json",
  "workers": 1,
  "copyPurge": false,
  "copyForce": false,
  "disableVariantCheck": true,
  "minBytes": 1024,
  "maxBytes": 1048576,
  "maxWait": 10,
  "useReplaceMode": false,
  "userStage": "~"
}

具体的配置参数可以参考 Parameter References,这里对几个比较重要的参数展开解释。

  • isJsonTransform: 默认为 true,将 Kafka Json 数据逐字段转换为 Databend 表数据。通过设置 isJsonTransform 为 true 来使用此模式。如果设置为 false 的话,系统将在 Databend 中自动创建一个 raw table, 列包括 (uuid, koffset, kpartition, raw_data, record_metadata, add_time),并将原始数据导入此表。其中 raw_data 为导入的 kafka Json 数据,record_metadata 包含了本条数据的 kafka 元信息 - topicpartitionoffsetcreate_timekey,方便用户查询。
  • useReplaceMode: useReplaceMode 是一种去重模式,开启后如果表中已存在数据,新数据将替换旧数据。但 useReplaceMode 仅在 isJsonTransform 为 false 时支持,因为它需要在目标表中添加 koffset 和 kpartition 字段。在这种模式下,系统可以实现 exactly once 的同步语义,否则为 at-least-once 语义。
  • userStage: 用户的自定义 external stage name。

Step 3: 启动数据导入

这里使用 raw-data 模式作演示。

Kafka 的 Json 数据示例为:
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
模拟 kafka 生产数据

可以使用下面的脚本快速生成 kafka json 数据:

from confluent_kafka import Producer

# 创建一个Producer实例
p = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(1000000):
    json_data = '{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}'
    p.produce('ingest_test', json_data)
    print(i)
    p.flush()
使用配置文件启动 bend-ingest-kafka

默认读取 ./config/conf.json 配置文件,开始将 Kafka 中的数据导入到 Databend。

./bend-ingest-kafka

启动后可以看到 log 和 metrics:

到 Databend 中可以查询到已经同步的数据:

 由于 raw_data 和 record_metadata 的字段格式都是 JSON ,所以可以很灵活地做一些数据分析:

select record_metadata['partition'] p,
                min(record_metadata['offset']::bigint) o1,
        max(record_metadata['offset']::bigint) o2,
        o2-o1+1 sub_count,
        count(distinct record_metadata['offset']) distinct_cnt,
        count(1) cnt
from default.kfk_test 
group by p
order by p;

高级特性

  • 错误处理: 能够处理数据导入过程中的异常,并提供重试机制。
  • 监控与日志: 提供详细的日志记录和监控指标,方便跟踪数据导入的状态。

结语

bend-ingest-kafka 作为一个强大的工具,为 Databend 用户提供了从 Kafka 实时导入数据的能力。通过本文的介绍,用户应该能够快速上手并利用这个工具来实现实时数据处理的需求。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨‍💻‍ Databend Cloud:https://databend.cn

📖 Databend 文档:https://docs.databend.cn/

💻 Wechat:Databend

✨ GitHub:https://github.com/datafuselabs/databend

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1891564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【UML用户指南】-27-对体系结构建模-制品

目录 1、组成结构 2、制品的种类 2.1、部署制品 (deployment artifact) 2.2、工作产品制品 (work product artifact) 2.3、执行制品 (execution artifact) 3、标准元素 4、常用建模技术 4.1、对可执…

CGLib动态代理技术

基于CGLib的动态代理机制,ProxyFactoryy无需再像JDK动态代理那样实现一个interface,实际情况下可能这个interface并不存在,只需要实现另外一个接口MethodInterceptor即可 package com.hmdp.service.尚硅谷的代理模式3; //CGlib代理import …

UE5 03-物体碰撞检测

在你需要碰撞的物体上添加一个碰撞检测组件 碰撞预设 设置为NoCollision,这样移动过程中就不会有物理碰撞阻挡效果,只负责检测是否碰撞,比较难解释,如果学过Unity的话,可以把它理解成 Collision 为 Trigger

INFINI Console 使用介绍

上次在《INFINI Easysearch尝鲜Hands on》中我们部署了两个节点的Easysearch,并且也设置了Console对集群进行监控。那么今天我们再来介绍下INFINI Console的使用。 INFINI Console 仪表盘功能介绍 INFINI Console 是一个功能强大的数据管理和分析平台,…

conda env pip install error:No space left on device

conda 环境 pip install error:No space left on device 文章目录 conda 环境 pip install error:No space left on device现象1 实验2 分析和解决办法 现象 非root用户的服务器,需要安装环境,安装的环境超过2GB sudo pip insta…

Roboflow自动标定数据集

最近需要自己打数据集,记录一下用Roboflow来打标签。 https://roboflow.com/(官网) 进入官网先注册,注册完成后进入这个界面。 我先讲如果不想让数据集公开怎么办,因为这里每个新建的都是公开的。新账号进去应该进去…

Python | Leetcode Python题解之第214题最短回文串

题目: 题解: class Solution:def shortestPalindrome(self, s: str) -> str:n len(s)fail [-1] * nfor i in range(1, n):j fail[i - 1]while j ! -1 and s[j 1] ! s[i]:j fail[j]if s[j 1] s[i]:fail[i] j 1best -1for i in range(n - 1,…

LIS2DH12

LIS2DH12 是属于“nano”系列的超低功耗高性能 3 轴线性加速度计,具有数字 I 2C、SPI 串行接口标准输出。 器件具有超低功耗工作模式,可实现高级节能、智能睡眠唤醒以及恢复睡眠功能。 LIS2DH12 具有2g/4g/8g/16g 的动态用户可选满量程,并能通…

Adobe Acrobat添加时间戳服务器

文章目录 前言一、Adobe Acrobat添加时间戳服务器1.打开Adobe Acrobat软件2.点击【菜单】→ 【首选项】3.点击【安全性】→【更多】4.点击【新建】5.输入【名称】→【服务器URL】 前言 一、Adobe Acrobat添加时间戳服务器 1.打开Adobe Acrobat软件 2.点击【菜单】→ 【首选项…

汽车电子零部件(15):车载TFT LCD显示模组

前言: 车载显示越来越受到重视,屏的使用越来越大且多,车载显示屏模组技术也在快速发展。 在复杂的显示技术世界中,薄膜晶体管(TFT,Thin Film Transistor)液晶显示器(LCD,Liquid Crystal Display)模块的制造证明了现代工程的奇迹。 TFT显示器是全彩色LCD,提供明亮、生…

WPF 3D绘图 点云 系列五

基本概念:点云是某个坐标系下的点的数据集。 可能包含丰富的信息,包括三维坐标X,Y,Z、颜色、分类值、强度值、时间等等 点云可以将现实世界原子化,通过高精度的点云数据可以还原现实世界。万物皆点云。 通过三维激光扫描仪进行数据采集获取点云数据,其次通过二维影像进行…

新手教学系列——使用uWSGI对Flask应用提速

在构建和部署Flask应用时,性能和稳定性是两个关键的因素。为了提升Flask应用的性能,我们可以借助uWSGI这个强大的工具。本文将详细介绍为什么要使用uWSGI、uWSGI的底层原理,并提供一个实例配置,帮助你更好地理解和应用这个工具。 为什么要使用uWSGI uWSGI 是一个应用服务…

docker部署简单的Kafka

文章目录 1. 拉取镜像2. 运行创建网络运行 ZooKeeper 容器运行 Kafka 容器 3. 简单的校验1. 检查容器状态2. 检查 ZooKeeper 日志3. 检查 Kafka 日志4. 使用 Kafka 命令行工具检查5. 创建和删除测试主题 1. 拉取镜像 选择一组兼容性好的版本。 docker pull bitnami/kafka:3.6…

【C++】类和对象(中)--上篇

个人主页~ 类和对象上 类和对象 一、类的六个默认成员函数二、构造函数1、构造函数基本概念2、构造函数的特性 三、析构函数1、析构函数的概念2、特性 四、拷贝构造函数1、拷贝构造函数的概念2、特征 一、类的六个默认成员函数 如果有个类中什么成员都没有,那么被称…

[C++初阶]vector的初步理解

一、标准库中的vector类 1.vector的介绍 1. vector是表示可变大小数组的序列容器 , 和数组一样,vector可采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问,和数组一样高效。但是又不像数组,它的大…

营销故事之扩大牙膏开口

职场营销故事“扩大牙膏开口”又可以说是“牙膏开口扩大1毫米”,为十大经典营销故事之一。某品牌的牙膏,包装精美,品质优良,备受顾客喜爱,连续10年营业额保持10%-20%的增幅。可到了第11年,销售业绩却停滞不…

API-正则表达式

学习目标: 掌握正则表达式 学习内容: 什么是正则表达式语法元字符修饰符 什么是正则表达式: 正则表达式是用于匹配字符串中字符组合的模式。在JavaScript中,正则表达式也是对象。 通常用来查找、替换那些符合正则表达式的文本&a…

泛微开发修炼之旅--26前端j实现手机号码验证

文章链接:26前端j实现手机号码验证

开关电源中强制连续FCCM模式与轻载高效PSM,PFM模式优缺点对比笔记

文章目录 前言一、连续FCCM模式优点:缺点: 二,轻载高效PSM,PFM优点:缺点: 总结 前言 今天我们来学习下开关电源中,强制连续FCCM模式与轻载高效PSM,PFM模式优缺点对比 一、连续FCCM模式 优点: …

安装 VisualSVN Server提示HTTP服务无法启动的问题解决

安装 VisualSVN Server 版本:VisualSVN-Server-5.4.0-x64 安装包在安装到一半的时候,弹窗提示:HTTP服务无法启动,网上找了一大堆,说是service里面更改用户为本地用户什么的都没用用,点右键也无法启动。 …