[flink 实时流基础]源算子和转换算子

news2024/10/9 0:41:23

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1554818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ios应用内支付

用uniapp开发iOS应用内支付 准备前端代码服务器端处理如果iOS支付遇到问题实在解决不了&#xff0c;可以联系我帮忙解决&#xff0c;前端后端都可以解决&#xff08;添加的时候一定要备注咨询iOS支付问题&#xff09; 准备前端代码 获取支付通道 (uni.getProvider) uni.getPr…

怎么更新sd-webui AUTOMATIC1111/stable-diffusion-webui ?

整个工程依靠脚本起来的&#xff1a; 可直接到stable-diffusion-webui子目录执行&#xff1a; git pull更新代码完毕后&#xff0c;删除venv的虚拟环境。 然后再次执行webui.sh&#xff0c;这样会自动重新启动stable-diffusion-webui.

Netty核心原理剖析与RPC实践21-25

Netty核心原理剖析与RPC实践21-25 21 技巧篇&#xff1a;延迟任务处理神器之时间轮 HahedWheelTimer Netty 中有很多场景依赖定时任务实现&#xff0c;比较典型的有客户端连接的超时控制、通信双方连接的心跳检测等场景。在学习 Netty Reactor 线程模型时&#xff0c;我们知道…

卸载原有的cuda,更新cuda

概述&#xff1a;看了一下自己的gpu&#xff0c;发现驱动可能装低了&#xff0c;随即尝试更新驱动&#xff0c;写下此篇 注&#xff1a;我原先是10.2的版本&#xff0c;改了之后是11.2&#xff0c;下面的图都用11.2的&#xff0c;不过不碍事 目录 第一步&#xff1a;查看现在…

SmartChart的部署以及可能遇见的报错解决方案

简介 数据可视化是一种将数据转化为图形的技术&#xff0c;可以帮助人们更好地理解和分析数据。但是&#xff0c;传统的数据可视化开发往往需要编写大量的代码&#xff0c;或者使用复杂的拖拽工具&#xff0c;不仅耗时耗力&#xff0c;而且难以实现个性化的需求。有没有一种更…

深度解析:Elasticsearch检索请求原理

在上一篇文章中&#xff0c;我们学习了 Elasticsearch 的写入流程&#xff0c;今天我们来学习一下 Elasticsearch 的读取流程&#xff0c;当一个检索请求到达 Elasticsearch 之后是如何进行检索的呢&#xff1f; 下面先说一下一个总的检索流程。 1、客户端发送请求到任意一个…

深度思考:雪花算法snowflake分布式id生成原理详解

雪花算法snowflake是一种优秀的分布式ID生成方案&#xff0c;其优点突出&#xff1a;它能生成全局唯一且递增的ID&#xff0c;确保了数据的一致性和准确性&#xff1b;同时&#xff0c;该算法灵活性强&#xff0c;可自定义各部分bit位&#xff0c;满足不同业务场景的需求&#…

13-API风格(下):RPCAPI介绍

RPC在Go项目开发中用得也非常多&#xff0c;需要我们认真掌握。 RPC介绍 根据维基百科的定义&#xff0c;RPC&#xff08;Remote Procedure Call&#xff09;&#xff0c;即远程过程调用&#xff0c;是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机…

【面经八股】搜广推方向:面试记录(十一)

【面经&八股】搜广推方向:面试记录(十一) 文章目录 【面经&八股】搜广推方向:面试记录(十一)1. 自我介绍2. 实习经历问答4. 编程题5. 反问1. 自我介绍 。。。。。。 2. 实习经历问答 就是对自己实习事情要足够的清晰,不熟的不要写在简历上!!! 其中,有个 …

电脑分辨率怎么调,电脑分辨率怎么调整

随着电脑的普及以及网络的发展&#xff0c;我们现在在工作中都离不开对电脑的使用&#xff0c;今天小编教大家设置电脑分辨率&#xff0c;现在我们先了解这个分辨率是什么?通常电脑的显示分辨率就是屏幕分辨率&#xff0c;显示屏大小固定时&#xff0c;显示分辨率越高图像越清…

