实战Flink Java api消费kafka实时数据落盘HDFS

news2024/11/14 5:42:50

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1

kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
           <!-- <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

       <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>

        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <!--上传的本地jar的位置-->
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <!--远程拷贝的地址-->
                    <url>scp://root:root@hadoop10:/opt/app</url>
                </configuration>
            </plugin>
        </plugins>

    </build>



</project>

  • 依赖参考
    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作为数据源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink将数据写入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 添加Sink,将Kafka数据直接写入HDFS
        ds1.addSink(sink);
        ds1.print();
        env.execute("Flink Kafka HDFS");
    }
}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。
在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。
我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。
在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。
    在这里插入图片描述

在这里插入图片描述

package day2;

import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作为数据源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink将数据写入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 在一个时间窗口内将数据写入HDFS
        ds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction
                .addSink(sink);

        // 执行程序
        env.execute("Flink Kafka HDFS");
    }
}
package day2;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CustomProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // 在这里可以添加具体的逻辑,例如将数据写入HDFS
        System.out.println(value);  // 打印结果到屏幕
        out.collect(value);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1364832.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

秒懂百科,C++如此简单丨第十五天:指针

目录 必看信息 Everyday English 前言 &#x1f4dd;了解指针 &#x1f4dd;定义指针 &#x1f4dd;分析指针 &#x1f4dd;运用指针 总结 必看信息 ▶本篇文章由爱编程的小芒果原创&#xff0c;未经许可&#xff0c;严禁转载。 ▶本篇文章被收录于秒懂百科&#xff0c…

网络层协议及IP编址

0x00 前言 本节为网络层协议及IP编址内容 IP地址的范围&#xff1a;0.0.0.0-255.255.255.255 IP分为网络位以及主机位。子网划分就是向主机位借位。 网络层协议 IPICMP&#xff08;internet Control message protocol&#xff09;IPX IP协议的作用 为网络层的设备提供逻…

2023湾区产城创新大会:培育数字化供应链金融新时代

2023年12月26日&#xff0c;由南方报业传媒集团指导&#xff0c;南方报业传媒集团深圳分社主办的“新质新力——2023湾区产城创新大会”在深圳举行。大会聚集里国内产城研究领域的专家学者以及来自产业园区、金融机构、企业的代表&#xff0c;以新兴产业发展为议题&#xff0c;…

【计算机网络】网络基础--协议/网络协议/网络传输流程/地址管理

文章目录 一、计算机网络背景二、协议1.协议是什么2.为什么要有协议 三、网络协议1.为什么要进行协议分层2.OSI七层模型3.TCP/IP五层(或四层)模型 四、网络传输基本流程1.协议报头2.局域网3.数据包封装和分用4.网络传输流程图 五、网络中的地址管理1.认识IP地址2.认识MAC地址3.…

C++与数据库MySQL锁——模拟订票(事务)

假设订票的时候&#xff0c;好几个人同时进入&#xff0c;查看这张票是否售出&#xff0c;假如同时购买了这张票&#xff0c;那对于售票行业来说&#xff0c;可能就会发生低级错误。那么如何避免这类事情发生呢&#xff1f; 解决办法&#xff1a; 在一个人访问的时候&#xf…

DataFunSummit:2023年知识图谱在线峰会-核心PPT资料下载

一、峰会简介 AIGC&#xff0c;ChatGPT以及发布的GPT-4相信已经给大家带来足够的冲击&#xff0c;那么对于知识图谱的应用产生哪些变化和变革&#xff1f;知识图谱在其中如何发挥作用呢&#xff1f;通过LLM是否有可能辅助创建通用大规模知识图谱&#xff1f;AIGC时代下行业知识…

burpsuite专业版的安装和破解(2024年最新)

burpsuite专业版的安装和破解&#xff08;2024年最新&#xff09; 简介视频教程下载BP专业版第一步第二步&#xff1a;下载第三步第四步&#xff1a;打开powershell界面第五步&#xff1a;在powershell中执行BurpLoaderKeygen.jar文件第六步&#xff1a;破解第七步&#xff1a;…

聚道云软件连接器,助力某钢铁行业公司实现发票信息自动同步

客户介绍&#xff1a; 某钢铁行业公司是一家大型现代化民营钢铁企业&#xff0c;拥有覆盖钢铁全产业链的冶金装备、技术和全过程信息系统。公司业务涉及钢铁、煤炭、房产等行业&#xff0c;多年来一直保持着稳健的发展态势。 添加图片注释&#xff0c;不超过 140 字&#xff0…

VlnPlot画的其实不是原始数据

