大数据学习之Flink算子、了解(Source)源算子(基础篇二)

news2025/1/17 23:17:22

Source源算子(基础篇二)


目录

Source源算子(基础篇二)

二、源算子(source)

1. 准备工作

2.从集合中读取数据

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

说明:

4. 从Socket读取数据

(1)编写StreamWordCount

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

(3)启动 StreamWordCount 程序

(4)从 bigdata1 发送数据:

(5)看控制台的输出结果

5.从Kafka读取数据

6.自定义源算子(source)

7.Flink支持的数据类型


二、源算子(source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理 程序的输入端。

Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法:

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

1. 准备工作

case class Event(user: String, url: String, timestamp: Long)

2.从集合中读取数据

  • 最简单的读取数据的方式,就是在代码中直接创建一个集合,然后调用执行环境的 fromCollection 方法进行读取。
  • 这相当于将数据临时存储到内存中,形成特殊的数据结构后, 作为数据源使用,一般用于测试。
import org.apache.flink.streaming.api.scala._

case class Event(user: String, url: String, timestamp: Long)

object SourceCollection {
  def main(args: Array[String]): Unit = {
    //获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度(并行任务的数量)为1
    env.setParallelism(1)
    // 创建包含点击事件的列表
    // 点击操作中包含两个事件
    val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
    //将列表作为流输出
    //把clicks作为数据流
    val stream = env.fromCollection(clicks)
    //fromElements从给定的元素集合中创建一个DataStream
    val stream1 = env.fromElements(
      Event("zhangsan","/.opt",1000L),
      Event("lisi","/.opt",2000L)
    )
    stream.print("stream")
    stream1.print("stream1")
    env.execute()
  }
}

可以使用代码中的fromCollection()方法直接读取列表

也可以使用代码中的fromElements()方法直接列出数据获取

3. 从文件中读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中 获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

val stream = env.readTextFile("input/words.txt")
说明:
  • 参数可以是文件,可以是目录

  • 可以是绝对路径,也可以是相对路径

  • 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录;

    • 系统属性 user.dir:这是一个Java系统属性,它表示用户当前的工作目录。在很多应用中,它通常被用作参考路径。

    • IDEA下是project的根目录:当你在IDEA中打开一个项目时,项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。

    • standalone模式下是集群节点根目录:如Hadoop分布式计算系统中的独立模式(standalone mode)。在这种模式下,路径可能是相对于集群节点的根目录。

  • 也可以从 HDFS 目录下读取, 使用路径 hdfs://...

    • 前提要在pom文件中添加hadoop相关依赖

4. 从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无 界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差,一般也是用于测试。

//通过主机名和端口号读取socket文本流
    val linDs = env.socketTextStream("bigdata1",7777)

具体实现案例:

(1)编写StreamWordCount

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //通过主机名和端口号读取socket文本流
    val linDs = env.socketTextStream("bigdata1",7777)
    //进行转换计算
    val result = linDs
      .flatMap(data => data.split(" ")) //用空格切分字符串
      .map((_,1)) //切分后的字符串转换为一个元组
      .keyBy(_._1) //使用元组的第一个字段进行分组
      .sum(1) //分组后的数据的第二个字段进行累加
    //打印计算结果
    result.print()
    env.execute()
  }
}

(2)在 Linux 环境的主机bigdata1 上,执行下列命令,发送数据进行测试:

$ nc -lk 7777

(3)启动 StreamWordCount 程序

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从 bigdata1 发送数据:

hello flink
hello world
hello scala

(5)看控制台的输出结果

5.从Kafka读取数据

Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。

而消息队列的传输方式,恰恰和流处理是完全一致的。

所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。

在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选

调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic 列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条数据流中 去。
  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
更新中...

6.自定义源算子(source)

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

7.Flink支持的数据类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用户密码网络传输、保存方案分析

大华 1、大华19年的IPC,登录认证接口有两次,第一次请求算法所需数据,第二次传输摘要值,看样子是私有算法。 2、添加用户传输用户密码等敏感数据时,使用"RPAC-256"算法,应该是大华内部的私有算法。…

【从零到一】跑通CATR(二):在并行超算云上使用Cifar-10进行测试

从零到一配环境篇 由于今年要展开大量的编程工作,实验室在用的云计算平台是并行超算云,因此打算在寒假期间先熟悉一下超算云的环境,并从配套的文档和网上的教程开始,从零到一先跑通一个用于音视频分割的模型CATR。 以blog的形式…

docker-compose部署单机ES+Kibana

记录部署的操作步骤 准备工作编写docker-compose.yml启动服务验证部署结果 本次elasticsearch和kibana版本为8.2.2 使用环境:centos7.9 本次记录还包括:安装elasticsearch中文分词插件和拼音分词插件 准备工作 1、创建目录和填写配置 mkdir /home/es/s…

Vue3中的ref和shallowRef、reactive和shallowReactive

一:ref、reactive简介 ref和reactive是Vue3中定义响应式数据的一种方式。ref通常用来定义基础类型数据。reactive通常用来定义复杂类型数据。 二、shallowRef、shallowReactive简介 shallowRef和shallowReactive是Vue3中定义浅层次响应式数据的方式 三、Api使用对比…

【寒假每日一题·2024】AcWing 5307. 小苹果(补)

文章目录 一、题目1、原题链接2、题目描述 二、解题报告1、思路分析2、时间复杂度3、代码详解 三、知识风暴 一、题目 1、原题链接 5307. 小苹果 2、题目描述 二、解题报告 1、思路分析 思路参考y总:y总讲解视频 (1)根据题目可以分析出&…

