初探Flink的Java实现流处理和批处理

news2024/12/24 21:10:55

端午假期,夏日炎炎,温度连续40度以上,在家学习Flink相关知识,记录下来,方便备查。
开发工具:IntelliJ Idea
Flink版本:1.13.0
本次主要用Flink实现批处理(DataSet API) 和 流处理(DataStream API)简单实现。

第一步、创建项目与添加依赖

1)新建项目
打开Idea,新建Maven项目,包和项目命名,点击确定进入项目。
在这里插入图片描述
2)引入依赖
pom.xml文件中添加依赖,即Flink-java、flink-streaming、slf4j等, 可参考以下代码。

<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.2</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.16.0</version>
    </dependency>
</dependencies>

3)添加日志文件
resource目录下添加日志文件log4j.properties,内容如下所示。

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=@-4r [%t] %-5p %c %x - %m%n
第二步、构造数据集

在项目下新建 input 文件夹,用于存放数据集,在其下新建 words.txt 文件,即测试的数据集,如下图所示。
在这里插入图片描述

第三步、编写业务代码

读取数据集中内容,并进行单词的字数统计。新建 BatchWordCout 类,引入分6个步骤实现数据集的读取与打印。
方式一、批处理 DataSet API
主要处理步骤为
1)创建执行环境;
2)从环境中读取数据;
3)将每行数据进行分词,转化成二元组类型 扁平映射;
4)按照word进行分组;
5)分组内进行聚合统计;
6)打印结果
批处理 DataSet API 写法如下所示。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2、从环境中读取数据
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
        // 3、将每行数据进行分词,转化成二元组类型 扁平映射
        FlatMapOperator<String,Tuple2<String,Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String,Long>> out) -> {
            // 将每行文本进行拆分
            String[] words = line.split(" ");
            // 将每个单词转化成二元组
            for(String word : words){
                out.collect(Tuple2.of(word,1L));
            }
        }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        // 4、按照word进行分组
         UnsortedGrouping<Tuple2<String,Long>> wordAndOneGroup =  wordAndOneTuple.groupBy(0);
         // 5、分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        // 6、打印结果
        sum.print();

    }

控制台打印效果如下图所示。
在这里插入图片描述
在Flink 1.12 版本后,官方推荐做法是直接使用 DataSet API 即提交任务时将执行模式更改为BATCH来进行批处理
$bin/flink run -Dexecution.runtime-mode=BATCH batchWordCount.jar

方式二、流处理 DataStream API
流处理的处理步骤与批处理流程类似,主要区别是执行环境不一样。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchSteamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
        // 3、转换计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将每行文本进行拆分
            String[] words = line.split(" ");
            // 将每个单词转化成二元组
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4、分组
        KeyedStream<Tuple2<String, Long>, Object> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        // 6、打印结果
        sum.print();
        // 7、启动执行
        env.execute();
    }
}

控制台输出结果如下图所示。
在这里插入图片描述
从打印结果可以看出 多线程执行,结果是无序;第一列数字与本地运行环境的CPU核数有关;

参考文档

【1】https://www.bilibili.com/video/BV133411s7Sa?p=9&vd_source=c8717efb4869aaa507d74b272c5d90be

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/681282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAM与Prompt的结合

1. SAM介绍 由Meta AI Research开发的Segment anything model&#xff08;简称SAM&#xff09;最近引起了广泛的关注。SAM在超过10亿个mask的大型分割数据集上进行了训练&#xff0c;能够在特定的图像上分割任何对象。在最初的SAM工作中&#xff0c;作者们使用了零样本迁移任务…

08- c语言字符串 (C语言)

一 字符串的定义及基本使用 1、什么是字符串 被双引号引用的字符集合&#xff01;例如&#xff1a;”hello” 、”world”&#xff0c;或者是以 \0 结尾的字符数组&#xff01;&#xff01;&#xff01; 比如&#xff1a;char ch[] {h, e, \0} 注意&#xff1a;”hello” 中…

