Java修仙传之Flink篇

news2025/1/11 1:28:59

大道三千:最近我修Flink

目前个人理解:

处理有界,无界流的工具

FLINK:

FLINK定义:

Flink特点

Flink分层API

流的定义

有界数据流(批处理):

有界流:数据结束了,程序也就结束了

知道数据开始以及结束的地方

无界数据流:

特征:读一条,计算一条,输出一次结果

知道数据开始的地方,却不知道结束的地方

(好似长江大河,会一直一直一直产生数据)

流的状态

个人理解:(有状态流会基于内存保存之前的数据)

如果后续流的操作需要用到之前的数据,这个流时有状态的

如果后续流的操作不需要用到之前的数据,这个流是无状态的

DataSet API:有界流批处理( 已淘汰)

1:创建执行环境

2:读取流(数据)

3:将读取到的数据,转换为方便处理的格式

4:将收集到的数据进行(分组,求和,最大,最小等....)操作

//批处理方式(有界流,因为很明确的知道这个文件在哪里结束)
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override         //一行数据       // 数据收集器     out:相当于是一个按照 下面格式收集数据的收集器  格式=out.collect(Tuple2.of(word,1L));
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");  //一行数据按照" "拆分

                for (String word : words) {   //word = 一行中的每一个字段    如果1改成2,则统计时数目会成2
                    Tuple2<String, Long> of = Tuple2.of(word, 1L);//每个的那次都转为这种格式
                    out.collect(of);  // 收集器添加数据 (转换格式为 (循环到的字段,1L))
                }
            }
        });

        // 4. 按照 word 进行分组    按照第一个字段分组.(字段,1L),就是按照第一个字段分组(A,1),(b,1),(c,1),(d,1),(d,1) 就是按照abcd分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计   根据第二个字段求和,即将每个分组的第二个字段相加,得到该分组的总和
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

DataStream API:流、批一体处理

转换(flatMap)、

分组(keyBy)、

求和(sum)、

执行(execute)、

读取文本(readTextFile,有界流)

1:创建流式执行环境(基于StreamExecutionEnvironment)

2:读取文件

3:转换、分组、求和,得到统计结果

4:打印输出

5:执行

//流处理方式 (有界流,因为很明确的知道这个文件在哪里结束),如果不是本地而是网络则是无界流
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
        // 3. 转换、分组、求和,得到统计结果                                                                          
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultList = lineStream.flatM
                              输入类型,输出类型
        ap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override           //当前行数据   //要返回的类型
            public void flatMap(String line, Collector<Tuple2<String, Integer>> list) throws Exception {
                String[] fields = line.split(" ");
                for (String field : fields) {
                    Tuple2<String, Integer> result = Tuple2.of(field, 1);
                    list.collect(result);
                }
            }
        });
        //分组                                                                                    // 传入的数据类型()           要分组的数据类型
        KeyedStream<Tuple2<String, Integer>, String> gropbyDate = resultList.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0; //这里是类型的第一位。如(hello,1),则是根据hello进行分组
            }
        });
        //求和。      以上一个为例子:(hello,1)分组之后,根据1索引即第二位(hello,1)的1进行求和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = gropbyDate.sum(1);
        //打印输出
        sum.print();
        //执行
        env.execute();
    }

}
        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(data -> data.f0)
           .sum(1);

结果:

读取socket(无界流)

事件监听(环境对象.socketTextStream(IP,端口号))

备注:先启动linux 输入命令nc -lk 7777

然后启动代码监听 7777

此时linux输入的数据会被代码抓取到

备注2:跟前两个的区别就是这个是调用的socketTextStream。其他无任何区别

//监听7777端口的数据流
// 这里代码监听了  IP地址192.168.200.130  端口号7777 的操作   。ip地址那里写主机名也行
public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        //构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //拿到数据
        DataStreamSource<String> lineStream = env.socketTextStream("192.168.200.130", 7777);
        // 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> convert = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] fields = line.split(" ");
                for (String field : fields) {
                    Tuple2<String, Long> of = Tuple2.of(field, 1L);
                    out.collect(of);
                }
            }
        });
        //分组
        KeyedStream<Tuple2<String, Long>, Object> gropBy = convert.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        });
        //求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = gropBy.sum(1);
        //输出
        sum.print();
        //执行
        env.execute();
    }
}
 SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");

            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

