使用IntelliJ Idea开发Flink应用程序

news2024/10/6 1:42:07

使用IntelliJ Idea开发Flink应用程序

  • 一、实验目的
  • 二、实验内容
  • 三、实验原理
  • 四、实验环境
  • 五、实验步骤
    • 5.1 启动IntelliJ Idea并创建flink项目
    • 5.2 编写flink代码
      • 5.2.1 准备工作
      • 5.2.2 批处理
      • 5.2.3 有界流处理
      • 5.2.4 无界流处理

⚠申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址。 全文共计5696字,阅读大概需要3分钟
🌈更多学习内容, 欢迎👏关注👀【文末】我的个人微信公众号:不懂开发的程序猿
个人网站:https://jerry-jy.co/

一、实验目的

掌握IntelliJ Idea创建Flinnk应用程序的过程。

需求:统计一段文字中,每个单词出现的频次。

二、实验内容

1、使用IntelliJ Idea创建flink应用程序。

三、实验原理

Apache Flink擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

在这里插入图片描述

四、实验环境

硬件:x86_64 CentOS 7.5 服务器
软件:JDK1.8,Flink-1.17.1,Hadoop-3.3.3,IntelliJ Idea-2022

五、实验步骤

5.1 启动IntelliJ Idea并创建flink项目

1、创建一个Maven工程,,结构目录如左下


在这里插入图片描述


2、导入POM依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jerry</groupId>
    <artifactId>FlinkTutorial1.17</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Archetype - FlinkTutorial1.17</name>
    <url>http://maven.apache.org</url>

    <properties>
        <flink.version>1.17.0</flink.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

</project>

5.2 编写flink代码

5.2.1 准备工作

环境准备:在src/main/java目录下,新建一个包,命名为com.jerry.wordcount
数据准备:
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件word.txt
(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

5.2.2 批处理

1、批处理基本思路:
(1)先逐行读入文件数据,然后将每一行文字拆分成单词;
(2)接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

2、编写代码

package com.jerry.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.asm9.org.objectweb.asm.tree.analysis.Value;
import org.apache.flink.util.Collector;

/**
 * ClassName: WordCountBatchDemo <br>
 * Package: com.jerry.wordcount <br>
 * Description:
 *
 * @Author: jerry_jy
 * @Create: 2023-06-12 16:29
 * @Version: 1.0
 */
public class WordCountBatchDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // TODO 2. 读取数据:从文件中读取
        DataSource<String> dataSource = env.readTextFile("input/word.txt");

        // TODO 3. 按行切分、转换(word,1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // TODO 3.1 按照空格切分单词
                String[] words = s.split(" ");
                // TODO 3.2 将单词转为(word,1)
                for (String word : words) {
                    Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);
                    // TODO 3.3 使用 Collector 向下游发送数据
                    collector.collect(wordTuple2);
                }

            }
        });
        // TODO 4. 按照 word 分组
        // 这里的0是位置,代表第1个元素
        UnsortedGrouping<Tuple2<String, Integer>> wordAndOnrGroupby = wordAndOne.groupBy(0);

        // TODO 5. 各分组内聚合
        // 这里的1是位置,代表第二个元素
        AggregateOperator<Tuple2<String, Integer>> sum = wordAndOnrGroupby.sum(1);

        // TODO 6. 输出
        sum.print();
    }
}

3、输出

(flink,1)
(world,1)
(hello,3)
(java,1)

需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。

5.2.3 有界流处理

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。

下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

package com.jerry.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * ClassName: WordCountStreamDemo <br>
 * Package: com.jerry.wordcount <br>
 * Description: dataStream 实现word count 读文件(有界流)
 *
 * @Author: jerry_jy
 * @Create: 2023-07-24 20:24
 * @Version: 1.0
 */
public class WordCountStreamDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2. 读取数据:从文件中读取
        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

        // TODO 3. 按行切分、转换(word,1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // TODO 3.1 按照空格切分单词
                String[] words = s.split(" ");
                // TODO 3.2 将单词转为(word,1)
                for (String word : words) {
                    Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                    // TODO 3.3 使用 Collector 向下游发送数据
                    collector.collect(wordsAndOne);
                }

            }
        });

        // TODO 4. 按照 word 分组
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });


        // TODO 5. 各分组内聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);


        // TODO 6. 输出
        sumDS.print();

        // TODO 7、启动,类似于sparkstreaming 最后的ssc.start()
        env.execute();

    }
}

输出:

3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)

主要观察与批处理程序BatchWordCount的不同:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
  • 代码末尾需要调用env的execute方法,开始执行任务。

5.2.4 无界流处理

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。