昨天的推文描述了让小提琴图肚子变大的做法&#xff1a;让你的小提琴肚子大起来‍‍‍‍‍‍‍‍‍‍‍‍‍ 在此说明&#xff1a;这种不考虑后果&#xff0c;就让肚子大起来的做法是不严谨的。如需使用&#xff0c;建议将原始图和修改图放在一起对比&#xff0c;且在文章中注…

vue3+echart绘制中国地图并根据后端返回的坐标实现涟漪动画效果

1.效果图 2.前期准备 main.js app.use(BaiduMap, {// ak 是在百度地图开发者平台申请的密钥 详见 http://lbsyun.baidu.com/apiconsole/key */ak: sRDDfAKpCSG5iF1rvwph4Q95M6tDCApL,// v:3.0, // 默认使用3.0// type: WebGL // ||API 默认API (使用此模式 BMapBMapGL) });i…

K210基础实验系列

CanMV K210 开发板: CanMV K210 是由 01Studio 设计研发&#xff0c;基于嘉楠科技边缘计算芯片 K210 &#xff08; RSIC V 架构&#xff0c; 64 位双核&#xff09;方案的一款开发板&#xff0c;采用硬件一体化设计&#xff08; K210 核心板、 摄像头、 LCD 集成在一个…

服务器内存不足怎么办?会有什么影响?

服务器内存&#xff0c;也被称为RAM&#xff08;Random Access Memory&#xff09;&#xff0c;是一种临时存储设备&#xff0c;用于临时存放正在运行的程序和数据。它是服务器上的超高速存储介质&#xff0c;可以快速读取和写入数据&#xff0c;提供给CPU进行实时计算和操作。…

localhost和127.0.0.1的区别是什么

今天在网上逛的时候看到一个问题&#xff0c;没想到大家讨论的很热烈&#xff0c;就是标题中这个&#xff1a; localhost和127.0.0.1的区别是什么&#xff1f; 前端同学本地调试的时候&#xff0c;应该没少和localhost打交道吧&#xff0c;只需要执行 npm run 就能在浏览器中打…

Priors in Deep Image Restoration and Enhancement: A Survey

深度图像恢复和增强中的先验&#xff1a;综述 论文链接&#xff1a;https://arxiv.org/abs/2206.02070 项目链接&#xff1a;https://github.com/VLIS2022/Awesome-Image-Prior (Preprint. Under review) Abstract 图像恢复和增强是通过消除诸如噪声、模糊和分辨率退化等退化…

【北邮国院大四上】Business Technology Strategy 企业技术战略

北邮国院电商大四在读&#xff0c;本笔记仅为PPT内容的整理与翻译&#xff0c;并不代表本课程的考纲及重点&#xff0c;仅为本人复习时方便阅读与思考之作。 写在前面 大家好&#xff0c;欢迎来到大学期间的最后一门课程&#xff0c;本门课程是中方课&#xff0c;所以很庆幸的…

WEB 3D技术 three.js 顶点交换

本文 我们来说 顶点的转换 其实就是 我们所有顶点的位置发生转变 我们整个物体的位置也会随之转变 这里 我们编写代码如下 import ./style.css import * as THREE from "three"; import { OrbitControls } from "three/examples/jsm/controls/OrbitControls.j…

基于MATLAB的均值,方差,变量的矩(附完整代码与例题)

目录 一. 数学期望与方差 二. 样本的均值与方差 三. MATLAB代码 四. 例题与代码 4.1 正态分布 4.2 Rayleigh分布 五. 随机变量的矩 5.1 原点矩与中心距 5.2 例题3 5.3 样本向量的原点矩与中心矩 一. 数学期望与方差 将某连续随机变量x的概率密度函数记为p(x)&#x…

CentOS未能挂起虚拟机

问题&#xff1a; CentOS未能挂起虚拟机 解决方案&#xff1a; 1、切换到root 2、打开/etc/selinux/config 3、编辑fonfig文件SELINUXpermissive 4、重启VMware&#xff08;很重要&#xff01;&#xff01;&#xff01;简单粗暴&#xff0c;直接右上角x关机。&#xff09; …

《Linux C编程实战》笔记:创建线程

上一章是进程&#xff0c;这一章是线程 有关线程进程的概念之类的请自行学操作系统吧&#xff0c;书里都是偏实战应用的 线程创建函数pthread_create #include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine)…

【AI视野·今日NLP 自然语言处理论文速览 第六十九期】Wed, 3 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Wed, 3 Jan 2024 Totally 24 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers An Autoregressive Text-to-Graph Framework for Joint Entity and Relation Extraction Authors Zaratiana Ur…