LMD存在泛型擦除,解决方案看这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1152210.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【图像分割】【深度学习】Windows10下PFNet官方代码Pytorch实现与源码讲解

【图像分割】【深度学习】Windows10下PFNet官方代码Pytorch实现与源码讲解 提示:最近开始在【图像分割】方面进行研究,记录相关知识点,分享学习中遇到的问题已经解决的方法。 文章目录 【图像分割】【深度学习】Windows10下PFNet官方代码Pytorch实现与源码讲解前言PFNet模型运行…

算法笔记【4】-冒泡排序法改进

一、冒泡排序缺点 冒泡排序是一种简单但效率较低的排序算法。冒泡排序通过比较相邻元素并交换位置来实现排序。具体而言&#xff0c;它从数组的第一个元素开始&#xff0c;依次比较相邻的两个元素&#xff0c;如果顺序错误则交换它们的位置&#xff0c;直到整个数组排好序为止…

RK3399平台开发系列讲解(基础篇)应用程序代码优化技巧

🚀返回专栏总目录 文章目录 一、利用高速缓存二、代码内联三、restrict 关键字四、消除不必要的内存引用沉淀、分享、成长,让自己和他人都能有所收获!😄 📢我主要会为你介绍四个优化 应用代码的技巧,它们分别是 利用高速缓存利用代码内联利用 restrict 关键字消除不必…

Java架构师软件可靠性构建

目录 1 导学2 软件可靠性基本概念3 软件可靠性建模4 软件可靠性管理5 软件可靠性设计6 软件可靠性测试与评价想学习架构师构建流程请跳转:Java架构师系统架构设计 1 导学 2 软件可靠性基本概念 软件可靠性是软件产品在规定的条件下和规定的时间区间完成规定功能的能力。软件…

Java精品项目源码爱心捐赠平台网站(编号V65)

Java精品项目源码扶农助农平台建设系统(编号V64) 大家好&#xff0c;小辰今天给大家介绍一个爱心捐赠平台网站(编号V65)&#xff0c;演示视频公众号&#xff08;小辰哥的Java&#xff09;对号查询观看即可 文章目录 Java精品项目源码扶农助农平台建设系统(编号V64)难度指数&a…

修仙路上的基石 ->继承与实现

继承与实现的区别 不同点&#xff1a; 继承&#xff1a;不强制 子类重写父类方法 实现&#xff1a;强制 实现类重写接口的全部方法共同点&#xff1a; 都可以使用多态 继承&#xff1a;父类 父类对象 new 子类() 实现&#xff1a;接口 接口对象 实现类.调用方法(); 这里…

[尚硅谷React笔记]——第7章 redux

目录&#xff1a; redux简介redux工作流程求和案例_纯react版求和案例_redux精简版redux完整版异步action版对react-redux的理解连接容器组件与UI组件&#xff0c;react-redux基本使用优化1_简写mapDispatch优化2_Provider组件的使用优化3_整合UI组件与容器组件数据共享_编写P…

风云七剑攻略,最强阵容搭配

今天的风云七剑攻略最强阵容搭配给大家推荐以神仙斋减怒回血为主的阵容。 关注【娱乐天梯】&#xff0c;获取内部福利号 首先&#xff0c;这个角色在这个阵容当中&#xff0c;所有的角色当中&#xff0c;他的输出系数是最高的&#xff0c;已经达到了200%的层次&#xff0c;而且…

【NLP】什么是语义搜索以及如何实现 [Python、BERT、Elasticsearch]

语义搜索是一种先进的信息检索技术&#xff0c;旨在通过理解搜索查询和搜索内容的上下文和含义来提高搜索结果的准确性和相关性。与依赖于匹配特定单词或短语的传统基于关键字的搜索不同&#xff0c;语义搜索会考虑查询的意图、上下文和语义。 语义搜索在搜索结果的精度和相关…

Linux 文件系统之虚拟文件系统