中小型企业机房设计部署方案

我对接参与过至少十几个分公司和总部的机房设计,结合十几年实际工作管理经验,归纳设计了以下这个机房方案,这个机房最大化利用了空间的同时,最大化设计了各方面的冗余。 机房包含了UPS隔离,噪音隔离,功率冗…

SpringBoot之分页查询的使用

背景 在业务中我们在前端总是需要展示数据,将后端得到的数据进行分页处理,通过pagehelper实现动态的分页查询,将查询页数和分页数通过前端发送到后端,后端使用pagehelper,底层是封装threadlocal得到页数和分页数并动态…

2. MySQL 多实例

重点: MySQL 的 三种安装方式:包安装,二进制安装,源码编译安装。 MySQL 的 基本使用 MySQL 多实例 DDLcreate alter drop DML insert update delete DQL select 2.5)通用 二进制格式安装 MySQL 2.5.1&#xff…

深入理解ZooKeeper分布式锁

第1章:引言 分布式系统,简单来说,就是由多台计算机通过网络相连,共同完成任务的系统。想象一下,咱们平时上网浏览网页、看视频,背后其实都是一大堆服务器在协同工作。这些服务器之间需要协调一致&#xff…

拥有大规模犯罪联盟链的网络攻击中心

VexTrio 是一个网络犯罪集团,其历史至少可以追溯到 2017 年,该集团涉嫌利用复杂的字典域生成算法 (DDGA) 进行邪恶活动。 他们的恶意活动包括诈骗、风险软件、间谍软件、广告软件、隐匿垃圾程序 (PUP) 和露骨内容,其中 2022 年发生的一次引…

【广度优先搜索】【拓扑排序】【C++算法】913. 猫和老鼠

作者推荐 【动态规划】【map】【C算法】1289. 下降路径最小和 II 本文涉及知识点 广度优先搜索 拓扑排序 逆推 LeetCode913. 猫和老鼠 两位玩家分别扮演猫和老鼠,在一张 无向 图上进行游戏,两人轮流行动。 图的形式是:graph[a] 是一个列…

067:Vue2 + vite 开发环境的搭建(含源文件包,运行即可)

第067个 查看专栏目录: VUE 专栏目标 在vue和element UI联合技术栈的操控下,本专栏提供行之有效的源代码示例和信息点介绍,做到灵活运用。 提供vue2的一些基本操作:安装、引用,模板使用,computed,watch&am…

【机组】单元模块实验的综合调试与驻机键盘和液晶显示器的使用方式

​🌈个人主页:Sarapines Programmer🔥 系列专栏:《机组 | 模块单元实验》⏰诗赋清音:云生高巅梦远游, 星光点缀碧海愁。 山川深邃情难晤, 剑气凌云志自修。 目录 1. 综合实验的调试 1.1 实验…

YOLOv8改进 | Conv篇 | 结合Dual思想利用HetConv创新一种全新轻量化结构CSPHet(参数量下降70W)

一、本文介绍 本文给大家带来的改进机制是我结合Dual的思想利用HetConv提出一种全新的结构CSPHet,我们将其用于替换我们的C2f结构,可以将参数降低越75W,GFLOPs降低至6.6GFLOPs,同时本文结构为我独家创新,全网无第二份,非常适合用于发表论文,该结构非常灵活,利用Dual卷…

调用阿里通义千问大语言模型API-小白新手教程-python

阿里大语言模型通义千问API使用新手教程 最近需要用到大模型,了解到目前国产大模型中,阿里的通义千问有比较详细的SDK文档可进行二次开发,目前通义千问的API文档其实是可以进行精简然后学习的,也就是说,是可以通过简单的API调用在自己网页或…

【GitHub项目推荐--推荐一个开源的任务管理工具(仿X书/X钉)】【转载】

推荐一个开源的任务管理工具,该工具会提供各类文档协作功能、在线思维导图、在线流程图、项目管理、任务分发、即时 IM,文件管理等等。该开源项目使用到 Vue、Element-UI、ECharts 等技术栈。 开源地址:www.github.com/kuaifan/dootask 预览地…

ES的一些名称和概念总结

概念 先看看ElasticSearch的整体架构: 一个 ES Index 在集群模式下,有多个 Node (节点)组成。每个节点就是 ES 的Instance (实例)。每个节点上会有多个 shard (分片), P1 P2 是主分片, R1 R2…

Flink实现数据写入MySQL

先准备一个文件里面数据有: a, 1547718199, 1000000 b, 1547718200, 1000000 c, 1547718201, 1000000 d, 1547718202, 1000000 e, 1547718203, 1000000 f, 1547718204, 1000000 g, 1547718205, 1000000 h, 1547718210, 1000000 i, 1547718210, 1000000 j, 154771821…

数学建模-------误差来源以及误差分析

绝对误差:精确值-近似值; 举个例子:从A到B,应该有73千米,但是我们近似成了70千米;从C到D,应该是1373千米,我们近似成了1370千米,如果使用绝对误差,结果都是3…

Docker容器部署OpenCV,打造高效可移植的计算机视觉开发环境

推荐 海鲸AI-ChatGPT4.0国内站点:https://www.atalk-ai.com 前言 在计算机视觉领域,快速部署和测试算法是研究和开发的关键。OpenCV作为一个强大的开源计算机视觉库,广泛应用于各种图像处理和视频分析任务。然而,配置OpenCV环境可…