Flink 常用API(1)—— 源算子

news2024/11/24 5:47:00

目录

执行环境(Execution Environment)

创建执行环境

执行模式配置

触发程序执行

源算子(Source)

从集合中读取数据

从文件中读取数据

从Socket读取数据

从Kafka读取数据***

自定义 Source(数据源)

Flink对POJO的要求

类型提示

DataStream API的基本构成:

执行环境(Execution Environment)

创建执行环境

StreamExecutionEnvironment——流处理环境

1.getExecutionEnvironment:获取当前程序运行的环境(根据当前运行的方式,自行决定该返回什么样的运行环境)

2.createLocalEnvironment(并行度):返回一个本地执行环境,可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

3.createRemoteEnvironment:这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
 .createRemoteEnvironment(
"host", // JobManager 主机名
 1234, // JobManager 进程端口号
 "path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);

ExecutionEnvironment——批处理

API与流处理相同

执行模式配置

流处理/批处理/自动模式(将由程序根据输入数据源是否有界,来自动选择执行模式)

默认是流处理模式

设置为批处理:

命令行:bin/flink run -Dexecution.runtime-mode=BATCH

代码:env.setRuntimeMode(RuntimeExecutionMode.BATCH)

建议在命令行中配置

触发程序执行

env.execute();

Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”,所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行

源算子(Source)

从集合中读取数据

fromCollection(集合对象)

ArrayList<Event> clicks = new ArrayList<>();
clicks.add(new Event("Mary","./home",1000L));
clicks.add(new Event("Bob","./cart",2000L));
//从集合中读取数据
DataStream<Event> stream = env.fromCollection(clicks);
stream.print();

fromElements(对象1.对象2.....)

//直接将元素列举出来,调用 fromElements 方法进行读取数据
DataStreamSource<Event> stream2 = env.fromElements(
 new Event("Mary", "./home", 1000L),
 new Event("Bob", "./cart", 2000L)
);

从文件中读取数据

readTextFile()

注意:使用hdfs路径的时候需要添加相关依赖:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.5</version>
  <scope>provided</scope>
</dependency>

从Socket读取数据

socketTextStream(hostname,port)

从Kafka读取数据***

Flink内部未提供与kafka进行连接的预实现方法,因此需要采用addSource 方式、实现一个 SourceFunction:

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者:FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
    "clicks",
    new SimpleStringSchema(),
    properties
));

自定义 Source(数据源)

自定义数据源

//实现SourceFunction接口,能够自定义数据源;
//实现ParallelSourceFunction接口,能够设置并行度(否则并行度只能是1)
public class ClickSource implements SourceFunction<Event>, 
ParallelSourceFunction<Event> {

    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;


    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random(); // 在指定的数据集中随机选取数据

        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                "./prod?id=2"};
        while (running) {
            sourceContext.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // 隔 1 秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }


    }

    @Override
    public void cancel() {
        running = false;
    }
}

run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;

cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果

读取自定义的数据源:

//有了自定义的 source function,调用 addSource 方法
 DataStreamSource<Event> stream = env.addSource(new ClickSource());

注意:

SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常

ParallelSourceFunction接口可用来定义并行的数据源;

Flink对POJO的要求

在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型

类型提示

在Lambda 表达式中,通过自动提取系统提取的数据类型是不够精细的,所以需要显式地提供类型信息,才能使应用程序正常工作或提高其性能

也就是对应的类型提示API

通过return说明类型

简单数据类型:

泛型:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/518896.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【结构与算法】—— 游戏概率常用算法整理 | 游戏中的常见概率设计分析

&#x1f4e2;博客主页&#xff1a;肩匣与橘&#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;本文由肩匣与橘编写&#xff0c;首发于CSDN&#x1f649;&#x1f4e2;生活依旧是美好而又温柔的&#xff0c;你也是✨ …

Smartbi X 广州轻工集团,打造集团价值创造型总部

