构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

news2025/1/12 4:10:51

当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个强大的实时数据处理流水线。

什么是 Flink、Kafka、CnosDB

  • Flink:是一个强大的流式处理引擎,它支持事件驱动、分布式、并且容错。Flink能够处理高吞吐量和低延迟的实时数据流,适用于多种应用场景,如数据分析、实时报表和推荐系统等。
  • Kafka:是一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka具有良好的持久性、可扩展性和容错性,适用于构建实时数据流的可靠管道。
  • CnosDB:是一个专为时序数据设计的开源时序数据库。它具有高性能、高可用性和易用性的特性,非常适合存储实时生成的时间序列数据,如传感器数据、日志和监控数据等。

场景描述

用例中假设有一个物联网设备网络,每个设备都定期生成传感器数据,包括温度、湿度和压力等。我们希望能够实时地收集、处理和存储这些数据,以便进行实时监控和分析。

数据流向架构图如下:

  1. 首先,我们需要设置一个数据收集器来获取传感器数据,并将数据发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
  2. 使用 Flink来实时处理传感器数据。首先,需要编写一个Flink应用程序,该应用程序订阅 Kafka 主题中的数据流,并对数据进行实时处理和转换。例如,您可以计算温度的平均值、湿度的最大值等。
  3. 将处理后的数据存储到 CnosDB 中以供后续查询。为了实现这一步,需要配置一个CnosDB Sink,使得Flink应用程序可以将处理后的数据写入 CnosDB 中。

构建流水线

1.数据采集与传输

编写一个生产者应用程序,读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> producer = new KafkaProducer<>(props);


        while (true) {
            SensorData data = generateSensorData(); // 生成传感器数据
            producer.send(new ProducerRecord<>("sensor-data-topic", data));
            Thread.sleep(1000); // 每秒发送一次数据
        }
    }
}

2.实时处理与转换

编写一个 Flink 应用程序,订阅 Kafka 主题中的数据流,实时处理并转换数据。

// Flink 应用程序示例
public class SensorDataProcessingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");


        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));


        DataStream<ProcessedData> processedData = sensorData
            .map(json -> parseJson(json)) // 解析JSON数据
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口
            .apply(new SensorDataProcessor()); // 自定义处理逻辑


        processedData.print(); // 打印处理后的数据,可以替换为写入 CnosDB 操作


        env.execute("SensorDataProcessingJob");
    }
}

3.数据写入与存储

配置CnosDB Sink,将 processedData.print() 替换为写入 CnosDB 的程序在 CnosDB 创建一个存储数据时长为 30 天的数据库:

| CnosDB 建库语法说明请查看:创建数据库[https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;

在 Maven [https://maven.apache.org/]中引入 CnosBD Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html]包:

<dependency>
    <groupId>com.cnosdb</groupId>
    <artifactId>flink-connector-cnosdb</artifactId>
    <version>1.0</version>
</dependency>

编写程序:

public class WriteToCnosDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");
        props.setProperty("group.id", "sensor-data-consumer-group");


        DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));


        DataStream<ProcessedData> processedData = sensorData
            .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // 解析JSON数据
            .keyBy(ProcessedData::getDeviceId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口
            .apply(new SensorDataProcessor()); // 自定义处理逻辑


        DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(
                new RichMapFunction<ProcessedData, CnosDBPoint>() {
                    @Override
                    public CnosDBPoint map(String s) throws Exception {
                        return new CnosDBPoint("sensor_metric")
                                .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)
                                .tag("device_id", value.getDeviceId())
                                .field("average_temperature", value.getAverageTemperature())
                                .field("max_humidity", value.getMaxHumidity());
                    }
                }
        );


        CnosDBConfig cnosDBConfig = CnosDBConfig.builder()
                .url("http://localhost:8902")
                .database("db_flink_test")
                .username("root")
                .password("")
                .build();


        cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));
        env.execute("WriteToCnosDBJob");
    }
}

运行后查看结果:

