【基础篇】二、Flink的批处理和流处理API

news2025/1/12 17:49:46

文章目录

  • 0、demo模块创建
  • 1、批处理有界流
  • 2、流处理有界流
  • 3、流处理无界流
  • 4、The generic type parameters of 'Collector' are missing

0、demo模块创建

创建个纯Maven工程来做演示,引入Flink的依赖:(注意不同本版需要导入的依赖不一样,这里是1.17版本)

<properties>
	<flink.version>1.17.0</flink.version>
</properties>


<dependencies>
	<!--Flink核心依赖-->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<!--Flink客户端-->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
	</dependency>
</dependencies>

下面写Demo,演示用Flink提供的API统计txt文件里每个单词出现的频次,测试文件位置project目录/input/words.txt,文件内容:

hello flink
hello world
hello java

1、批处理有界流

基本步骤:

  • 创建执行环境
  • 读取数据
  • 处理数据
  • 输出

处理数据,包括把从文本读取的每一行String按空格切分成单词 ⇒ 转换二元组(word,1) ⇒ 按二元组的第一个词来分组 ⇒ 按二元组的第二个词来聚合(如求和)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {

        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 从文件读取数据  按行读取(存储的元素就是每行的文本),获得数据源对象
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        
        // 3. 转换数据格式(调用数据源对象的flatMap方法)
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
				
				//按照空格切分单词
                String[] words = line.split(" ");  
				
				//将每个单词转为二元组Tuple2
                for (String word : words) {
                
					Tuple2<String,Long> wordTuple2 = Tuple2.of(word,1L);
					//使用采集器向下游发送数据,这里将转成二元组的数据继续向下发
                    out.collect(wordTuple2);
                }
            }
        });

        // 4. 此时数据已经变成了(word,1)格式,下面按照 word 进行分组
        //按二元组的第一个元素(索引为0)来分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        
        // 5. 分组内聚合统计(按二元组的第二个元素来聚合,第二个元素索引为1)
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果到控制台
        sum.print();
    }
}

//Ctrl+P看下方法的传参提示

FlatMapFunction接口的泛型,第一个为输入Flink的数据类型,第二个为你从Flink输出的类型,也就是你想转换成的类型。比如上面需要把从文本读取的String,分割后转为(word,1)的形式,即String转二元组Tuple2<String, Long>,那就是FlatMapFunction<String, Tuple2<String, Long>>

flatMap方法的形参是一个FlatMapFunction类型的对象,FlatMapFunction是一个接口,new接口的对象得实现它的方法flatMap,该方法两个形参,第一个即进入Flink的源数据,demo中是String,第二个参数是Collector类型的收集器,向下游发送数据

//复习:这种直接用匿名内部类来new接口的对象的方式下面用的很多,复习下
//有一个接口A,里面有抽象方法a()
interface A{
	void a();
}
//此时new A的对象,可以先写一个它的实现类AImpl,再重写接口的抽象方法,然后A a = new AImpl();
class AImpl implements A{
	@Overrdie
	void a(){
	}
}
A a = new AImpl();

//但这样写很繁琐,直接匿名内部类:
new A(){
	@Overrdie
	void a(){
	}
}

运行结果:

在这里插入图片描述

注意,以上的实现是基于DataSet API,即批处理,而Flink是流批统一的处理架构,Flink 1.12开始,官方推荐的做法是直接使用DataStream API,DataStream API更加强大,可以直接处理批处理和流处理的所有场景。该API下,想进行批处理,可:

//在提交任务时通过将执行模式设为BATCH来进行批处理
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

上面的DataSet API仅做个演示,以后不用这种API。

2、流处理有界流

继续读words.txt,统计单词频次,这次用流处理:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
    
        // 1. 创建流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
        
        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(data -> data.f0)  //分组 ,类比Person -> Person.age,fo是二元组类的一个属性名
           .sum(1);   //聚合,链式编程

        // 4. 打印
        sum.print();
        
        // 5. 执行,因为是流,要手动触发开始执行
        env.execute();
    }
}

注意流处理下的分组用的keyBy方法,该方法的传参是一个KeySelector接口类型,接口中有一个getKey方法,给我们定义如何从数据中提取到分组的字段根据源码中getKey的返回类型和形参类型分析,可以得出结论:KeySelector接口上的泛型,第一个是数据类型,从哪个类型的数据中提取分组的字段key,第二个泛型则是分组的时候,分组的字段类型是啥。

在这里插入图片描述