GB50149-2010电气装置安装工程母线装置施工及验收规范

为了确保强硬钢丝绳金属封闭体绝缘金属封闭母线、绝缘子、硬件、穿墙套管等设备母线的安装质量,加快安装技术的进步,和确保设备的安全运行,使该规范。 本规范适用于总线设备安装了750 kv及以下的T范围施工和验收。 母线的安装应按照批准的设计文件施工。 设备和设备运输、储…

python:并发编程(二十五)

前言 本文将和大家一起探讨python并发编程的实际项目&#xff1a;win图形界面应用&#xff08;篇七&#xff0c;共八篇&#xff09;&#xff0c;系列文章将会从零开始构建项目&#xff0c;并逐渐完善项目&#xff0c;最终将项目打造成适用于高并发场景的应用。 本文为python并…

postman 文档、导出json脚本 导出响应数据 response ,showdoc导入postman json脚本 导出为文档word或markdown

生成文档 Collections中 选中文件夹 - ... (文件夹 功能小按钮) - view documentation : 保存响应数据 Response&#xff1a;&#xff08;如果导出接口数据&#xff0c;会同步导出响应数据&#xff09; 请求接口后&#xff0c;点击下方 Save as Example 可以保存响应数…

机器学习day20(前向传播的向量化代码,矩阵乘法)

前向传播的循环代码与向量化代码的对比 把X、B写作二维数组&#xff0c;即矩阵左边的for循环就可以用右边的np.matmul来实现matmul是numpy执行矩阵乘法的一种方式注意&#xff1a;此时所有的变量&#xff08;X、W、B、Z、A_out&#xff09;都是二维数组&#xff0c;即矩阵 向…

基础排序算法【归并排序+非递归版本+边界修正】

基础排序算法【归并排序非递归版本边界修正】 Ⅰ.归并排序(递归版本)①.分割②.归并③.拷贝 Ⅱ.非递归版本Ⅲ.边界修正 Ⅰ.归并排序(递归版本) 递归排序&#xff0c;采用的是分治法。分成子问题来处理。先让序列不断分割成子序列&#xff0c;当子序列有序后再合并。 对于一段…

Yolov5-Face 原理解析及算法解析

YOLOv5-Face 文章目录 YOLOv5-Face1. 为什么人脸检测 一般检测&#xff1f;1.1 YOLOv5Face人脸检测1.2 YOLOv5Face Landmark 2.YOLOv5Face的设计目标和主要贡献2.1 设计目标2.2 主要贡献 3. YOLOv5Face架构3.1 模型架构3.1.1 模型示意图3.1.2 CBS模块3.1.3 Head输出3.1.4 stem…

202320读书笔记|《宋词》——竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生

202320读书笔记&#xff5c;《宋词》——竹杖芒鞋轻胜马&#xff0c;谁怕&#xff1f;一蓑烟雨任平生 《宋词》韩震主编&#xff0c;偶然从书友那加入书架的书。宋词挺喜欢李清照的词以及知否的《菩萨蛮》。诗集&#xff0c;词&#xff0c;俳句&#xff0c;短歌我都很喜欢&…

工欲善其事,必先利其器-基于ubuntu18.04 VScode开发100ASK-ESP32

点击上方“嵌入式应用研究院”&#xff0c;选择“置顶/星标公众号” 干货福利&#xff0c;第一时间送达&#xff01; 来源 | 嵌入式应用研究院 整理&排版 | 嵌入式应用研究院 前面我们基于ubuntu环境搭建了esp-idf的开发环境&#xff0c;它也是为了接下来基于VSCode来开发1…

【openGauss基本概念】---快速入门

【openGauss简单使用】---快速入门 &#x1f53b; 一、基本概念&#x1f530; 1.1 openGauss&#x1f530; 1.2 数据库&#xff08;Database&#xff09;&#x1f530; 1.3 数据块&#xff08;Block&#xff09;&#x1f530; 1.4 行&#xff08;Row&#xff09;&#x1f530; …