广州轻工工贸集团有限公司&#xff08;简称“广州轻工集团”&#xff09;是广州市第一家工贸合一的大型企业集团公司&#xff0c;最早起源于1950年9月成立的广州市合作事业管理局&#xff0c;逐渐演化形成广州市轻工业局。1995年10月&#xff0c;广州市轻工业局成建制改建为经济…

【C++】搜索二叉树

文章目录 &#x1f4d5; 概念&#x1f4d5; 搜索二叉树的实现框架插入节点查找节点★ 删除节点 ★ &#x1f4d5; 源代码 &#x1f4d5; 概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树&#xff1a; 若它的左子树不为空&…

5。STM32裸机开发(2)

嵌入式软件开发学习过程记录&#xff0c;本部分结合本人的学习经验撰写&#xff0c;系统描述各类基础例程的程序撰写逻辑。构建裸机开发的思维&#xff0c;为RTOS做铺垫&#xff08;本部分基于库函数版实现&#xff09;&#xff0c;如有不足之处&#xff0c;敬请批评指正。 &a…

全景描绘云原生技术图谱,首个《云原生应用引擎技术发展白皮书》发布

5月12日&#xff0c;由神州数码主办、北京经开区国家信创园、中关村云计算产业联盟协办的2023通明湖论坛-云原生分论坛在京召开。论坛期间&#xff0c;神州数码联合北京通明湖信息技术应用创新中心、中国信通院和通明智云正式发布了《云原生应用引擎技术发展白皮书》&#xff0…

干货 | 心理学人电脑选购指南来了!

Hello&#xff0c;大家好&#xff01; 这里是壹脑云科研圈&#xff0c;我是喵君姐姐&#xff5e; 当我们在选择电脑时经常会无从下手&#xff0c;不知道该如何才能选择一款既能满足我们的科研需要又具有良好性价比的电脑。 本期我们邀请到了唐仙和梦马来为我们详细解答心理学…

我的Makefile模板

OBJxxxxCFLAGS -g -Wall${OBJ}:${OBJ}.o main.o%*.o:%*.c.PHONY:clean clean:${RM} *.o ${OBJ} core xxxx → xxxx.c xxxx.h main.c 比如&#xff1a; 包含了&#xff1a;

PhotoScan拼接无人机航拍RGB照片

目录 背景 拼接步骤 1.新建并保存项目 2.添加照片 3.对齐照片 4.添加标记&#xff08;Markers&#xff09; 5.添加地面控制点 6.建立批处理任务 7.使用批处理文件进行批处理 8.导出DEM 9.导出DOM 背景 本文介绍使用地面控制点&#xff08;GCPs&#xff09;拼接​​…

Java面试知识点(全)- Java面试基础部分一

Java基础 语法基础 面向对象 封装 利用抽象数据类型将数据和基于数据的操作封装在一起&#xff0c;使其构成一个不可分割的独立实体。数据被保护在抽象数据类型的内部&#xff0c;尽可能地隐藏内部的细节&#xff0c;只保留一些对外接口使之与外部发生联系。用户无需知道对…

如何解决ChatGPT网络错误的问题,让AI对话更丝滑~

前言 在当今人工智能技术的飞速发展中&#xff0c;ChatGPT 作为一款大型语言模型备受瞩目。近期&#xff0c;其在各大社交媒体平台上的表现更是引来了一片关注之声。无论是与用户进行有趣的对话&#xff0c;还是帮助人们解决实际问题&#xff0c;ChatGPT 展现出了其强大的自然…

谷歌慌了!想发论文得审批,优先开发产品,让OpenAI没得看

来源 | 机器之心 ID | almosthuman2014 众所周知&#xff0c;谷歌就像人工智能领域的「黄埔军校」&#xff0c;自深度学习兴起后培养出了整整一代机器学习研究人员和工程师。很长一段时间里&#xff0c;谷歌就是领先 AI 技术的代名词。 人们已经习惯跟随谷歌的脚步&#xff0c…