因为该接口有@FunctionalInterface注解标识,即是可以用Lambda表达式,上面代码中就是Lambad的写法,展开就是这样:

在这里插入图片描述

可以看出,写Lambda省事,KeySelector的泛型也不用分析了,运行下:

在这里插入图片描述

和批处理相比,流处理的代码:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同,流处理为DataStreamSource
  • 流处理分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么
  • 流处理代码末尾需要多调用env的execute方法,开始执行任务

3、流处理无界流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。下面监听一台Linux主机的socket端口,然后向该端口不断的发送数据,模拟一个无界流的数据源。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {

        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 读取文本流:选择socket方法,传入发送端主机名、9527表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream(10.6.134.81, 9527);
        
        // 3. 转换、分组、求和,得到统计结果
        //这里用匿名内部类写FlatMapFunction接口的对象
        //形参列表拿过来,加一个箭头,后面{}里写逻辑,很像映射
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");

            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            
        }).returns(Types.TUPLE(Types.STRING, Types.LONG))   //消除Java泛型擦除的问题,注意这个Type类是Flink包下的
                .keyBy(data -> data.f0)  //分组
                .sum(1);  //聚合

        // 4. 打印
        sum.print();
        
        // 5. 执行
        env.execute();
    }
}

启动程序前,先去Linux主机启动监听:

# 监听9527端口,l即保持连接
nc -lk 9527

Linux里你可能遇到的坑一:

# 安装
yum install -y netcat
# 或者
yum install -y nc

坑二:启动Flink程序后超时连接异常

# 原因:你代码里写的那个端口未开放出来

systemctl status firewalld # 应该是开着的active状态
# systemctl start firewalld

firewall-cmd --add-port 9527/tcp --permanent
firewall-cmd --reload
# 再看下,你的端口应该出来了
firewall-cmd --list-ports

此时启动Flink程序,可以发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。向Linux主机输入hello flink,Flink程序中输出:

3> (flink,1)
5> (hello,1)

再输入hello world,Flink程序中输出:

2> (world,1)
5> (hello,2)

开两个窗口,体验一下流的概念,来一个处理一个,不像批处理,程序直接就执行结束exit code 0 了

在这里插入图片描述

4、The generic type parameters of ‘Collector’ are missing

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

在这里插入图片描述

上面flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