db_flink_test ❯ select * from sensor_metric limit 10;
+---------------------+---------------+---------------------+--------------+
| time                | device_id     | average_temperature | max_humidity |
+---------------------+---------------+---------------------+--------------+
| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         |
| 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         |
| 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         |
| 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         |
| 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         |
| 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         |
| 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         |
| 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         |
| 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         |
| 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |
+---------------------+---------------+---------------------+--------------+

总结

通过结合Flink、Kafka 和 CnosDB,您可以构建一个强大的实时数据处理流水线,从数据采集到实时处理再到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的特性和操作。这种架构适用于各种实时数据应用,如物联网监控、实时报表和仪表板等。根据您的需求和情境,调整配置和代码,以构建适合您业务的实时数据处理解决方案。

CnosDB简介

CnosDB是一款高性能、高易用性的开源分布式时序数据库,现已正式发布及全部开源。

欢迎关注我们的社区网站:https://cn.cnosdb.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/969437.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文的开题报告怎么写?

最近收到很多私信&#xff0c;在问我关于开题报告的问题。基本都是毕业论文题目怎样选&#xff1f;系统好不好弄&#xff1f;开题报告怎么写啊&#xff1f;啥也不会怎样办呢&#xff1f;系统运行不会&#xff1f;查重问题呀&#xff0c;要马上交开题报告了等等。 毕业论文题目怎…

Python 之__name__的用法以及解释

文章目录 介绍代码 介绍 __name__ 是一个在 Python 中特殊的内置变量&#xff0c;用于确定一个 Python 文件是被直接运行还是被导入为模块。 文件作为模板导入&#xff0c;则其 __name__属性值被自动设置为模块名 文件作为程序直接运行&#xff0c;则__name__属性属性值被自动设…

【FPGA零基础学习之旅#12】三线制数码管驱动(74HC595)串行移位寄存器驱动

&#x1f389;欢迎来到FPGA专栏~三线制数码管驱动 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;FPGA学习之旅 文章作者技术和水平有限&#xff0c;如果文中出现错误&#xff0c;希望大家能指…

stm32之30.DMA

DMA&#xff08;硬件加速方法&#xff09;一般用于帮运比较大的数据&#xff08;如&#xff1a;摄像头数据图像传输&#xff09;&#xff0c;寄存器-》DMA-》RAM 或者 RAM-》DMA-》寄存器提高CPU的工作效率 源码-- #include "myhead.h" #include "adc.h"#…

小白的第一个RNN(情感分析模型)

平台&#xff1a;window10&#xff0c;python3.11.4&#xff0c;pycharm 框架&#xff1a;keras 编写日期&#xff1a;20230903 数据集&#xff1a;英语&#xff0c;自编&#xff0c;训练集和测试集分别有4个样本&#xff0c;标签有积极和消极两种 环境搭建 新建文件夹&am…

【Sentinel】Sentinel与gateway的限流算法

文章目录 1、Sentinel与Hystrix的区别2、限流算法3、限流算法对比4、Sentinel限流与Gateway限流 1、Sentinel与Hystrix的区别 线程隔离有两种方式实现&#xff1a; 线程池隔离&#xff08;Hystrix默认采用&#xff09;信号量隔离&#xff08;Sentinel默认采用&#xff09; 服…

2023.09.03 学习周报

文章目录 摘要文献链接题目亮点本文工作 题目亮点本文工作 题目亮点本文工作 大气污染物传输的相关内容总结 摘要 本周阅读了三篇论文&#xff0c;第一篇文章的核心为改进PageRank算法和标签传播算法实现大气污染物传输分析模型&#xff0c;第二篇文章的核心为将SOD、VGG和LST…

9.3.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-车道线检测

目录 前言1. 车道线检测总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-车道…

AJAX学习笔记2发送Post请求

AJAX学习笔记1发送Get请求_biubiubiu0706的博客-CSDN博客 继续 AJAX发送POST请求 无参数 测试 改回来 测试 AJAX POST请求 请求体中提交参数 测试 后端打断点 如何用AJAX模拟form表单post请求提交数据呢&#xff1f; 设置请求头必须在open之后,send之前 请求头里的设置好比…

