Flink DataStream之从集合/文件读数据

news2024/11/26 11:42:30
  • 从集合中读数据方式一
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class TestRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> source = env.fromElements(2, 142, 664);

        source.print();
        env.execute();
    }
}

  • 从集合中读数据方式二
  • package test01;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Arrays;
    
    public class TestRead {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(5,523,637));
    
            source.print();
            env.execute();
        }
    }
    
     
  • 从文件中读数据 

需要先引入依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class TestRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStreamSource<Integer> source = env.fromElements(2, 142, 664);

        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("test_input/test_word.txt")
        ).build();
        //此处参数三自定义一个suorceName即可“filesource”
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource").print();
        env.execute();
    }
}

运行结果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/701229.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

牛客·轻拍牛头

b[i]:输入 a[i]:计数 c[i]:答案void solve() {cin>>n;for(int i0;i<n;i){cin>>b[i];a[b[i]];}for(int i1;i<NN;i){if(a[i]){for(int ji;j<NN;ji)c[j]a[i];}}for(int i0;i<n;i){cout<<c[b[i]]-1<<\n;} }

【深度学习】contrastive loss与triplet loss

自己总结一下&#xff0c; 三元组如果正负样本足够开&#xff0c;距离足够远&#xff0c;loss为0&#xff0c;因为模型已经学的不错了&#xff0c;不需要继续学习。 最好的负样本是&#xff0c;model预测负样本的把握不太大的。 如果负样本是很难分的&#xff0c;例如d(a,p)>…

数据结构:Mysql索引原理(通俗易懂)

目录 前言正文索引结构-数组无序数组有序数组 索引数据结构-HashHash冲突-链式寻址法Hash冲突-再哈希法Hash冲突-开放地址法 索引数据结构-树二叉树平衡二叉树红黑树红黑树的特性红黑树如何减少旋转 B树B树 Mysql的索引一级索引二级索引 总结 前言 在工作中如果经常写业务代码…

信号链噪声分析4

目录 概要 整体架构流程 技术名词解释 技术细节 5.计算信号链的噪声 噪声谱密度 重点注意事项 有源滤波器配置 小结 概要 提示&#xff1a;这里可以添加技术概要 本文介绍对高速宽带宽信号链进行噪声性能理论分析的各个步骤。尽管选择了一个特 定信号链&#xff0c;…

传统商超巨头折戟即时零售

今年的618&#xff0c;包含沃尔玛、永辉、大润发、华润、物美、麦德龙、盒马、联华、家家悦、中百、华联在内10大商超巨头都集体“失声”了。 值得注意的是&#xff0c;今年618不仅各大商超没有公布相关数据&#xff0c;且在所有即时零售平台中&#xff0c;仅有商超到家的合作…

怎么用postman进行自动化接口测试,终于学到了

​ 目录 背景描述 创建一个GET请求 在pre-request scripts构建签名 脚本写在环境变量中 postman console的用法 Collection Runner 自动化API测试 创建接口的测试用例 选择并运行自动化接口测试 测试结果 总结&#xff1a; 背景描述 有一个项目要使用postman进行接…

元宇宙虚拟人物3d建模贯穿产业链数字化转型全链条

随着5G、AI、VR/AR、区块链、云计算、脑机等技术的发展&#xff0c;给构建这个虚拟的宇宙提供了技术支撑。今年以来&#xff0c;因为疫情等原因&#xff0c;大众在虚拟空间停留的时间变得更长&#xff0c;元宇宙概念重新火热。 元宇宙虚拟人的基本特征包括&#xff1a; 沉浸式体…

Android hook、检测及对抗相关

frida——hook 内存访问断点 环境&#xff1a;app&#xff1a;arm64 python 3.10 frida 15.2.2 简单的内存访问断点代码&#xff0c;可能还有些bug&#xff0c;根据apk需要自己改&#xff0c;下文为在apk中指定的地址调用函数时内存断点才被激活&#xff0c;以下需要…

【GESP】2023年03月图形化二级 -- 快乐时光

文章目录 快乐时光1. 准备工作2. 功能实现3. 设计思路与实现&#xff08;1&#xff09;角色、舞台背景设置a. 角色设置b. 舞台背景设置 &#xff08;2&#xff09;脚本编写a. 角色&#xff1a;小猫b. 角色&#xff1a;小猴 4. 评分标准 快乐时光 1. 准备工作 &#xff08;1&am…

