创建第一个 Flink 项目

news2024/12/29 1:47:01

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
   <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_2.12</artifactId>
       <version>1.10.0</version>
   </dependency>
</dependencies>

<build>
   <plugins>
       <!-- 该插件用于将Scala代码编译成class文件 -->
       <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>3.4.6</version>
           <executions>
               <execution>
                   <goals>
                       <!--声明绑定到 maven 的compile阶段-->
                       <goal>compile</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-assembly-plugin</artifactId>
           <version>3.0.0</version>
           <configuration>
               <descriptorRefs>
                   <descriptorRef>jar-with-dependencies</descriptorRef>
               </descriptorRefs>
           </configuration>
           <executions>
               <execution>
                   <id>make-assembly</id>
                   <phase>package</phase>
                   <goals>
                       <goal>single</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._

/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建一个批处理的执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据
    var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by
    val resultDataSet: DataSet[(String,Int)] = inputDateSet
      .flatMap(_.split(" "))//分词得到所有 word构成的数据集
      .map((_,1))//_表示当前 word 转换成一个二元组(word,count)
      .groupBy(0)//以二元组中第一个元素作为key
      .sum(1) //1表示聚合二元组的第二个元素的值

    //打印输出
    resultDataSet.print()
  }
}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flink

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流
   val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
 def main(args: Array[String]): Unit = {
   // 创建一个流处理执行环境
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流  hostname:prot 从程序运行参数中读取
   val params: ParameterTool = ParameterTool.fromArgs(args);
   val hostname: String = params.get("host");
   val port: Int = params.getInt("port");
   val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);
   //定义转换操作 word count
   val resultDataStream: DataStream[(String,Int)] = inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word
     .filter(_.nonEmpty)
     .map((_,1))//转换成 word count 二元组
     .keyBy(0)//按照第一个元素分组
     .sum(1)//按照第二个元素求和

   resultDataStream.print()

   //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")
 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1292765.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RTL编码(2)——模块优化

一、顶层模块的划分 在RTL编码中&#xff0c;我们是以模块为单位进行设计的&#xff0c;模块之间的连接和嵌套关系对于电路结构有着很大的影响。一个好的系统设计中&#xff0c;我们应该使得模块尽量满足以下两个标准&#xff1a; 顶层模块扁平化内部模块层次化 1.1 顶层模块扁…

基于springboot + vue的社区医院信息系统

qq&#xff08;2829419543&#xff09;获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;springboot 前端&#xff1a;采用vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xf…

排序:快速排序(hoare版本)

目录 快速排序&#xff1a; 概念&#xff1a; 动画分析&#xff1a; 代码实现&#xff1a; 代码分析&#xff1a; 代码特性&#xff1a; 常见问题&#xff1a; 快速排序&#xff1a; 概念&#xff1a; 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&a…

LNMP网站架构分布式搭建部署(编译安装)

目录 一、数据库编译安装 二、nginx编译安装 三、php编译安装 三、通过nfs将三台不同的主机资源共享 四、基础测试 五、完成WordPress站点部署 六、完成bbs论坛站点部署 一、数据库编译安装 1、先下载安装包到/opt目录中&#xff0c;最好选择mysql-boost-5.7.44.tar.gz版…

Linux操作系统 中的用户管理,也就是关于用户的相关的操作与理解

目录 1 概念2 用户管理的类型3、模板目录4 查看历史命令5 相关文件6 创建用户useradd7 命令passwd 存储及shadow命令下的用户密码8 usermod命令&#xff08;修改属性&#xff09;9 密码设置10 删除命令userdel11 用户组的操作12 用户权限13 创建一个用户组&#xff0c;并且将这…

短视频账号矩阵系统源码搭建步骤包括以下几个方面:

短视频账号矩阵系统源码搭建步骤包括以下几个方面&#xff1a; 1. 确定账号类型和目标受众&#xff1a;确定要运营的短视频账号类型&#xff0c;如搞笑、美食、美妆等&#xff0c;并明确目标受众和定位。 2. 准备账号资料&#xff1a;准备相关资质和资料&#xff0c;如营业执照…

在Windows 11中,把iPhone照片和视频导出来又快又简单,无需第三方软件

如果你想将照片和视频从iPhone传输到Windows 11 PC&#xff0c;最快、最简单的方法是插入手机并执行自动导入。以下是操作方法。 如何将照片和视频从iPhone导入Windows 如果你用USB数据线将iPhone插入Windows PC&#xff0c;Windows 11可以像标准数码相机一样连接到它&#x…

nacos在win11无法正常启动,一闪而过,pause也不停止。【已解决】

问题一闪而过&#xff0c;无法抓屏。 1.首先排除配置文件和数据库问题&#xff1b; 2.问题及解决方法&#xff1a; 分析&#xff1a;startup.cmd在执行时&#xff0c;无法找到jdk&#xff1b; 解决方法&#xff1a;配置jdk到环境变量&#xff1b; 再次启动正常&#xff1b;

