Flink消费Kafka实时写入Doris

news2025/1/15 13:33:48

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: log
  paths:
    - /doc/input/*.log  # 更换为你的日志文件路径
processors:
  - include_fields:
      fields: ["message"]
output.kafka:
  # 更换为你的Kafka地址和主题.
  hosts: ["192.168.235.130:9092"]
  topic: k2gg
  codec:
    format:
      string: '%{[message]}'

运行Filebeat采集日志

./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

CREATE TABLE transactions (
  timestamp datetime,
  user_id INT,
  transaction_type VARCHAR(50),
  amount DECIMAL(15, 2),
  currency CHAR(3),
  status VARCHAR(20),
  description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

   <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.16</artifactId>
            <version>24.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.0</version>
        </dependency>

主程序

package flink;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;


import java.util.Properties;

public class DorisWrite {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        //Kafka broker的地址
       props.put("bootstrap.servers", "192.168.235.130:9092");
       props.put("group.id", "test");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //指定消费的主题
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);

        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        //Doris的地址以及账号密码等信息
        dorisBuilder.setFenodes("192.168.235.130:8030")
                .setTableIdentifier("test.transactions")
                .setUsername("root")
                .setPassword("1445413748");


        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,
                .setStreamLoadProp(pro).build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(new SimpleStringSerializer()) //serialize according to string
                .setDorisOptions(dorisBuilder.build());

        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);

        // 将Kafka数据转换为JSON格式
        DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                System.out.println("value"+value);
                // 分割字符串
                String[] parts = value.split(",");

                // 创建JSON字符串
                StringBuilder jsonString = new StringBuilder();
                jsonString.append("{");
                jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");
                jsonString.append("\"user_id\":").append(parts[1]).append(",");
                jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");
                jsonString.append("\"amount\":").append(parts[3]).append(",");
                jsonString.append("\"currency\":\"").append(parts[4]).append("\",");
                jsonString.append("\"status\":\"").append(parts[5]).append("\",");
                jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");
                jsonString.append("}");

                return jsonString.toString();
            }
        });

      
        jsonStream.print();
        jsonStream.sinkTo(builder.build());

        env.execute("flink kafka to doris by datastream");
    }
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2219308.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自动机器学习(AutoML)

utoML是PAI的提供的自动寻找超参组合的机器学习增强型服务。您在训练模型时&#xff0c;如果超参组合复杂度过高&#xff0c;需大量训练资源和手工调试工作&#xff0c;可以使用AutoML来节省模型调参时间&#xff0c;提升模型调优效率和模型质量。 基础概念 超参数&#xff1a;…

Spring 获取URL中的参数

PathVariable 获取多个变量参数重命名 获取 URL 中的 Id&#xff0c;可以根据 Id 到数据库中筛选相应的内容 Id 的类型是可以定义的&#xff0c;这里定义为 Integer 类型 并且在 RequestMapping中需要定义路径 {articleId} PathVariable 从路径中获取 变量 获取多个变量 参数…

【软件运行类文档】项目试运行方案,试运行计划书(word原件)

一、 试运行目的 &#xff08;一&#xff09; 系统功能、性能与稳定性考核 &#xff08;二&#xff09; 系统在各种环境和工况条件下的工作稳定性和可靠性 &#xff08;三&#xff09; 检验系统实际应用效果和应用功能的完善 &#xff08;四&#xff09; 健全系统运行管理体制&…

jmeter发送post请求

在jmeter中&#xff0c;有两种常用的请求方式&#xff0c;get和post.它们两者的区别在于get请求的参数一般是放在路径中&#xff0c;可以使用用户自定义变量和函数助手等方式进行参数化&#xff0c;而post请求的参数不能随url发送&#xff0c;而是作为请求体提交给服务器。而在…

打开游戏提示丢失(或找不到)XINPUT1_3.DLL的多种解决办法

xinput1_3.dll是一个动态链接库&#xff08;DLL&#xff09;文件&#xff0c;它在Windows操作系统中扮演着重要的角色。该文件作为系统库文件&#xff0c;通常存放于C:\Windows\System32目录下&#xff08;对于32位系统&#xff09;或C:\Windows\SysWOW64目录下&#xff08;对于…

PPT自动化:快速更换PPT图片(如何保留原图片样式等参数更换图片)

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 PPT更换图片 📒1. 安装 `python-pptx` 模块2. 加载PPT文件3. 查找并替换图片3.1 查找图片形状3.2 获取原图片的样式和位置3.3 替换图片4. 保存修改后的PPT文件5. 设置图片的相关参数5.1 设置透明度5.2 设置边框🚀 保留所有参…

FFmpeg 4.3 音视频-多路H265监控录放C++开发四 :RGB颜色

一 RGB 的意义&#xff1f; 为什么要从RGB 开始讲起呢&#xff1f; 因为最终传输到显卡显示器的颜色都是RGB 即使能处理YUV的API&#xff0c;本质上也是帮你做了从 YUV 到 RGB的转换。 RGB888 表示 R 占8bit&#xff0c;G 占8bit&#xff0c;B 占8bit&#xff0c;也就是每一…

内网微隔离,三步防横移——基于微隔离的横移攻击防护方案

引言 在网络攻防的战场上&#xff0c;横移攻击如同隐匿的刺客&#xff0c;一旦突破边界&#xff0c;便在内网肆意游走&#xff0c;给企业网络安全带来致命损害。当前多数企业数据中心内网如同未设防的城池&#xff0c;面对突破外围边界的“刺客”毫无招架之力。蔷薇灵动基于多…

Thread类的介绍

线程是操作系统中的概念&#xff0c;操作系统中的内核实现了线程这种机制&#xff0c;同时&#xff0c;操作系统也提供了一些关于线程的API让程序员来创建和使用线程。 在JAVA中&#xff0c;Thread类就可以被视为是对操作系统中提供一些关于线程的API的的进一步的封装。 多线…

基于SpringBoot+Vue+uniapp微信小程序的澡堂预订的微信小程序的详细设计和实现

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…

Goland 搭建Gin脚手架

一、使用编辑器goland 搭建gin 打开编辑器 新建项目后 点击 create 二、获得Gin框架的代码 命令行安装 go get -u github.com/gin-gonic/gin 如果安装不上&#xff0c;配置一下环境 下载完成 官网git上下载 这样就下载完成了。、 不过这种方法需要设置一下GOPATH 然后再执…

Electron-(三)网页报错处理与请求监听

在前端开发中&#xff0c;Electron 是一个强大的框架&#xff0c;它允许我们使用 Web 技术构建跨平台的桌面应用程序。在开发过程中&#xff0c;及时处理网页报错和监听请求是非常重要的环节。本文将详细介绍 Electron 中网页报错的日志记录、webContents 的监听事件以及如何监…

【uniapp】打包成H5并发布

目录 1、设置配置mainifest.sjon 1.1 页面标题 1.2 路由模式 1.3 运行的基础路径 2、打包 2.1 打包入口 2.2 打包成功 2.3 依据目录找到web目录 3、 将web目录整体拷贝出来 4、上传 4.1 登录uniapp官网注册免费空间 4.2 上传拷贝的目录 4.3 检查上传是否正确 5、…

【软件测试】JUnit

Junit 是一个用于 Java 编程语言的单元测试框架&#xff0c;Selenium是自动化测试框架&#xff0c;专门用于Web测试 本篇博客介绍 Junit5 文章目录 Junit 使用语法注解参数执行顺序断言测试套件 Junit 使用 本篇博客使用 Idea集成开发环境 首先&#xff0c;创建新项目&#…

【Python-AI篇】人工智能python基础-计算机组成原理

1. 计算机组成原理 2. python基础&#xff08;查漏补缺&#xff09; 2.1 字符串 2.1.1 字符串查找方法 find()&#xff1a; 检测某个字符串是否包含在这个字符串中&#xff0c;在的话返回下标&#xff0c;不在的话返回-1index()&#xff1a; 检测某个字符串是否包含在这个字…

git命令使用一览【自用】

git常见操作&#xff1a; git initgit remote add master【分支名字】 gitgits.xxxxx【仓库中获取的ssh链接或者http协议的链接】检查远程仓库是否链接成功。 git remote -v出现以下画面就可以git pull,git push了

cefsharp63.0.3(Chromium 63.0.3239.132)支持H264视频播放-PDF预览 老版本回顾系列体验

一、版本 版本:Cef 63/CefSharp63.0.3/Chromium63.0.3239.132/支持H264/支持PDF预览 支持PDF预览和H264推荐版本 63/79/84/88/100/111/125 <

Java EE规范

1、简介 Java EE的全称是Java Platform, Enterprise Edition。早期Java EE也被称为J2EE&#xff0c;即Java 2 Platform Enterprise Edition的缩写。从J2EE1.5以后&#xff0c;就改名成为Java EE。一般来说&#xff0c;企业级应用具备这些特征&#xff1a;1、数据量特别大&…

java 文件File类概述

前言 在Java中&#xff0c;File类是一个与文件和目录&#xff08;文件夹&#xff09;路径名相关的抽象表示形式。它是java.io包中的一个重要类&#xff0c;用于表示和操作文件系统中的文件和目录。 File类的基本概念 表示路径&#xff1a;File类既可以表示文件路径&#xff…

【mod分享】波斯王子遗忘之沙高清重置,纹理,字体,贴图全部重置,特效增强,支持光追

各位好&#xff0c;今天小编给大家带来一款新的高清重置MOD&#xff0c;本次高清重置的游戏叫《波斯王子&#xff1a;遗忘之沙》。 《波斯王子&#xff1a;遗忘之沙》是由育碧&#xff08;Ubisoft&#xff09;开发并发行的一款动作类游戏&#xff0c;于2010年5月18日发行。游戏…