操作符(算术操作符、移位操作符、位操作符、赋值操作符、单目操作符、关系操作符、逻辑操作符)

目录 算术操作符 移位操作符 移位规则 位操作符 交换两个整形变量的写法 赋值操作符 单目操作符 sizeof和数组的纠缠 和--运算符 多组输入的方案 关系操作符 逻辑操作符 算术操作符 -- 加法操作符&#xff08;&#xff09;&#xff1a;用于将两个值相加。 -- 减法操…

算法修炼之练气篇——练气八层

博主&#xff1a;命运之光 专栏&#xff1a;算法修炼之练气篇 前言&#xff1a;每天练习五道题&#xff0c;炼气篇大概会练习200道题左右&#xff0c;题目有C语言网上的题&#xff0c;也有洛谷上面的题&#xff0c;题目简单适合新手入门。&#xff08;代码都是命运之光自己写的…

cv2BGR转化为RGB

import cv2 import matplotlib.pyplot as plt img cv2.imread(1.png,1)#1加载彩图 0加载灰度图 img2 cv2.cvtColor(img,cv2.COLOR_BGR2RGB)#cv2读取是BGR 如果使用plt包要转换为RGB plt.subplot(1,2,1) plt.imshow(img2) plt.subplot(1,2,2) plt.imshow(img) plt.savefig(&qu…

【Java零基础入门篇】第 ⑥ 期 - 异常处理

博主&#xff1a;命运之光 专栏&#xff1a;Java零基础入门 学习目标 掌握异常的概念&#xff0c;Java中的常见异常类&#xff1b; 掌握Java中如何捕获和处理异常&#xff1b; 掌握自定义异常类及其使用&#xff1b; 目录 异常概述 异常体系 常见的异常 Java的异常处理机制…

【数学】通俗理解泰勒公式(牛顿迭代法有用到)

【数学】通俗理解泰勒公式&#xff08;牛顿迭代法有用到&#xff09; 文章目录 【数学】通俗理解泰勒公式&#xff08;牛顿迭代法有用到&#xff09;1. 介绍2. 通俗理解2.1 近似计算 3. 泰勒公式的推导4. 泰勒公式的定义5. 扩展 — 麦克劳林公式参考 1. 介绍 最近在看一些机器…

java异常的分类(常见的异常类型)

异常的分类 1. 编译时异常 在程序编译期间发生的异常&#xff0c;称为编译时异常&#xff0c;也称为受检查异常(Checked Exception) public class Person {int age;private String name;private String gender;// 想要让该类支持深拷贝&#xff0c;覆写Object类的clone方法即…

C语言—字符函数和字符串函数

字符函数和字符串函数 strlenstrcpystrcatstrcmpstrncpystrncatstrncmpstrstrstrtokstrerrorperror字符分类函数字符转换函数memcpymemmovememmcmpmemset C语言中对字符和字符串的处理很是频繁&#xff0c;但是C语言本身是没有字符串类型的&#xff0c;字符串通常放在 常量字符…

ChatGPT最强对手Claude如何无门槛使用?

Claude&#xff0c;一个冉冉升起的新星&#xff0c;由 chatgpt 团队出来的员工开发的&#xff0c;由于他们对模型的一些发展理念不同&#xff0c;单独融资创建了 Claude&#xff0c;总体来说表现可圈可点&#xff0c;但整体看可能还不如 chatgpt4.0。 ChatGPT 眼中的 Claude C…

【产品应用】一体化电机在卡盘设备中的应用

在现代工业生产中&#xff0c;自动化程度的提高和生产效率的提升对于生产设备的要求也越来越高。卡盘设备作为自动化生产线中的重要组成部分&#xff0c;其设计和制造也必须适应现代工业的需求。一体化电机在卡盘设备中的应用&#xff0c;不仅可以提高生产效率和精度&#xff0…