Flink系列之Flink中Source_Transform_Sink整理和实战

news2024/11/16 17:43:11

title: Flink系列


二、Flink Source 整理和实战

Flink Source 是程序的数据源输入,可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加一个 Source。

Flink 提供了大量的已经实现好的 source 方法,也可以自定义 source:

1、通过实现 sourceFunction 接口来自定义无并行度的 source

2、通过实现 ParallelSourceFunction 接口 or 继承 RichParallelSourceFunction 来自定义有并行度的 source

大多数情况下,我们使用自带的 source 即可。

2.1 Flink 内置 Source

关于 Flink 的内置 Source 大致可以分为这四类:

  • 基于 File:readTextFile(path),读取文本文件,文件遵循 TextInputFormat 读取规则,逐行读取并返回。

  • 基于数据集合:fromCollection(Collection),通过 java 的 collection 集合创建一个数据流,集合中的所有元素必须是相同类型的。

  • 基于 Socket:socketTextStream(hostname,port),从 socker 中读取数据,元素可以通过一个分隔符切开。

  • 扩展 Source:addSource() 方法可以实现读取第三方数据源的数据,系统内置提供了一批connectors,连接器会提供对应的 source,比如 Kafka,Pulsar 等

关于这四种类型的 Source 的使用,具体见程序

官网:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/overview/#bundled-connectors

官网截图:

在这里插入图片描述

Connectors provide code for interfacing with various third-party systems. Currently these systems are supported:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • FileSystem (sink)
  • RabbitMQ (source/sink)
  • Google PubSub (source/sink)
  • Hybrid Source (source)
  • Apache NiFi (source/sink)
  • Apache Pulsar (source)
  • Twitter Streaming API (source)
  • JDBC (sink)

2.1.0 flink程序pom文件添加

依赖如下:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.14.3</version>
        </dependency>
    </dependencies>

2.1.1 基于 File案例

FlinkSourceReadTextFile.java完整代码如下:

package com.aa.flinkjava.source.builtin;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

/**
 * @Author AA
 * @Date 2022/2/24 16:35
 * @Project bigdatapre
 * @Package com.aa.flinkjava.source.builtin
 */
public class FlinkSourceReadTextFile {
    public static void main(String[] args) throws Exception {
        //1、初始化环境变量
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        //2、读取数据
        DataSource<String> dataSource = executionEnvironment.readTextFile("D://input//test1.txt");

        //3、打印输出
        dataSource.print();

        //4、执行
        //异常:No new data sinks have been defined since the last execution.
        // The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
        //上面异常解决方案:Flink批处理的时候注释掉下面的代码即可。
        //executionEnvironment.execute("FlinkSourceReadTextFile");
    }
}

2.1.2 基于数据集合案例

FlinkSourceFromCollection.java完整代码如下:

package com.aa.flinkjava.source.builtin;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;

import java.util.ArrayList;

/**
 * @Author AA
 * @Date 2022/2/24 16:46
 * @Project bigdatapre
 * @Package com.aa.flinkjava.source.builtin
 */
public class FlinkSourceFromCollection {
    public static void main(String[] args) throws Exception {
        //1、初始化环境变量
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        //2、造点数据
        ArrayList<String> list = new ArrayList<>();
        list.add("zhangsan");
        list.add("lisi");
        list.add("wangwu");
        list.add("zhaoliu");

        //3、从数据集合读取数据
        DataSource<String> dataSource = executionEnvironment.fromCollection(list);

        //4、做一个小的业务逻辑
        MapOperator<String, String> result = dataSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s + " , 欢迎你!";
            }
        });

        //5、打印输出
        result.print();

        //6、执行 注意,批处理场景下,给下面的依据注释掉。
        //executionEnvironment.execute("FlinkSourceFromCollection");
    }
}

2.1.3 基于 Socket 案例

FlinkSourceSocketTextStream.java 完整代码如下:

