Flink - souce算子

news2025/1/6 9:36:46

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));
    
    env
            .fromCollection(waterSensors)
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("input/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();
    
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    //从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999
    env.socketTextStream("hadoop101",9999).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    Properties properties = new Properties();
    // 设置集群地址
    properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
    // 设置所属消费者组
    properties.setProperty("group.id", "flink_consumer_group");
    env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.addSource(new RandomWatersensor()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {
    private Boolean running = true;

    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        Random random = new Random();
        while (running){
            sourceContext.collect(new WaterSensor(
                    "sensor" + random.nextInt(50),
                    Calendar.getInstance().getTimeInMillis(),
                    random.nextInt(100)
            ));
            Thread.sleep(1000);
        }
    }

    /**
     * 大多数的source在run方法内部都会有一个while循环,
     * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
     */
    @Override
    public void cancel() {
        running = false;
    }

}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {
    public static void main(String[] args) throws Exception {


        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .addSource(new MySource("hadoop102", 9999))
          .print();

        env.execute();
    }

    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }


        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // 实现一个从socket读取数据的source
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        /**
         * 大多数的source在run方法内部都会有一个while循环,
         * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
         */

        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/812182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树(C语言)

文章目录 1.树1.1概念1.2相关定义1.3 表示&#xff08;左孩子右兄弟&#xff09; 2.二叉树2.1概念2.2特殊的二叉树1. 满二叉树&#xff1a;2. 完全二叉树&#xff1a; 2.3二叉树的性质2.4练习 3.二叉树的存储结构1. 顺序存储2. 链式存储 4.完全二叉树的代码实现4.1堆的介绍1.堆…

ssm德宏贸易项目java人资企业办公jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 ssm德宏贸易项目 系统有1权限&#xff1a;管理员 二…

接口自动化测试平台

下载了大神的EasyTest项目demo修改了下<https://testerhome.com/topics/12648 原地址>。也有看另一位大神的HttpRunnerManager<https://github.com/HttpRunner/HttpRunnerManager 原地址>&#xff0c;由于水平有限&#xff0c;感觉有点复杂~~~ 【整整200集】超超超…

查询结果元数据-MetaData对象、数据库工具类的封装、通过反射实现数据查询的封装

六、查询结果元数据-MetaData对象 七、数据库工具类的封装 1、PropertieUtil类 2、DbUtil类 3、DBHepler类 查询&#xff1a; 4、TestDb测试类&#xff1a; 更新&#xff1a; 1&#xff09;插入&#xff1a; 2&#xff09;修改&#xff1a; 3&#xff09;删除&#xff1a; 查…

2024考研408-计算机网络 第二章-物理层学习笔记

文章目录 前言一、通信基础1.1、物理层基本概念1.1.1、认识物理层1.1.2、认识物理层的四种接口特性 1.2、数据通信基础知识1.2.1、典型的数据通信模型及相关术语1.2.2、数据通信相关术语1.2.3、设计数据通信系统要考虑的三个问题&#xff1a;问题1&#xff1a;采用单工通信/半双…

通讯录的实现(超详细)——C语言(进阶)

目录 一、创建联系人信息&#xff08;结构体&#xff09; 二、创建通讯录&#xff08;结构体&#xff09; 三、define定义常量 四、打印通讯录菜单 五、枚举菜单选项 六、初始化通讯录 七、实现通讯的的功能 7.1 增加加联系人 7.2 显示所有联系人的信息 ​7.3 单独查…

【自动化运维】Ansible常见模块的运用

目录 一、Ansible简介二、Ansible安装部署2.1环境准备 三、ansible 命令行模块3.1&#xff0e;command 模块3.2&#xff0e;shell 模块3.3&#xff0e;cron 模块3.4&#xff0e;user 模块3.5&#xff0e;group 模块3.6&#xff0e;copy 模块3.7&#xff0e;file 模块8&#xff…

C++之观察者模式(发布-订阅)

目录 模式简介 介绍 优点 缺点 代码实现 场景说明 实现代码 运行结果 模式简介 观察者模式&#xff08;Observer Pattern&#xff09;&#xff0c;也叫我们熟知的发布-订阅模式。 它是一种行为型模式。 介绍 观察者模式主要关注的是对象的一对多的关系&#xff0c; …

4-3 Working with time series

本文所用数据下载 Data from a Washington, D.C., bike-sharing system reporting the hourly count of rental bikes in 2011–2012 in the Capital Bikeshare system, along with weather and seasonal information. Our goal will be to take a flat, 2D dataset and trans…

搭建网站 --- 快速WordPress个人博客并内网穿透发布到互联网

文章目录 快速WordPress个人博客并内网穿透发布到互联网 快速WordPress个人博客并内网穿透发布到互联网 我们能够通过cpolar完整的搭建起一个属于自己的网站&#xff0c;并且通过cpolar建立的数据隧道&#xff0c;从而让我们存放在本地电脑上的网站&#xff0c;能够为公众互联…

JS——输入输出语法数组的操作

JavaScript输入输出语法 目标&#xff1a;能写出常见的JavaScript输入输出语法 输出语法 语法1&#xff1a; document.write(要输出的内容)作用&#xff1a; 向body内输出内容 注意&#xff1a; 如果输出的内容写的是标签&#xff0c;也会被解析成网页元素 语法2&#xff1a…

2023大同首届信息技术产业峰会举行,共话数字经济新未来

7月28日&#xff0c;“聚势而强共领信创”2023大同首届信息技术产业峰会圆满举行。本次峰会由中共大同市委、大同市人民政府主办&#xff0c;中国高科技产业化研究会国际交流合作中心、山西省信创协会协办&#xff0c;中共大同市云冈区委、大同市云冈区人民政府、诚迈科技&…

一文深入了解Cmk

目录 一、Cmk&#xff08;设备能力指数&#xff09;介绍&#xff1a;二、Cmk&#xff08;设备能力指数&#xff09; 概念&#xff1a;三、Cmk的应用时机&#xff1a;四、Cmk前期准备和要求&#xff1a;五、Cmk测试要求&#xff1a;六、CMK计算公式&#xff1a;七、Cmk实际操作&…

机器学习的关键词和算法总结

随着全球各行业的数据治理、数字化转型智能化辅助的引入发展&#xff0c;机器学习&#xff08;包括深度学习&#xff09;在逐步深入到各行各业&#xff0c;所以&#xff0c;有必要对机器学习的常见术语&#xff0c;经典算法及应用场景进行一次总结&#xff0c;其实机器学习兴起…

Vue2 第九节 过滤器

&#xff08;1&#xff09;定义&#xff1a;对要显示的数据进行特定格式化后再显示 &#xff08;2&#xff09;语法&#xff1a; ① 注册过滤器 1&#xff09;Vue.filter(name, callback) 全局过滤器 2&#xff09; new Vue({filters:{}}) 局部过滤器 ② 使用过滤器 1&…

SQL注入漏洞及解决(PreparedStatement对象)

四、SQL注入漏洞 加个or true都能查出来&#xff0c;很危险&#xff01;&#xff01;&#xff01; 会报错&#xff0c;因为代码中也拼了个引号 我们使用#将后面的内容注释起来&#xff0c;前面还是条完整的sql&#xff0c;还是能查到 使用—注释&#xff0c;代码可能加trim()&…

编译运行miniob最小数据库系统

minibo是一个用于教学的小型数据库系统&#xff0c;麻雀虽小五脏俱全&#xff0c;该项目包含了数据库的核心内容&#xff0c;并且代码量小&#xff0c;适合新手学习&#xff0c;最近由于需要学习c/cpp&#xff0c;因此打算从这个项目入手&#xff0c;本文就介绍编译运行miniob的…

《零基础入门学习Python》第072讲:GUI的终极选择:Tkinter9

这节课我们接着来讲解 Canvas 组件&#xff0c;既然 Cnavas 是画布的意思&#xff0c;那我们能不能让这个组件来设计一个画板呢&#xff1f;像Windows 自带的画图程序&#xff0c;我们的用户可以在上面随心所欲的绘制自己想要的图画&#xff0c;大家仔细想想&#xff0c;其实画…

美团小图币代挂教程

登陆美团官网获取对应的cookie 美团官网&#xff0c;点击右上角登陆对应账号。登陆成功后使用f12来获取 cookie 此时如果没有任何数据&#xff0c;点击网页刷新。找到如下的网络请求 赋值cookie项的全部内容&#xff0c;此时已经获取到对应账号的cookie 使用cookie登陆代挂…

NODEJS笔记

全局对象 global/window console.log/info/warn/error/time/timeEnd process.arch/platform/version/env/kill/pid/nextTick Buffer.alloc(5,abcde) String/toString setTimeout/clearTimeout setInterval/clearInterval setImmediate/clearImmediate process.nextTi…