Flink-源算子Source(获取数据源)的使用

news2025/1/10 23:02:20

5.1 整体介绍

  1. 获取执行环境
  2. 读取数据源
  3. 定义基于数据的转换操作
  4. 定义计算结果的输出位置
  5. 触发程序执行

5.2 创建集成环境

5.2.1 获取执行环境

  1. 批处理getExecutionEnvironment
  • 提交命令行设置
bin/flink run -Dexecution.runtime-mode=BATCH ...
  • 代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

使用StreamExecutionEnvironment类调用getExecutionEnvironment的方法[不推荐,直接写死了]

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

设置setRuntimeMode 方法,传入 BATCH 模式

  1. 流处理
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

5.3 源算子

5.3.1 读取有界流

  1. 数据准备
  • 基本数据类型
import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    //无参构造方法
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

基本数据类型有:用户名、url以及时间戳

  • 文件
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10,9000
  1. 代码

3种了,从文件,从集合,从元素

public class SourceTest {
    public static void main(String[] args) throws Exception{
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.从文件中读取数据(有界流)
        DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");


        //3.从集合中读取数据
        ArrayList<Integer> nums = new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numsStream = env.fromCollection(nums);

        //泛型选择event,从event读取数据
        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);
        
        
        //4.从元素读取数据
        //不用通过数组中间装载,直接可以放到fromElement中
        DataStreamSource<Event> stream3 = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        stream1.print();
        numsStream.print();
        stream2.print();
        stream3.print();
        env.execute();
        
            }
}
  1. 结果
2
5
Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10,9000

Process finished with exit code 0

5.3.2 从socket读取数据

  1. 启动hadoop2虚拟机中的nc -lk
  2. 代码
        //5.从socket文本流读取
        DataStreamSource<String> stream4 = env.socketTextStream("hadoop2", 7777);
        stream4.print();
        env.execute();
  1. 结果

在这里插入图片描述

5.3.3 读取kafka

  1. 引入连接器依赖
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
  1. 启动kafka
  • 启动zk和kadfa
[hadoop1@hadoop2 kafka]$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties 
[hadoop1@hadoop2 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties 
  • 启动生产者
[hadoop1@hadoop2 kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
  1. 启动程序

使用addSource方法中传入flink连接器传入的FlinkKafkaConsumer

		//6.从kafka读取数据

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop2:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
        kafkaStream.print();
        env.execute();
  • 结果

在这里插入图片描述

5.3.4 自定义Source

  1. 思路

自定义实现SourceFunction接口,重写两个方法run()和cancel()

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  1. 代码
public class SourceCustomTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> customStream = env.addSource(new ClickSource());
        customStream.print();
        env.execute();

    }
}
  • 自定义的ClickSource
public class ClickSource implements SourceFunction<Event> {

    //声明一个标志位控制数据生成
    private Boolean running = true;
    @Override
    //泛型为Event
    public void run(SourceContext<Event> ctx) throws Exception {

        //随机生成数据
        Random random = new Random();
        //定义字段选取的数据集
        String[] users = {"Mary","Alice","Bob","Cary"};
        String[] urls = {"./home","./cart","./fav","./prod?id=100","/prod?id=10"};

        //一直循环生成数据
        while (running){
            String user = users[random.nextInt(users.length-1)];
            String url = users[random.nextInt(urls.length-1)];
            //系统当前事件的毫秒数
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            //collect收集Event发往下游
            ctx.collect(new Event(user,url,timestamp));

            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running =false;
    }
}
  1. 结果

在这里插入图片描述

5.3.5 自定义并行Source

  1. 分析

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

传入的还是SourceFunction,于是说如果是继承了ParallelSourceFunction的话,就可以设置并行度