CSS知识点汇总(十)--移动端适配

文章目录 怎么做移动端的样式适配&#xff1f;1、方案选择2. iPhoneX 适配方案 怎么做移动端的样式适配&#xff1f; 在移动端虽然整体来说大部分浏览器内核都是 webkit&#xff0c;而且大部分都支持 css3 的所有语法。但手机屏幕尺寸不一样&#xff0c;分辨率不一样&#xff0…

jenkins共享库配置及设计

jenkins共享库做模块封装时遇到的问题总结&#xff1a; 背景描述:使用jenkins共享库对SCM subversion操作进行封装时&#xff0c;使用了Checkout插件&#xff0c;生成的检出脚本代码为 checkout([$class: SubversionSCM, additionalCredentials: [], excludedCommitMessages: …

【Dashy安装使用】本地Linux 部署 Dashy 并远程访问

文章目录 简介1. 安装Dashy2. 安装cpolar3.配置公网访问地址4. 固定域名访问 转载自cpolar极点云文章&#xff1a;本地Linux 部署 Dashy 并远程访问 简介 Dashy 是一个开源的自托管的导航页配置服务&#xff0c;具有易于使用的可视化编辑器、状态检查、小工具和主题等功能。你…

spring源码编译笔记

下载源码 地址https://github.com/spring-projects/spring-framework/tree/v5.2.9.RELEASE 查看gradle对应版本 spring-framework-5.2.9.RELEASE/gradle/wrapper/gradle-wrapper.properties # 其他配置暂未了解具体用处&#xff0c;一切默认值 distributionBaseGRADLE_USE…

vue3案例

代码 <template><div class"app-page"><divv-for"title in data.titleArr":key"title.id"class"title-box"click"onClick(title)"><span>{{ title.name }}</span><img v-if"title…

Java基础——正则表达式

1 概述 正则表达式用于匹配规定格式的字符串。 除了上面的以外&#xff0c;还有一个符号就是括号&#xff0c;括号括起来的表示一个捕获组&#xff0c;一个捕获组可以作为一个重复单位来处理。 2 使用 2.1 判断是否匹配 String自带了一个可以使用正则表达式判断字符串是…

卸载 Navicat:正版 MySQL客户端,真香!

最近看到一款数据库客户端工具&#xff0c;DataGrip&#xff0c;是大名鼎鼎的JetBrains公司出品的&#xff0c;就是那个出品Intellij IDEA的公司。DataGrip是一款数据库管理客户端工具&#xff0c;方便连接到数据库服务器&#xff0c;执行sql、创建表、创建索引以及导出数据等。…

【Python从入门到进阶】25、urllib获取快餐网站店铺数据

接上篇《24、urllib获取网站电影排行》 上一篇我们讲解了如何使用urllib的get请求抓取某某电影排行榜信息。本篇我们来讲解如何使用urllib的post请求抓取某某快餐网站店铺数据。 一、某某快餐网站介绍 1、某某快餐网站 某某快餐店网址为&#xff1a;http://www.kfc.com.cn/k…

SciencePub学术 | 国人友好类重点SCIEEI征稿中

SciencePub学术 刊源推荐&#xff1a;国人友好类重点SCIE&EI征稿中&#xff01;信息如下&#xff0c;录满为止&#xff1a; 一、期刊概况&#xff1a; 二、期刊要求 1. 论文为原创&#xff0c;未公开发表&#xff0c;初稿可提交中文版本&#xff0c;终稿必须是英文版本; …

Redis 跳表skiplist

跳跃表 在单链表中查询一个元素的时间复杂度为O(n)&#xff0c;即使该单链表是有序的&#xff0c;我们也不能通过2分的方式缩减时间复杂度。 跳跃表(skiplist)是一种有序数据结构&#xff0c;它通过在每个节点中维持多个指向其他节点的指针(注&#xff1a;可以理解为维护了多条…

【计算机网络】数据链路层之随机接入-CSMA/CA协议(无线局域网)

1.概念 2.无线局域网可否实现碰撞检验CD 3.方案 CSMA/CA 碰撞避免 4. 两种帧间间隔 IFS 为什么需要等待DIFS? 为什么需要等待SIFS? 为什么还要退避一段时间才能使用信道&#xff1f; 5.退避算法 使用退避算法的情况 退避算法 举例 6.信道预约 7.虚拟载波监听 8.题目 9.解析 …