【MySQL数据库】主从复制与读写分离

目录 一、读写分离1.1概述1.2为什么要读写分离呢&#xff1f;1.3什么时候要读写分离&#xff1f; 1.4主从复制与读写分离1.5mtsql支持的复制类型1.6主从复制工作流程1.7主从复制原理 二、主从复制实战 一、读写分离 1.1概述 读写分离&#xff0c;基本的原理是让主数据库处理事…

React从入门到实战 -组件的三大核心属性(2)props

文章目录 基本使用对props进行限制类式组件中的构造器 基本使用 // 定义组件class MyComponent extends React.Component{render(){return (<ul><li>姓名&#xff1a;{this.props.name}</li><li>年龄&#xff1a;{this.props.age}</li><li>…

【开源与项目实战:开源实战】85 | 开源实战四(中):剖析Spring框架中用来支持扩展的两种设计模式

上一节课中&#xff0c;我们学习了 Spring 框架背后蕴藏的一些经典设计思想&#xff0c;比如约定优于配置、低侵入松耦合、模块化轻量级等等。我们可以将这些设计思想借鉴到其他框架开发中&#xff0c;在大的设计层面提高框架的代码质量。这也是我们在专栏中讲解这部分内容的原…

MongoDB负载均衡集群(第8章节选)

MongoDB自身可组成分片加复制的集群&#xff0c;在这个集群的前端加上负载均衡器&#xff08;比如HAProxmy Keepalived&#xff09;&#xff0c;就可组建成一个无单点故障、十分完美的高可用负载均衡集群&#xff08;如图8-1所示&#xff09;。 图8- 1 整个MongDB高可用体系结…

基于java+swing+mysql飞机票预订系统

基于javaswingmysql飞机票预订系统 一、系统介绍二、功能展示1.项目内容2.项目骨架3.数据库表4.注册窗口5.登录窗口6、用户-主窗口7、用户-查询航班8.用户--订票8.用户--取票9.管理员-所有航班信息10.管理员-添加航班11.用户信息12.订票状态 四、其它1.其他系统实现五.获取源码…

路径规划-DWA算法(C++实现)

1、简单介绍 DWA算法&#xff08;dynamic window approach&#xff09;&#xff0c;其原理主要是在速度空间&#xff08;v,w&#xff09;中采样多组速度&#xff0c;并模拟出这些速度在一定时间内的运动轨迹&#xff0c;并通过评价函数对这些轨迹进行评价(其中包括距离障碍物距…

【学习笔记】 科目一之概念篇

【学习笔记】 科目一之概念篇 概念题方法 1&#xff09;抓重点&#xff1a;科目一设计知识范围太广&#xff0c;不要妄想所有知识点都复习到&#xff0c;这是不可能的&#xff0c;我们的目标是45分几个而不是考高分&#xff0c;复习时间有限&#xff0c;所以要学会抓重点&…

图片是如何生成的--图像生成模型(GAN、VAE、扩散模型)简介

目录 1.GAN 2.AutoEncoder及其变种&#xff1a;AE/DAE/VAE/VQVAE 2.1 AE&#xff1a; 2.2 DAE&#xff1a;Denoising AutoEncoder 2.3 VAE&#xff1a;Variational AutoEncoder 2.4 VQVAE&#xff1a;Vector-quantised Variational AutoEncoder 3. 扩散模型 3.1 扩散…

【openGauss简单数据管理】--快速入门

【openGauss简单数据管理】--快速入门 &#x1f53b; 一、openGauss数据库管理&#x1f530; 1.1 连接openGauss数据库&#x1f530; 1.2 创建数据库&#x1f530; 1.3 查看数据库和切换数据库&#x1f530; 1.4 修改数据库&#x1f530; 1.5 删除数据库&#x1f530; 1.6 启停…