package com.aa.flinkjava.source.builtin;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Date 2022/2/24 16:53
 * @Project bigdatapre
 * @Package com.aa.flinkjava.source.builtin
 */
public class FlinkSourceSocketTextStream {
    public static void main(String[] args) throws Exception {
        //1、初始化环境变量
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、从socket读取数据
        DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);

        //3、业务逻辑处理
        //3-1 转换给键值对
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                if (s.isEmpty()){
                    return; //给输入的是空的过滤掉。
                }else {
                    String[] splits = s.split(" ");
                    for (String split : splits) {
                        collector.collect(new Tuple2<>(split, 1));
                    }
                }
            }
        });

        //3-2 累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);

        //4、数据
        result.print();

        //5、提交执行
        executionEnvironment.execute();
    }
}

2.2 Flink 自定义 Source

样例UserDefineSourceDemo.java如下:

package com.aa.flinkjava.source.userdefine;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @Author AA
 * @Date 2022/2/24 17:15
 * @Project bigdatapre
 * @Package com.aa.flinkjava.source.userdefine
 */
public class UserDefineSourceDemo {
    public static void main(String[] args) throws Exception {
        //1、初始化环境变量
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、添加自定义的数据源
        DataStreamSource<Long> dataStreamSource = executionEnvironment.addSource(new UserDefineSource());

        //3、业务逻辑
        SingleOutputStreamOperator<Long> result = dataStreamSource.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long aLong) throws Exception {
                return aLong + 1000;
            }
        });

        //4、打印结果
        result.print();

        //5、执行
        executionEnvironment.execute();
    }
}

class UserDefineSource implements SourceFunction<Long>{
    private boolean flag = true;