(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream

具体代码实现如下:

package com.jerry.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * ClassName: WordCountUnboundedDemo <br>
 * Package: com.jerry.wordcount <br>
 * Description: dataStream 实现wordCount 读socket(无界流)
 *
 * @Author: jerry_jy
 * @Create: 2023-07-24 20:50
 * @Version: 1.0
 */
public class WordCountUnboundedDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2. 读取数据:从文件中读取
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        // TODO 3. 处理数据:切分、转换、分组、聚合
        socketDS
                .flatMap(
                        (String s, Collector<Tuple2<String, Integer>> collector) -> {
                            String[] words = s.split(" ");
                            for (String word : words) {
                                collector.collect(Tuple2.of(word, 1));
                            }
                        }

                )
                // 这里由于Lambda表达式存在类型擦除,所以必须指定返回元素的类型
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(stringIntegerTuple2 -> stringIntegerTuple2.f0)
                .sum(1)
                .print();

        // TODO 4. 输出

        // TODO 5、执行
        env.execute();
    }
}

(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试

[root@hadoop102 ~]# nc -lk 7777

说明:CentOS 下安装nc的命令

yum install -y netcat

注意:要先启动端口,后启动StreamWordCount程序,否则会报超时连接异常。

(3)启动StreamWordCount程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从hadoop102发送数据
①在hadoop102主机中,输入“hello flink”,输出如下内容

13> (flink,1)
5> (hello,1)

②再输入“hello world”,输出如下内容

2> (world,1)
5> (hello,2)

提醒:
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

注意:
前面的POM.xml文件中关于flink的依赖作用域是provided,也就是说,在生产环境下不会被打包上传,这里我们需要在【Run】–> 【Edit Configuration】中做如下配置,才能运行成功程序


在这里插入图片描述

–end–

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/786874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剑指Offer第一章——整数

1. 整数 1.1 整数的基础知识 整数是一种基本的数据类型。编程语言可能会提供占据不同内存 空间的整数类型&#xff0c;每种类型能表示的整数的范围也不相同。例如&#xff0c;Java中有4种不同的整数类型&#xff0c;分别为8位的byte&#xff08;-&#xff5e;-1&#xff09;、…

Minecraft 1.20.x Forge模组开发 04.动画效果物品

我们本次实现一个具有动画效果的流星锤: 效果演示 效果演示 效果演示 首先,请确保你的开发包中引入了geckolib依赖,相关教程请参考:Minecraft 1.20.x Forge模组开发 03.动画生物实体 1.首先我们要使用geckolib制作一个物品和对应的动画: 在blockbench中新建一个

Android TelephonyManager双卡获取数据开启状态异常的可能原因

背景 应用内不指定subId获取数据状态可能会错误&#xff0c;因为可能拿到voice的能力&#xff0c;而非data。 代码逻辑 1、通过TelephonyManager的isDataEnabled()没有指定subId时&#xff0c;调用内部方法isDataEnabledForReason&#xff0c;传入getId()参数以指定subid&am…

设备监测诊断与维护:优化工业生产效率的关键措施

在现代工业生产中&#xff0c;设备的稳定运行对于保障生产效率至关重要。设备监测、诊断和维护作为关键措施&#xff0c;能够帮助企业及时发现设备问题、诊断故障原因&#xff0c;并采取有效维护措施&#xff0c;从而降低生产中断风险&#xff0c;提高生产效率。本文将深入探讨…

【二叉树】利用前序和中序遍历结果生成二叉树并输出其后序和层序遍历结果

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; ⭐️往期关于树的文章&#xff1a; 【哈夫曼树】基本概念、构建过程及C代码 【线索二叉树】C代码及线索化过程详解 欢迎阅读&#xff01; 实验内容 根据二叉树先序和中序遍历的结果,生成该二叉树。并…

pytest自动化测试框架tep环境变量、fixtures、用例三者之间的关系

tep是一款测试工具&#xff0c;在pytest测试框架基础上集成了第三方包&#xff0c;提供项目脚手架&#xff0c;帮助以写Python代码方式&#xff0c;快速实现自动化项目落地。 在tep项目中&#xff0c;自动化测试用例都是放到tests目录下的&#xff0c;每个.py文件相互独立&…

低代码治好了CIO们的 “精神内耗”

目录 一、前言 二、低代码在企业数字化转型中能“C位出道”&#xff0c;凭什么&#xff1f; 三、低代码的数字化构建能力 四、结束语 一、前言 近两年&#xff0c;我发现身边的CIO/CTO朋友都得了一种“病”——“数字化焦虑”症。 他们经常皱着眉头问我&#xff1a;“老板对数字…

gradio初体验

背景 近期随着很多开源大模型的出现&#xff0c;对于其如何落地&#xff0c;或者说充分地去挖掘其实际应用领域和商业价值变得格外重要。于是乎&#xff0c;对于不懂技术的前方市场或销售人员&#xff0c;如何在没有形成AI产品之前向其展示算法模型效果呢&#xff1f;这时候gr…

【严重】Citrix ADC 和 Citrix Gateway 远程代码执行漏洞(PoC)

漏洞描述 Citrix ADC是应用程序交付和负载平衡解决方案&#xff0c;Citrix Gateway是一套安全的远程接入解决方案&#xff0c;常用于提供虚拟桌面和远程桌面服务&#xff0c;此外&#xff0c;Citrix ADC还被广泛用作Windows堡垒机。 在 Citrix ADC 和 Citrix Gateway 受影响版…

《我为什么要听你的 如何与强势的人相处》读书笔记二

目录 反驳例子 勇敢反击的益处 一些重要的自我保护法 不要再讲述你生活中的细节 别答应那些表意不明的请求 记录下一切 第一时间告知亲友 拒绝成为中间人 区别对待强势者 谨慎应对奉承话 想方设法快点跑 反驳例子 例子 例子&#xff1a; 例子&#xff1a; 例子&am…

什么是UE像素流送,像素流推流是什么原理?

游戏开发者通常在运行游戏逻辑时会将游戏渲染到屏幕的同一台设备上来运行虚幻引擎应用&#xff0c;多人联网游戏可能会在应用程序的多个实例之间分发部分游戏逻辑&#xff0c;但每个单独的实例仍然会为自己的玩家在本地渲染游戏。即使是使用 HTML5 部署选项创建可以在 Web 浏览…

CountDownLatch 一个神奇的计数器,您了解吗

一、CountDownLatch基础概念及案例 1.CountDownLatch是java.util.concurrent 包下提供一个同步工具类&#xff0c;它允许一个或多个线程一直等待&#xff0c;直到其他线程执行完成再执行。其本质就是一个计数器&#xff0c;传入一个初始的值&#xff0c;调用await 方法会阻塞当…

vue+Element项目中v-for循环+表单验证

如果在Form 表单里有通过v-for动态生成&#xff0c;如何设置验证呢&#xff1f; <el-form ref"ruleFormRef" :model"ruleForm" status-icon :rules"rules" label-width"120px"class"demo-ruleForm" hide-required-aster…

大数据课程综合实验案例---课设问题汇总

最近翻看两年前的大数据课设&#xff0c;感觉这个大数据课设实验当时答辩 在大数据课设实验过程中&#xff0c;我遇到了很多问题&#xff0c;在这里做出汇总&#xff1a; 1、MySQL启动报错 首先&#xff0c;我的MySQL有时候启动不了&#xff0c;当我输入这个命令的时候&#…

ModuleNotFoundError: No module named sklearn

前言 出现ModuleNotFoundError: No module named sklearn’的debug过程记录 步骤 安装机器学习库&#xff0c;需要注意报错的sklearn是scikit-learn缩写。 pip install scikit-learn 完成&#xff0c;不再报错

剖析Linuxptp中ptp4l实现--OC

源码克隆地址&#xff1a; git://git.code.sf.net/p/linuxptp/code 项目官网文档&#xff1a; https://linuxptp.nwtime.org/documentation/ 关于linuxptp的相关配置可以参考以下博文&#xff1a; linuxptp/ptp4l PTP时钟同步配置选项 代码剖析 ptp4l的main函数在ptp4l.…

“学习嵌入式开发:明确目标,提升技能“

嵌入式领域涵盖广泛&#xff0c;不可能一次性掌握所有知识。因此&#xff0c;明确学习目标和方向非常重要。选择感兴趣且与职业发展相关的领域进行深入学习是明智之举。 嵌入式技术在不断发展&#xff0c;过去与现在存在差异。选择学习当前行业的主流技术和趋势是明智选择。掌…

基于STM32设计的人体健康监护系统(华为云IOT)

一、设计需求 1.1 设计需求总结 根据需求,要求设计一款基于 STM32 的人体健康监护系统。采用系统模块化思路进行,将多个数模传感器收集到的数据和操作指令一并送至 STM32 中心处理器进行处理分析。 该系统可以实时监测被测者的心率、体温以及周围环境的温度,也同时可以通…

生物信息学_玉泉路_课堂笔记_06 第六章 基因组学:遗传变异分析以及FGWAS

&#x1f345; 课程&#xff1a;生物信息学_玉泉路_课堂笔记 中科院_2022秋季课 第一学期 &#x1f345; 个人笔记使用 &#x1f345; 2023/7/21 课程回顾 第六章 基因组学&#xff1a;遗传变异分析以及FGWAS 第一节 SNP 与 Indel 的鉴定与分析 基本概念 参考基因组 和基因组…

tinkerCAD案例:9.Twist Earrings 扭耳环

tinkerCAD案例&#xff1a;9.Twist Earrings 扭耳环 In this lesson you learn how to create earrings by using cylinder shapes. Let’s get started! 在本课中&#xff0c;您将学习如何使用圆柱形制作耳环。让我们开始吧&#xff01; 说明 Drag a Cylinder shape to the w…