文章目录 一、简介二、进程读写文件示例三、VFS高速缓存参考资料 一、简介 虚拟文件系统&#xff08;Virtual File System&#xff0c;简称 VFS&#xff09;是内核中的软件层&#xff0c;为用户空间程序提供文件系统接口。它还在内核中提供了一个抽象层&#xff0c;允许不同的…

SQL 通用数据类型

SQL 通用数据类型 数据类型定义了存储在列中的值的类型。 SQL 通用数据类型 数据库表中的每一列都需要有一个名称和数据类型。 SQL 开发人员必须在创建 SQL 表时决定表中的每个列将要存储的数据的类型。数据类型是一个标签&#xff0c;是便于 SQL 了解每个列期望存储什么类型的…

[GKCTF 2021]easycms 禅知cms

一道类似于渗透的题目 记录一下 首先扫描获取 登入界面 admin/12345登入 来到了后台 然后我们开始测试有无漏洞点 1.文件下载 设计 自定义 导出 然后进行抓包 解密后面的内容 发现是绝对路径了 所以这里我们要获取 flag 就/flag即可 L2ZsYWc /admin.php?mui&fdownlo…

linux的使用学习(1)

Linux 修改root密码 1.以 root 用户或具有 sudo 权限的登录到 Linux 系统。 2.打终端&#xff0c;并执行以下命令以更改 root 用户的密码&#xff1a; sudo passwd root 3.然后&#xff0c;系统会要求你输入新的 root 密码。请注意&#xff0c;在输入密码时&#xff0c;终端界…

图、深度优先(DFS)、广度优先(BFS)

图 基本介绍 表示方式 图的创建 from typing import Listclass Graph:vertex_list: List[str] [] # 存储顶点的数组edges: List[list] [] # 存储图中各条边的邻接矩阵num_edges: int 0 # 边的数总数def __init__(self, n: int):"""根据传入的顶点个数初始…

11、插件注入到vue实例中

新建插件 nuxt-demo2\plugins\vue-inject.js import Vue from "vue"Vue.prototype.$test function (str) {console.log(str) }配置插件 nuxt-demo2\nuxt.config.js export default {...// Plugins to run before rendering page: https://go.nuxtjs.dev/config-…

表格识别软件:科技革新引领行业先锋,颠覆性发展前景广阔

表格识别软件的兴起背景可以追溯到数字化和自动化处理的需求不断增加的时期。传统上&#xff0c;手动处理纸质表格是一项费时费力的工作&#xff0c;容易出现错误&#xff0c;效率低下。因此&#xff0c;开发出能够自动识别和提取表格数据的软件工具变得非常重要。 随着计算机…

Xcode运行程序提示 Executable Path is a Directory 问题解决

一、首先运行模拟器报错&#xff08;没有记录&#xff09;&#xff0c;解决办法&#xff1a; TARGET->Build Settings->Architectures -> Exclude Architectures里面填入arm64&#xff0c;后运行模拟器成功 二、其次模拟器开发完成后&#xff0c;xcode运行真机调试&…

canvas:理解canvas / 基础使用 / Demo效果

一、理解Canvas Canvas是一个HTML5元素&#xff0c;用于在Web页面上绘制2D或3D图形。它允许使用JavaScript在网页上创建和操作图形。Canvas的主要功能是绘图&#xff0c;但也可以用来实现其他功能&#xff0c;如动画和交互式游戏。 使用Canvas&#xff0c;可以创建各种形状、…

【AOP进阶】实现重试机制

&#x1f4da;目录 ⚙️简介&#xff1a;✨注解定义&#xff1a;⛳RetryMechanism ⌛编写AOP代码&#xff1a;⚓RetryMechanismAspect 切面 ⛵演示&#xff1a;⛴如何使用RetryMechanism&#xff1a;⚡️正常请求如下&#xff1a;☘️测试异常并且重试&#xff1a;☄️测试异常…

WSL2 Ubuntu安装CUDA Toolkit

目前CUDA ToolKit需要切换到WSL2&#xff0c;在WLS1下不支持。之前折腾了很久&#xff0c;才从WSL1的坑中爬出来&#xff0c;仅写此文避免大家再从坑里走一次。 Windows WSL2相关 检查正在运行的 WSL 版本 可列出已安装的 Linux 发行版&#xff0c;并通过在 PowerShell 或 W…