yolov5手机版移植

感谢阅读 运行export.py然后百度一个onnx转化工具下载yolov5移动版文件和ncnn修改代码CMakeLists.txt修改修改param的参数![在这里插入图片描述](https://img-blog.csdnimg.cn/7c929414761840db8a2556843abcb2b3.jpeg)yolov5ncnn_jni.cpp修改修改stride16和stride32完工 运行ex…

【AWS实验 】在 AWS Fargate 上使用 Amazon ECS 部署应用程序

文章目录 实验概览目标实验环境任务 1&#xff1a;连接到实验命令主机任务 2&#xff1a;将应用程序容器化任务 3&#xff1a;构建 Web2048 容器任务 4&#xff1a;创建 Amazon ECR 存储库并推送 Docker 映像任务 5&#xff1a;创建 ECS 集群任务 6&#xff1a;测试应用程序总结…

12.redis 持久化

redis 持久化 redis 持久化redis持久化策略RDB > Redis DataBase 定期备份rdb 文件处理rdb 优缺点 AOF > Append Only File 实时备份AOF 工作流程AOF 缓冲区刷新策略AOF 重写机制AOF 重写流程 混合持久化持久化流程总结 redis 持久化 redis 是一个内存数据库&#xff0c…

Mybatis学习|动态sql、动态sql标签

动态SQL 什么是动态SQL: 动态SQL就是指根据不同的条件生成不同的SQL语句 动态SQL就是在拼接SQL语句&#xff0c;我们只要保证SQL的正确性&#xff0c;按照SQL的格式&#xff0c;去排列组合就可以了! 搭建环境 创建一个基础工程 1.导包 2.编写配置文件 3.编写实体类 4.编写实…

OS 内存换入换出

当通过逻辑地址得到虚拟地址&#xff0c;但是发现虚拟地址没有对应的页框号时&#xff0c;就要中断&#xff0c;然后从磁盘中找把这一页读进来&#xff0c;再把页表中的影射做好&#xff0c;再接着原来的程序。 缺页中断进行中断处理 getfreepage 得到物理空闲页 下一句从磁…

9.2.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-深度估计

目录 前言1. 深度估计总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-深度估…

【电路参考】缓启动电路

一、外部供电直接上电可能导致的问题 1、在热拔插的过程中&#xff0c;两个连接器的机械接触&#xff0c;触点在瞬间会出现弹跳&#xff0c;电源不稳&#xff0c;发生震荡。这期间系统工作可能造成不稳定。 2、由于电路中存在滤波或大电解电容&#xff0c;在上电瞬间&#xff…

基于ResNet18网络训练二分类模型

目录 一、背景介绍 二、数据构建 三、模型构建及训练 3.1 采用预训练的权重进行训练 3.2 固定模型的参数&#xff0c;训练过程中不更新 3.3 如何保存训练好的模型&#xff1f; 3.4 如何查看可视化训练过程&#xff1f; 四、模型预测 五、查看网络各层的参数 六、可视…

船舶稳定性和静水力计算——绘图体平面图,静水力,GZ计算(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Navigation2学习笔记--总揽nav2_bringup导航包launch文件

launch文件是一个包的窗口&#xff0c;通过这个窗口我们可以知道一个软件包能干什么&#xff0c;具体调动了什么节点&#xff0c;需要加载什么参数&#xff0c;下面我们从总体看里面每个launch文件的作用。 环境&#xff1a;utuntu20.04 ros2 foxy nav2不同版本大同小异。 …

keil 编译stm32,编译信息释义

文章目录 上图中&#xff1a; Code&#xff1a;表示代码大小&#xff0c;占用 279420 字节。 RO-Data&#xff1a;表示只读数据所占的空间大小&#xff0c;一般是指 const 修饰的数据大小。 RW-Data&#xff1a;表示有初值&#xff08;且非 0&#xff09;的可读写数据所占的空…