  1. 代码
public class SourceCustomTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //DataStreamSource<Event> customStream = env.addSource(new ClickSource());
        //这边并行度改成2
        DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource())
                .setParallelism(2);
        customStream.print();
        env.execute();
    }

    //定义一个静态类吧
    //实现自定义的并行SourceFunction
    public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {

        //同样来一个标志位
        private Boolean running =true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running){
                ctx.collect(random.nextInt());
                Thread.sleep(1000L);
            }


        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
  1. 结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/11337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pip Command Not Found – Mac 和 Linux 错误被解决

使用Python时&#xff0c;可能需要安装和使用某些软件包。有一个命令可用于’pip‘ 使用pip&#xff0c;您可以安装、升级和卸载各种Python包。在本文中&#xff0c;您将学习如何使用它&#xff0c;以及如何处理pip错误。 如何使用 pip Pip是一个可以在Linux或Mac命令行上使用…

HTTP(http+抓包Fiddler+协议格式+请求+响应)

目录 &#x1f984;1. 了解HTTP &#x1f984;2. 抓包 &#x1f984;3. http协议格式 &#x1f432;3.1 完整的HTTP请求格式 &#x1f432;3.2 完整的HTTP响应的格式 HTTP请求 &#x1f984;4. 认识URL &#x1f984;5. http中的"方法" &#x1f432;5.1…

智能与工程学院2022级计算机朱元华

智能与工程学院 《高级语言程序设计》 小组学习任务书 第 1 次 专业年级&#xff1a; 2022级计算机 指导教师&#xff1a; 朱元华 2022-2023学年 第 1 学期 一、任务 XXX信息管理系统的需求分析和功能设计 二、分组形式 学生自由组合&#xff0c;5-8人为一组&#xff0c;根据…

Tuxera NTFS2023Mac读写ntfs磁盘工具

Tuxera Ntfs for mac2023是Mac中专用于读写外置存储的工具&#xff0c;具有强大的磁盘管理和修复功能&#xff0c;它在Mac上完全读写NTFS格式硬盘&#xff0c;快捷的访问、编辑、存储和传输文件。能够在 Mac 上读写 Windows NTFS 文件系统。Tuxera NTFS 实现在Mac OS X系统读写…

【Spring】——6、按照条件向Spring容器中注册bean

&#x1f4eb;作者简介&#xff1a;zhz小白 公众号&#xff1a;小白的Java进阶之路 专业技能&#xff1a; 1、Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理 2、熟悉Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理&#xff0c;具备⼀定的线…

静态时序分析简明教程(六)]时钟组与其他时钟特性

生成时钟的sdc约束方法一、写在前面1.1 快速导航链接二、时钟组2.1 引入时钟组2.2 set_clock_group2.2.1 -name2.2.2 -group clock_list2.2.3 -logically_exclusive|-physically_exclusive|-asynchronous2.2.4 -allow_path2.2.5 -comment三、其他时钟特性3.1 过渡时间3.2 偏移与…

【Linux】进程间通信——管道

目录 一、概念 二、管道函数 1.popen函数 2.pclose函数 3.文件函数 三、管道的操作 1.管道的分类 无名管道 有名管道 管道的特点 四、管道的实现 操作系统对进程之间相互保护 两个进程之间相互通信 前言&#xff1a; 进程间通信的方法/IPC机制都有哪些&#xff1a; …

求二进制中1的个数的三种方法

求二进制中的1的个数 文章目录第一种方法&#xff1a;模2除2第二种方法&#xff1a;利用操作符右移后与1第三种方法&#xff1a;该数与上比它小1的数&#xff08;最优的方法&#xff09;第一种方法&#xff1a;模2除2 首先明白如何得到一个数的十进制的每一位&#xff1f; 以1…

PHP代码审计入门-DVWA靶场CSRF篇

0x00 写在前面 从零学习php&#xff0c;最终目的实现代码审计入门&#xff0c;软件采用sublime text&#xff0c;环境使用phpstudy搭建&#xff0c;数据库是navicat&#xff0c;需要有基本的前端基础、简单的phpmysql后端基础、渗透知识和漏洞原理&#xff0c;文章跟随流沙前…

bizlog通用操作日志组件(使用篇)