用Pandas轻松进行7项基本数据检查

大家好&#xff0c;作为一名数据工程师&#xff0c;面对糟糕的数据质量&#xff0c;可以使用Pandas执行快捷的数据质量检查。本文使用scikit-learn提供的California Housing数据集&#xff0c;进行基本数据检查。 一、California Housing数据集概述 【数据集】&#xff1a; …

黑苹果之显卡篇

一、什么是显卡 显卡GPU&#xff08;Video card、Display card、Graphics card、Video adapter&#xff09;是个人计算机基础的组成部分之一&#xff0c;将计算机系统需要的显示信息进行转换驱动显示器&#xff0c;并向显示器提供逐行或隔行扫描信号&#xff0c;控制显示器的正…

关键字volatile作用和用法

目录 一、多线程编程中的volatile关键字 二、嵌入式编程中的volatile关键字 三、 优化编译器优化 四、 指针类型转换 一个定义为volatile的变量是说这变量可能会被意想不到地改变&#xff0c;这样&#xff0c;编译器就不会去假设这个变量的值了。 精确地说就是&#xff0c;…

【云原生-K8s】镜像漏洞安全扫描工具Trivy部署及使用

基础介绍基础描述Trivy特点 部署在线下载百度网盘下载安装 使用扫描nginx镜像扫描结果解析json格式输出 总结 基础介绍 基础描述 Trivy是一个开源的容器镜像漏洞扫描器&#xff0c;可以扫描常见的操作系统和应用程序依赖项的漏洞。它可以与Docker和Kubernetes集成&#xff0c;…

shell命令学习(1)——(待完善)

explainshell.com shell统计当前文件夹下的文件个数、目录个数Linux之shell常用命令&#xff08;三&#xff09; sort&#xff08;排序&#xff09;、uniq&#xff08;处理重复字符&#xff09; linux中shell将换行输入到文件中 shell脚本&#xff0c;将多行内容写入文件中 f…

springboot086靓车汽车销售网站

springboot086靓车汽车销售网站 成品项目已经更新&#xff01;同学们可以打开链接查看&#xff01;需要定做的及时联系我&#xff01;专业团队定做&#xff01;全程包售后&#xff01; 2000套项目视频链接&#xff1a;https://pan.baidu.com/s/1N4L3zMQ9nNm8nvEVfIR2pg?pwd…

Javascript编程进阶 – 预定义函数

Javascript编程进阶 – 预定义函数 JavaScript Programming Advanced – Predefined Functions By JacksonML JavaScript引擎中包含了一组built-in functions(内建函数)。 本文简要介绍如何通过实践使用这些预定义函数并掌握传递参数和返回值。希望对您有所帮助。 JavaScri…

如何使用HadSky搭配内网穿透工具搭建个人论坛并发布至公网随时随地可访问

文章目录 前言1. 网站搭建1.1 网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3 Cpolar稳定隧道&#xff08;本地设置&#xff09;2.4 公网访问测试 总结 前言 经过多年的基础…

【我爱C语言】详解字符函数isdigit和字符串转换函数(atoi和snprintf实现互相转换字符串)三种strlen模拟实现

&#x1f308;write in front :&#x1f50d;个人主页 &#xff1a; 啊森要自信的主页 ✏️真正相信奇迹的家伙&#xff0c;本身和奇迹一样了不起啊&#xff01; 欢迎大家关注&#x1f50d;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;>希望看完我的文章对你有小小的帮助&am…

使用Python实现轮盘赌选择法Roulette Wheel Selection Method in Python

一、引言 最近在手写遗传算法&#xff0c;想尝试解决一些优化问题。然而&#xff0c;在编码的过程中&#xff0c;自己发现了很多都不懂的问题。比如&#xff0c;交叉的操作&#xff0c;有单点交叉、两点交叉和多点交叉&#xff0c;具体选哪一种会更好呢&#xff1f;未知。还有交…

异常检测 | 基于孤立森林(Isolation Forest)的数据异常数据检测(结合t-SNE降维可视化)

异常检测 | MATLAB实现基于孤立森林的数据异常检测 目录 异常检测 | MATLAB实现基于孤立森林的数据异常检测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 Matlab实现基于孤立森林(Isolation Forest)的数据异常数据检测可视化&#xff08;完整源码和数据) 基于孤立森林(…

好用免费的AI换脸5个工具

在当今社会的发展中&#xff0c;人工智能&#xff08;Artificial Intelligence, AI&#xff09;扮演着关键的角色&#xff0c;其应用领域不断扩展。作为AI的一个分支&#xff0c;换脸技术近年来备受欢迎。这项技术使得将一个人的面部特征迁移到另一个人的照片或视频成为可能。除…