【Node.js】WebSockets

概述 WebSockets是一种在浏览器和服务器之间建立持久连接的协议&#xff0c;它允许服务器主动推送数据给客户端&#xff0c;并且在客户端和服务器之间实现双向通信。 建立连接&#xff1a;客户端通过在JavaScript代码中使用WebSocket对象来建立WebSockets连接。例如&#xff1…

KUKA机器人KR FORTEC-2 ultra重型机器人介绍

额定负载在480-800kg的KR FORTEC-2 ultra重型机器人专为快速精确处理高惯量的大型部件的工艺而设计&#xff0c;双支撑连接臂使性能更强&#xff0c;并可以在紧凑的空间内保持优异性能。在工业生产中&#xff0c;工件的转动惯量越大&#xff0c;对机器人的有效载荷和臂展要求越…

安全SCDN的威胁情报库对DDOS防护有什么好处

目前网络攻击事件频频发生&#xff0c;DDoS&#xff08;分布式拒绝服务&#xff09;攻击已成为各种企业&#xff08;小到区域性小公司大到各种跨国公司&#xff09;的主要威胁&#xff0c;DDoS 攻击可能会对企业造成重大损害和破坏&#xff0c;比如对目标公司的业务造成产生不利…

探索父进程和子进程

文章目录 通过系统调用查看进程PID父进程、子进程 通过系统调用创建进程-fork初识为什么fork给父进程返回子进程的PID&#xff0c;给子进程返回0fork函数如何做到返回两个值一个变量为什么同时会有两个返回值&#xff1f;bash总结 通过系统调用查看进程PID getpid()函数可以获…

win10配置CLion2022+ubuntu20.04远程部署

背景 在博文ubunut搭建aarch64 cuda交叉编译环境记录中&#xff0c;使用的ubuntu20.04虚拟机安装eclipse来交叉编译aarch64的程序&#xff0c;然后发送到jetson板子上执行。开发一段时间后发现eclipse IDE使用起来不太便捷&#xff0c;因此&#xff0c;考虑使用CLion IDE&…

接口自动化框架搭建(四):pytest的使用

1&#xff0c;使用说明 网上资料比较多&#xff0c;我这边就简单写下 1&#xff0c;目录结构 2&#xff0c;test_1.py创建两条测试用例 def test_1():print(test1)def test_2():print(test2)3&#xff0c;在pycharm中执行 4&#xff0c;执行结果&#xff1a; 2&#xff0…

新能源汽车充电桩常见类型及充电桩站场的智能监管方案

随着新能源汽车市场的迅猛发展&#xff0c;充电桩作为支持其运行的基础设施&#xff0c;也呈现出多样化的类型。这些充电桩不仅在外形和功能上存在差异&#xff0c;更在充电速度、充电方式以及使用场景等方面展现出独特的优势。 一、充电桩类型及区别 1、慢充桩&#xff08;交…

《算法笔记》系列----质数的判断(埃氏筛法)

目录 一、朴素算法 二、埃氏筛法 1、与朴素算法对比 2、算法介绍 3、例题即代码实现 一、朴素算法 从素数的定义中可以知道&#xff0c;一个整数n要被判断为素数&#xff0c;需要判断n是否能被2.3.n- 1中的一个整除。只2&#xff0c;3..n- 1都不能整除n&#xff0c;n才能…

Coursera自然语言处理专项课程03:Natural Language Processing with Sequence Models笔记 Week02

Natural Language Processing with Sequence Models Course Certificate 本文是https://www.coursera.org/learn/sequence-models-in-nlp 这门课程的学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。 文章目录 Natural Language Processing with Sequence ModelsWeek 02…

springcloud基本使用(搭建eureka服务端)

创建springbootmaven项目 next next finish创建成功 删除项目下所有文件目录&#xff0c;只保留pox.xml文件 父项目中的依赖&#xff1a; springboot依赖&#xff1a; <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-s…