引言 如上图所示&#xff0c;产品的新需求&#xff0c;需要将操作人在系统中具体编辑操作的变更内容记录下来。 按正常思路来说&#xff0c;无非就是将修改前后的对象字段逐个比较&#xff0c;再拼接为详细的操作描述记录到操作日志表中。如果是一个模块的需求&#xff0c;单独…

用HTML+CSS做一个学生抗疫感动专题网页设计作业网页

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

springboot如何修改thymeleaf中默认的页面路径及默认的后缀名呢?

转自: springboot如何修改thymeleaf中默认的页面路径及默认的后缀名呢? 下文讲述springboot修改thymefleaf修改页面默认路径及后缀名的方法分享&#xff0c;如下所示: 实现思路:只需在配置文件中修改相应的配置即可&#xff0c;如&#xff1a;application.yaml spring:thym…

MySQL单表查询操作详解,不做CRUD程序员

在我们对数据进行操作时&#xff0c;查询无疑是至关重要的&#xff0c;查询操作灵活多变&#xff0c;我们可以根据开发的需求&#xff0c;设计高效的查询操作&#xff0c;把数据库中存储的数据展示给用户。 文章目录前言1. 基础查询1.1 基础查询语法1.2 基础查询练习2. 条件查询…

大数据路线

一、概念部分 1.1 大数据、数仓、数据湖、中台的概念 区别数仓数据湖使用场景批处理&#xff0c;BI&#xff0c;数据可视化机器学习、预测分析、数据分析Schema写入型读取型数据源类型OLTP为主的结构化数据loT&#xff0c;日志&#xff0c;各个端等结构非结构均可性价比需要快…

牛客刷题总结——Python入门08:面向对象、正则表达式

&#x1f935;‍♂️ 个人主页: 北极的三哈 个人主页 &#x1f468;‍&#x1f4bb; 作者简介&#xff1a;Python领域优质创作者。 &#x1f4d2; 系列专栏&#xff1a;《牛客题库-Python篇》 &#x1f310;推荐《牛客网》——找工作神器|笔试题库|面试经验|实习经验内推&am…

Design A Youtube

title: Notes of System Design No.05 — Design a Youtube description: ‘Design a Youtube’ date: 2022-05-14 13:45:37 tags: 系统设计 categories: 系统设计 01. Funtional Requirements 02. Non Functional Requirements 03.Assumption 04 API 05 High Level Design 上…

05 MSYS2中安装树莓派32位和64位ARM交叉编译工具

作者将狼才鲸创建日期2022-11-14 Gitee源码和工程地址&#xff1a;才鲸嵌入式 / 开源安防摄像机&#xff08;嵌入式软件&#xff09;CSDN文章地址&#xff1a;项目介绍&#xff1a;开源安防摄像机&#xff08;嵌入式软件&#xff09; 4.3 MSYS2中安装32位和64位ARM交叉编译工具…

1524_AURIX TC275存储分布_下

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 继续前面的学习&#xff0c;这一次把这个小章节的剩余信息看完。 这一部分是外设相关的寄存器地址区间描述&#xff0c;看起来一个模块的地址空间占用基本都是256个字节。具体包括什么暂时…

Unity技术手册-UGUI零基础详细教程-Graphic Raycaster 射线检测和Canvas Group

往期文章分享点击跳转>《导航贴》- Unity手册&#xff0c;系统实战学习点击跳转>《导航贴》- Android手册&#xff0c;重温移动开发 本文约3千字&#xff0c;新手阅读需要6分钟&#xff0c;复习需要2分钟 【收藏随时查阅不再迷路】 &#x1f449;关于作者 众所周知&#…

outsystems合集系列(三)

outsystemsModeling DataDatabase Entities的介绍如何创建Database Entities如何用excel快速导入真实数据到entity?如何用excel快速创建entity并导入真实数据?Static Entities的介绍Modeling Data 这一节我将介绍在outsystems中建模数据(model data)的一些思路。注意在这里我…