(Types.TUPLE(Types.STRING, Types.Integer)
//即二元组的第一个元素类型为String,第二个元素为Integer
//注意Types类是Flink包下的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1082040.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ComfyUI】MacBook Pro 安装(Intel 集成显卡)

文章目录 环境概述配置pip镜像配置pip代理git配置&#xff08;选配&#xff09;下载comfyUI代码创建、激活虚拟环境下载依赖安装torchvision启动comfyUI为什么Mac不支持CUDA&#xff0c;即英伟达的显卡&#xff1f;安装Intel工具包 环境 显卡&#xff1a;Intel Iris Plus Grap…

自定义步骤条setup

自定义步骤条 话不多说 先上效果 <div class"process_more"><!-- 步骤条 --><divclass"set-2":key"index"v-for"(item, index) in recordsList"><div class"set-3"><div class"content_b…

pytorch的基本运算,是不是共享了内存,有没有维度变化

可以把PyTorch简单看成是Python的深度学习第三方库&#xff0c;在PyTorch中定义了适用于深度学习的基本数据结构——张量&#xff0c;以及张量的各类计算。其实也就相当于NumPy中定义的Array和对应的科学计算方法&#xff0c;正是这些基本数据类型和对应的方法函数&#xff0c;…

并购交易:Truist Financial正在商谈以100亿美元将保险经纪业务出售

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;纽交所上市公司Truist Financial(TFC)正在商谈以100亿美元将保险经纪业务出售给Stone Point Capital。 这笔交易可能会加强Truist Financial的资本状况&#xff0c;在监管机构试图提高大型银行的资…

AAPT2简介

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、使用介绍3.3 编译3.2 链接3.3 dump…

[CISCN 2019初赛]Love Math - RCE(异或绕过)

[CISCN 2019初赛]Love Math 1 解题流程1.1 分析1.2 解题题目代码: <?php //听说你很喜欢数学,不知道你是否爱它胜过爱flag if(!isset($_GET[c]))

【Python】QTreeWidget树形结构添加

源码&#xff1a; # 参考网址&#xff1a; https://blog.csdn.net/weixin_42286052/article/details/129532631 import os.path import sys from PySide6.QtWidgets import QApplication,QMainWindow,QHBoxLayout,QVBoxLayout,QPushButton,QTreeWidget,QTreeWidgetItem,QTreeW…

抖音自动养号脚本+抖音直播控场脚本

功能描述 一.抖音功能 1.垂直浏览 2.直播暖场 3.精准引流 4.粉丝留言 5.同城引流 6.取消关注 7.万能引流 8.精准截流 9.访客引流 10.直播间引流 11.视频分享 12.榜单引流 13.搜索引流 14.点赞回访 15.智能引流 16.关注回访 介绍下小红书数据挖掘 搜索关键词&…

uml简单用例图怎么画(要素,文字形式)

参与者&#xff08;三类&#xff09;&#xff1a;人&#xff0c;外部系统&#xff0c;设备 用例&#xff1a; 系统外部可见的用例单元 表示前俩的关系&#xff1a;带箭头的实线&#xff08;参与者1——用例1&#xff09; 【实线为了观看我标红了&#xff0c;实际没有的】 1.i…

vue实现一个简单导航栏

Vue之简单导航栏 在vue中&#xff0c;想要实现导航栏的功能&#xff0c;除了用传统的a标签以外&#xff0c;还可以使用路由——vue-router来实现&#xff0c;前端小白在此记录一下学习过程&#xff08;默认已经搭建好vue的脚手架环境&#xff09;&#xff1a; 建立项目并安装…

【数据结构-字符串 四】【字符串识别】字符串转为整数、比较版本号

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【字符串转换】&#xff0c;使用【字符串】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

Andriod学习笔记(一)

写在前面的话 App开发的编程语言Java和KotlinXML App连接的数据库App工程目录结构模块级别的编译配置文件清单文件 界面显示与逻辑处理 安卓是一种基于Linux内核的自由及开放源代码的操作系统&#xff0c;主要使用于移动设备。 Mininum SDK表示安卓该版本以上的设备都可以运行该…

Vue计算属性的使用

当我们想要通过data中运算得到一个新的数据时&#xff0c;我们就可以使用计算属性。比如&#xff1a;data里的单价price和数量number可以相乘计算总价sum&#xff0c;这个sum我们就称为计算属性。 计算属性的语法格式&#xff1a; computed:{ 计算属性名称 ( ) { return 计算…

LAS Spark 在 TPC-DS 的优化揭秘

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 文章主要介绍了火山引擎湖仓一体分析服务 LAS Spark&#xff08;下文以 LAS Spark 指代&#xff09;在 TPC-DS 上的性能突破与优化策略。TPC-DS 是一个模拟复杂数据…

4、在 CentOS 8 系统上安装 pgAdmin 4

pgAdmin 4 是一个开源的数据库管理工具&#xff0c;专门用于管理和操作 PostgreSQL 数据库系统。它提供了一个图形用户界面&#xff08;GUI&#xff09;&#xff0c;使用户能够轻松地连接到 PostgreSQL 数据库实例&#xff0c;执行 SQL 查询&#xff0c;管理数据库对象&#xf…

网络拓扑自动扫描工具

topology-scanner Topology-Scanner是WeOps团队免费开放的一个网络拓扑自动扫描模块&#xff0c;可以自动发现网络设备的类型、网络设备之间的互联 使用方式 java -jar ./topology-scanner.jar --config_path./config/ 配置说明 1. 拓扑发现请求参数文件(request.json) i…

Web3 新手攻略:9 个不可或缺的 APP 助力你踏入加密领域

Web3世界充满了无限机遇&#xff0c;但要掌握它&#xff0c;您需要合适的工具&#xfffd;&#xfffd;&#xfffd;。今天&#xff0c;我将为您介绍9款Web3必备APP&#xff0c;涵盖钱包、DEX、和工具三大类别。而且&#xff0c;我要特别强烈推荐一个强大的钱包——Bitget Wall…

基于java+vue+springboot的家庭理财记账信息网站

运行环境 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven 项目介绍 在这科技…

Bitxhub跨链平台

BitXHub跨链平台 跨链系统架构 过程 在跨链合约中调用统一写好的Broker合约Broker合约抛出事件由Plugin捕获到封装成平台统一的数据结构提交到中继链中目的链的跨链网关从中继链中同步IBTP数据结构网关将该数据结构通过Plugin提交到目的链 中继链体系架构 中继链的模块和流程…

【Vue面试题十七】、你知道vue中key的原理吗?说说你对它的理解

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;你知道vue中key的原理吗…