    private long num = 100;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (flag){
            //每间隔两秒给num数据递增输出。
            sourceContext.collect(num++);
            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

三、Flink Transform 整理和实战

DataStream Transformations 官网链接:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/

在这里插入图片描述

DataSet Transformations 官网链接:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/transformations/

在这里插入图片描述

  • map 和 filter
  • flatMap,keyBy、sum
  • union
  • connect,coMap

四、Flink Sink 整理和实战

4.1 Flink 内置 Sink

关于 Flink 的内置 Sink 大致可以分为这三类:

  • 1、标准输出/异常输出:print() / printToErr(),打印每个元素的 toString() 方法的值到标准输出或者标准错误输出流中

  • 2、基于文件系统:writeAsText() / writeAsCsv(…) / write() / output()

  • 3、扩展 Sink:常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem等

官网链接:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/overview/#bundled-connectors

官网截图:

在这里插入图片描述

4.2 Flink自定义Sink

Flink 自定义 Sink 有两种方式:

  • implements SinkFunction 接口

  • extends RichSinkFunction 抽象类

自定义的 Sink 逻辑可以在生命周期 open, invoke, close 中进行编写。

五、Flink DataSet 常用 Transformation

有一个大文件 ,有一个很大的集合,有一张很大的表,都是不动的。 针对这个数据整体,做一次计算:

  • 1、Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

  • 2、FlatMap:输入一个元素,可以返回零个,一个或者多个元素

  • 3、MapPartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】

  • 4、Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

  • 5、Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

  • 6、Aggregate:sum、max、min 等,多个值,映射一个值

  • 7、Distinct:返回一个数据集中去重之后的元素,data.distinct()

  • 8、Join:内连接

  • 9、OuterJoin:外链接

  • 10、Cross:获取两个数据集的笛卡尔积

  • 11、Union:返回两个数据集的总和,数据类型需要一致

  • 12、First-n:获取集合中的前N个元素

  • 13、Sort Partition:在本地对数据集的所有分区进行排序,通过 sortPartition() 的链接调用来完成对多个字段的排序



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/56416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

译文 | A poor man‘s API

作者&#xff1a;Nicolas Frnkel 翻译&#xff1a;Sylvia https://blog.frankel.ch/poor-man-api/ 在 API 日渐流行的年代&#xff0c;越来越多的非技术人员也希望能从 API 的使用中获利&#xff0c;而创建一套成熟的 API 方案需要时间成本和金钱两方面的资源加持。在这个过程中…

2022re:Invent:亚马逊云科技拥有强大的云原生数据能力

在2022亚马逊云科技re:Invent全球大会上的第三天&#xff0c;Swami博士为大家带来了关于“数据与机器学习如何助力企业构建端到端的数据战略”的解读。亚马逊云科技拥有强大的云原生数据能力&#xff0c;用来帮助企业扩展其数据库和数据分析服务&#xff0c;并确保数据安全与数…

汽车电子电气架构演进驱动主机厂多重变化

已剪辑自: https://mp.weixin.qq.com/s/P56MaFODVc_eZ4JEOVJvfA 汽车电子电气架构&#xff08;EEA&#xff0c;Electrical/Electronic Architecture&#xff09;把汽车中的各类传感器、ECU&#xff08;电子控制单元&#xff09;、线束拓扑和电子电气分配系统整合在一起完成运算…

[附源码]计算机毕业设计JAVA校园求职与招聘系统

[附源码]计算机毕业设计JAVA校园求职与招聘系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM myba…

【Matplotlib绘制图像大全】(二十一):Matplotlib为绘图添加注释

前言 大家好,我是阿光。 本专栏整理了《Matplotlib绘制图像大全》,内包含了各种常见的绘图方法,以及Matplotlib各种内置函数的使用方法,帮助我们快速便捷的绘制出数据图像。 正在更新中~ ✨ 🚨 我的项目环境: 平台:Windows10语言环境:python3.7编译器:PyCharmMatp…

Maven 的安装与配置

文章目录Maven 简介一&#xff0c;下载Maven二&#xff0c;安装Maven三&#xff0c;配置Maven环境变量Maven 简介 Maven 项目对象模型(POM)&#xff0c;可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的项目管理工具软件。 Maven 除了以程序构建能力为特色之…

C/C++使用Windows的API实现共享内存以及同步

目录共享内存事件-Event实现思路创建方(服务端)连接方&#xff1a;进程同步&#xff1a;windows的APICreateFileMappingMapViewOfFileCreateEventWaitForSingleObjectCreateThreadOpenFileMapping通过共享内存实现进程间的交互服务端客户端结论共享内存 共享内存指 (shared me…

数据结构和算法之图的遍历

6.2 图的遍历 6.2.1 图的遍历——DFS 遍历&#xff1a;把图里面每个顶点都访问一遍而且不能有重复的访问 深度优先搜索(DFS) 当访问完了一个节点所有的灯后&#xff0c;一定原路返回对应着堆栈的出栈入栈的一个行为 深度优先搜索的算法描述 void DFS(Vertex V)//从迷宫…

Redis面试篇

文章目录1 Redis与Memcache的区别&#xff1f;2 Redis的单线程问题3 Redis的持久化方案由哪些&#xff1f;4 Redis的集群方式有哪些&#xff1f;5 Redis的常用数据类型有哪些&#xff1f;6 聊一下Redis事务机制7 Redis的Key过期策略参考资料&#xff1a;为什么需要内存回收&…

SpringBoot+ElasticSearch 实现模糊查询,批量CRUD,排序,分页,高亮!

一、导入elasticsearch依赖 在pom.xml里加入如下依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>非常重要&#xff1a;检查依赖版本是否…

【图像隐写】DWT数字水印嵌入+攻击+提取【含Matlab源码 1759期】

⛄一、DWT数字水印简介 1 引言 数字水印技术发展迅速&#xff0c;出现了各种水印算法&#xff0c;最低有效位(Least Significant Bit,LSB)数字水印技术是最早的空域水印添加算法&#xff0c;它原理简单且易实现&#xff0c;但鲁棒性差。变换域水印算法大大提高了水印的鲁棒性&…

【微信小程序】博客小程序,静态版本(三)设计和开发首页、个人关于页

【博客小程序】专栏 【微信小程序】博客小程序&#xff0c;静态版本&#xff08;一&#xff09;准备工作 【微信小程序】博客小程序&#xff0c;静态版本&#xff08;二&#xff09;引入 lin-ui 组件、设计和开发文章页 【微信小程序】博客小程序&#xff0c;静态版本&#…

模型推荐丨政务大数据项目案例模型分享

主要工具&#xff1a;Python 技术大类&#xff1a;自然语言处理 主要业务问题&#xff1a; 在社会治理上&#xff0c;政府部门一般通过群众的意见反馈、舆论情绪&#xff0c;掌握社会现状&#xff0c;做好舆情工作&#xff0c;以促进社会长治久安。微博作为有着大量活跃用户…

13 Igress,集群进出流量的总管

文章目录1. 前言2. 为什么要有 Ingress?2.1 Service 的缺点2.2 (Ingress)怎么解决Service 的缺点&#xff1f;3. 为什么要有 Ingress Controller 和 IngressClass?3.1 为什么要有 Ingress Controller&#xff1f;3.1.1 Ingress Controller3.1.1 Ingress Controller 常见公司3…

深入学习Linux内核(二)体系结构简析

Linux内核体系结构简析 图1 Linux系统层次结构 最上面是用户&#xff08;或应用程序&#xff09;空间。这是用户应用程序执行的地方。用户空间之下是内核空间&#xff0c;Linux 内核正是位于这里。GNU C Library &#xff08;glibc&#xff09;也在这里。它提供了连接内核的系…

Jitter

1、Jitter定义 定义1&#xff08;SONET规范&#xff09; 抖动可以定义为数字信号在重要时点上偏离理想时间位置的短期变化。 Long term jitter 测量由参考点滞后相当数量的Cycle&#xff08;500~1000&#xff09;后时钟的抖动值。该抖动参数也是时钟稳定性的一个重要指标&a…

如何清除电脑缓存?让电脑运行速度提升的有效方法

随着时间的流逝&#xff0c;电脑保存太多缓存文件&#xff0c;电脑的运行速度会越来越慢&#xff0c;甚至无法启动。当出现这种情况时&#xff0c;我们就需要对电脑进行清理。那么如何清除电脑缓存&#xff1f;今天就来给大家分享几个方法&#xff0c;让你的电脑运行速度快起来…

Spark SQL 与 Hive 的小文件调优

文章目录小文件危害表的缓存shuffle 分区数调整Spark SQL 客户端设置合并Hive 客户端处理小文件合并小文件危害 小文件会造成 nn 处理压力变大&#xff0c;大大降低了读取性能&#xff0c;整个 HDFS 文件系统访问缓慢&#xff0c;大量的小文件还会导致 nn 内存溢出&#xff0c…

DataX使用、同步MySQL数据到HDFS案例

文章目录4. DataX使用4.1 DataX使用概述4.1.1 DataX任务提交命令4.1.2 DataX配置文件格式4.2 同步MySQL数据到HDFS案例4.2.1 MySQLReader之TableMode4.2.1.1 编写配置文件4.2.1.1.1 创建配置文件base_province.json4.2.1.1.2 配置文件内容如下4.2.1.2 配置文件说明4.2.1.2.1 Re…

jdk 11 自带的HttpClient

jdk 11 自带的HttpClient 文章目录jdk 11 自带的HttpClient步骤1&#xff1a;创建jdk HttpClient 对象步骤2&#xff1a;创建请求步骤3&#xff1a;使用client.sent()发送同步请求步骤4&#xff1a;处理响应下面是发送post请求&#xff0c;请求类型是json&#xff0c;使用clien…