[flink 实时流基础] flink 源算子

news2024/10/12 6:19:26

学习笔记

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png


文章目录

        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据

在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1567442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js手持小风扇

文章目录 1. 演示效果2. 分析思路3. 代码实现 1. 演示效果 2. 分析思路 先编写动画&#xff0c;让风扇先转起来。使用 js 控制动画的持续时间。监听按钮的点击事件&#xff0c;在事件中修改元素的animation-duration属性。 3. 代码实现 <!DOCTYPE html> <html lang…

(表征学习论文阅读)FINITE SCALAR QUANTIZATION: VQ-VAE MADE SIMPLE

1. 前言 向量量化&#xff08;Vector Quantization&#xff09;或称为矢量量化最早在1984年由Gray提出&#xff0c;主要应用于数据压缩、检索领域&#xff0c;具体的阐述可以参考我写的另一篇关于VQ算法的文章。随着基于神经网络的离散表征学习模型的兴起&#xff0c;VQ技术也…

Cisco ACI Simulator 6.0(5h) - ACI 模拟器

Cisco ACI Simulator 6.0(5h) - ACI 模拟器 Application Centric Infrastructure (ACI) Simulator Software 请访问原文链接&#xff1a;https://sysin.org/blog/cisco-acisim-6/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.o…

最新梨花带雨网页音乐播放器

源码简介 最新梨花带雨网页音乐播放器二开优化修复美化版全开源版本源码下载 梨花带雨播放器基于thinkphp6开发的XPlayerHTML5网页播放器前台控制面板,支持多音乐平台音乐解析。二开内容&#xff1a;修复播放器接口问题&#xff0c;把接口本地化&#xff0c;但是集成外链播放…

AcWing刷题-约数个数

约数的个数 代码 # 计数 def f(x)->int:cnt 0i 1while i * i < x:if x % i 0:cnt 1if i * i < x:cnt 1i 1return cntn int(input()) a list(map(int,input().split())) for i in a:print(f(i))

K8s Deployment 滚动更新、金丝雀发布、自定义钩子、生命周期解析

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、Deployment的高级特性 1、滚动更新 2、金丝雀…

Emacs之解除comment-region绑定C-c C-c快捷键(一百三十四)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

对一个时间序列中的每个元素按照指定精度位置四舍五入

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 对一个时间序列中的每个元素 按照指定精度位置四舍五入 Series.dt.round() 选择题 以下代码的输出结果中正确的是? import pandas as pd ts pd.Series(pd.date_range("2024-04-04 08:…

16 RGB-LCD 彩条显示

RGB TFT-LCD 简介 TFT-LCD 的全称是 Thin Film Transistor-Liquid Crystal Display&#xff0c;即薄膜晶体管液晶显示屏&#xff0c;它显示的每个像素点都是由集成在液晶后面的薄膜晶体管独立驱动&#xff0c;因此 TFT-LCD 具有较高的响应速度以及较好的图像质量。液晶显示器是…

使用pip安装geopandas(24.4更新)

geopandas是我们用Python进行地理分析常用的库&#xff0c;在数据处理、分析、制图等场景中有着极为广泛的应用&#xff0c;但是在安装过程中会出现各种问题。​geopandas的安装方式有很多&#xff0c;今天我们选取较为简单的pip来进行geopandas的安装。 ​首先&#xff0c;我…

动规训练2

一、最小路径和 1、题目解析 就是一个人从左上往做下走&#xff0c;每次只能往右或者往下&#xff0c;求他到终点时&#xff0c;路径上数字和最小&#xff0c;返回最小值 2、算法原理 a状态表示方程 小技巧&#xff1a;经验题目要求 用一个二维数组表示&#xff0c;创建一个…

【WEEK6】 【DAY3】MySQL函数【中文版】

2024.4.3 Wednesday 目录 5.MySQL函数5.1.常用函数5.1.1.数据函数5.1.2.字符串函数5.1.2.1.CHAR_LENGTH(str)计算字符串str长度5.1.2.2.CONCAT(str1,str2,...)拼接字符串str1 str2 ...5.1.2.3.INSERT(str,pos,len,newstr)把原文str第pos位开始长度为len的字符串替换成newstr5.…

vue3数据库中存头像图片相对路径在前端用prop只能显示路径或无法显示图片只能显示alt中内容的问题的解决

不想看前情可以直接跳到头像部分代码 前情&#xff1a; 首先我们是在数据库中存图片相对路径&#xff0c;这里我们是在vue的src下的assets专门建一个文件夹img存头像图片。 然后我们如果用prop"avatar" label"头像"是只能显示图片路径的&#xff0c;即lo…

CEF的了解

(14 封私信 / 80 条消息) CEF和Electron的区别是什么&#xff1f; - 知乎 (zhihu.com) Electron面向的开发者&#xff1a;会用JavaScript,HTML,CSS&#xff0c;不会C CEF面向的开发者&#xff1a;会用JavaScript,HTML,CSS&#xff0c;会C (14 封私信 / 80 条消息) liulun - …

代码随想录Day28:回溯算法Part4

Leetcode 93. 复原IP地址 讲解前&#xff1a; 这道题其实在做完切割回文串之后&#xff0c;学会了使用切割的方法来找到字符串的possible 子串之后&#xff0c;思路就会很快找到&#xff0c;细想一下其实无非也就是对given string然后进行切割&#xff0c;只是深度是固定的因…

【数据结构与算法】二叉搜索树和平衡二叉树

二叉搜索树 左子树的结点都比当前结点小&#xff0c;右子树的结点都比当前结点大。 构造二叉搜索树&#xff1a; let arr [3, 4, 7, 5, 2]function Node(value) {this.value valuethis.left nullthis.right null }/*** 添加结点* param root 当前结点* param num 新的结…

50道Java经典面试题总结

1、那么请谈谈 AQS 框架是怎么回事儿&#xff1f; &#xff08;1&#xff09;AQS 是 AbstractQueuedSynchronizer 的缩写&#xff0c;它提供了一个 FIFO 队列&#xff0c;可以看成是一个实现同步锁的核心组件。 AQS 是一个抽象类&#xff0c;主要通过继承的方式来使用&#x…

AI绘图:Stable Diffusion WEB UI 详细操作介绍:基础篇

接上一篇《AI绘图体验&#xff1a;Stable Diffusion本地化部署详细步骤》本地部署完了SD后&#xff0c;大家肯定想知道怎么用&#xff0c;接下来补一篇Stable Diffusion WEB UI 详细操作&#xff0c;如果大家还没有完成SD的部署&#xff0c;请参考上一篇文章进行本地化的部署。…

抽象类与接口(3)(接口部分)

❤️❤️前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; hellohello~&#xff0c;大家好&#x1f495;&#x1f495;&#xff0c;这里是E绵绵呀✋✋ &#xff0c;如果觉得这篇文章还不错的话还请点赞❤️❤️收藏&#x1f49e; &#x1f49e; 关注&#x1f4a5;&…

Spring Boot:Web开发之视图模板技术的整合

Spring Boot 前言Spring Boot 整合 JSPSpring Boot 整合 FreeMarkerSpring Boot 整合 ThymeleafThymeleaf 常用语法 前言 在 Web 开发中&#xff0c;视图模板技术&#xff08;如 JSP 、FreeMarker 、Thymeleaf 等&#xff09;用于呈现动态内容到用户界面的工具。这些技术允许开…