Flink之Source

news2025/1/15 23:44:55

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理程序的输入端。

Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法:

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

首先先准备数据,假设数据来源是网页的埋点数据,数据格式为(用户名,网址,时间戳)的三元组,此处用case class样例类来表示数据格式。

字段名

数据类型

说明

user

String

用户名

url

String

网址

timestamp

long

时间戳

样例类代码如下:

object CC {

  //  用户浏览事件  用户名 网址 时间戳
  case class Event(user: String, url: String, timestamp: Long)

}
  • 从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个集合,然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

  def main(args: Array[String]): Unit = {
//    获取流执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//    设置并行度为1
    env.setParallelism(1)
//    读取Event集合
    val stream: DataStream[Event] = env.fromCollection(List(
      Event("zhangsan", "index.html", 1L),
      Event("lisi", "commom.jsp", 10L),
      Event("wangwu", "baidu.com", 10L)))
    stream.print()
    env.execute()
  }
  • 从文件中读取

 //    读文本文件  有界的数据流
val stream: DataStream[String] = env.readTextFile("input/words.txt")
val sum: DataStream[(String, Int)] = stream
      .flatMap(_.split(" ")) // 按照空格切分扁平化
      .map((_, 1)) // (word,1) 二元组
      .keyBy(_._1) // 根据第一个元素聚合
      .sum(1) // 按照index 1 位置求和
sum.print()
  • 从Socket中读数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

//    数据源读取socket文本流数据
    val stream: DataStream[String] = env.socketTextStream("192.168.0.30", 7777)
  • 从Kafka读数据

Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选。

Flink 官方提供了连接工具 flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka。

添加pom文件配置

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
</dependency>

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

    val stream = env.addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props))
    stream.print()
    env.execute()
  }

创建 FlinkKafkaConsumer 时需要传入三个参数:

  1. 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条数据流中去。

  1. 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以我们也可以自定义反序列化逻辑。

  1. 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

  • 自定义源算子(Source)

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()和 cancel()。

  1. run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;

  1. cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

代码如下:

package com.myproject.entity

import com.myproject.entity.CC.Event
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class ClickSource extends SourceFunction[Event] {

  var running = true

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    val random = new Random()
    val users = List("zhangsan", "lisi", "wangwu", "laoliu")
    val urls = List("baidu.com", "sohu.com/index.html", "sina.cn", "12306.com","https://zhuanlan.zhihu.com")
    // 用标志位作为循环判断条件,不停地发出数据
    while (running) {
      val event = Event(users(random.nextInt(users.length)), urls(random.nextInt(urls.length)), System.currentTimeMillis())
      // 调用ctx的方法向下游发送数据
      ctx.collect(event)
      // 每隔1s发送一条数据
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

这个数据源,我们后面会频繁使用,之后的代码若涉及 ClickSource()数据源,使用上面的代码就可以了。

下面的代码我们来读取一下自定义的数据源。有了自定义的 Source,接下来只要调用addSource()就可以了:

package com.myproject.analyse

import com.myproject.entity.ClickSource
import org.apache.flink.streaming.api.scala._

object DiySourceStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val lineDS = env.addSource(new ClickSource)
    lineDS.print()

    env.execute()
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402354.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【零基础入门学习Python---Python的五大数据类型之数字类型】

一.Python的五大数据类型之数字类型 在Python中,变量用于存储数据。变量名可以是任何字母、数字和下划线的组合。Python支持多种数据类型,包括数字、字符串、列表、元组和字典。这篇文章我们就来学习一下五大数据类型中的数字类型。 1.1 数字类型 Python 中的数字类型主要…

【C语言蓝桥杯每日一题】—— 单词分析

【C语言蓝桥杯每日一题】—— 单词分析&#x1f60e;前言&#x1f64c;单词分析&#x1f64c;总结撒花&#x1f49e;&#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神贯注的上吧&#xff01;&#xff01;&#xff01; &#x1f60a;作者…

JSP电动车充电运营管理系统用myeclipse定制开发mysql数据库mvc模式java编程servlet

一、源码特点 JSP 电动车充电运营管理系统 是一套完善的系统源码&#xff0c;对理解JSP java serlvet MVC编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 研究的基本内容是基于Web的电动车充电运营管理系统&#xf…

论文笔记 | 标准误聚类问题

关于标准误的选择&#xff0c;如是否选择稳健性标准误、是否采取聚类标准误。之前一直是困惑的&#xff0c;惯用的做法是类似主题的文献做法。所以这一次&#xff0c;借计量经济学课程之故&#xff0c;较深入学习了标准误的选择问题。 在开始之前推荐一个知乎博主。他阅读了很…

ssl/tsl 加密原理

ssl/tsl 加密原理 对称加密 对称加密&#xff1a;即加密和解密用的都是同一个秘钥&#xff0c;主要优势就是速度比非对称加密快 非对称加密 非对称加密&#xff1a; 即加密和解密用的是不同的秘钥&#xff0c;例如&#xff1a;在服务端存在一对公钥和私钥&#xff0c;服务…

JWT令牌解析及刷新令牌(十一)

写在前面&#xff1a;各位看到此博客的小伙伴&#xff0c;如有不对的地方请及时通过私信我或者评论此博客的方式指出&#xff0c;以免误人子弟。多谢&#xff01;如果我的博客对你有帮助&#xff0c;欢迎进行评论✏️✏️、点赞&#x1f44d;&#x1f44d;、收藏⭐️⭐️&#…

mmdetection3d-之(三)--FCOS3d训练waymo数据集

本内容分为两部分 1. waymo数据集转KITTI格式2. FCOS3D训练KITTI格式的waymo数据集1 waymo数据集转kitti格式 1.1 waymo数据集简介 1.1.1 waymo数据集下载 waymo数据集v1.2.0可以从这里下载。其中&#xff0c;train&#xff08;32个压缩包&#xff09;&#xff0c;test&…

零入门kubernetes网络实战-22->基于tun设备实现在用户空间可以ping通外部节点(golang版本)

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本篇文章主要是想做一个测试&#xff1a; 实现的目的是 希望在宿主机-1上&#xff0c;在用户空间里使用ping命令发起ping请求&#xff0c;产生的icmp类型的…

从FVM上线前的测试网统计报告中看前景,Filecoin将会迎来什么变化?

FEVM将在2023/03/14主网上线&#xff01;在Calibration网络升级正式完成后&#xff0c;Filecoin V18 Hygge升级将于2023年3月14日&#xff08;π日&#xff09;正式上线&#xff01;此次升级将正式为Filecoin网络带来智能合约。基于FVM的可编程性。此次更新升级将释放数据经济的…

Spring Cloud Alibaba Sentinel 集群流量控制

为什么要进行集群流控 假设集群中有 10 台机器&#xff0c;我们给每台机器设置单机限流阈值为 10 QPS&#xff0c;理想情况下整个集群的限流阈值就为 100 QPS。不过实际情况下流量到每台机器可能会不均匀&#xff0c;会导致总量没有到的情况下某些机器就开始限流。因此仅靠单机…

因特网基础

1、因特网的概述 1-1、网络、互联网和因特网 网络&#xff08;Network&#xff09;是由若干节点&#xff08;Node&#xff09;和连接这些节点的链路&#xff08;Link&#xff09;组成。 多个网络还可以通过路由器相互连起来&#xff0c;这样就构成了一个覆盖范围更大的网络&…

【刷题笔记】--双指针--189. 轮转数组

题目&#xff1a; 思路1&#xff1a; 再设一个数组&#xff0c;通过下标的规律&#xff0c;进行更新数组。 关于这个平移的下标规律&#xff1a;%numbersize&#xff1b; 假设数组1&#xff0c;2&#xff0c;3&#xff0c;4&#xff0c;5&#xff0c;6&#xff0c;7 要整体…

MTK平台 Wireless Authentication Denied问题

这个问题一开始的现象是部分station无法连接某台AP。无线抓包后发现station和AP间进行了频繁的Auth认证,即station发送Auth包,ap回复Auth包出现 Expert: Wireless Authentication Denied (13: Requested authentication algorithm not supported)这样错误提示…

Lesson 9.2 随机森林回归器的参数

文章目录一、弱分类器的结构1. 分枝标准与特征重要性2. 调节树结构来控制过拟合二、弱分类器的数量三、弱分类器训练的数据1. 样本的随机抽样2. 特征的随机抽样3. 随机抽样的模式四、弱分类器的其他参数在开始学习之前&#xff0c;先导入我们需要的库。 import numpy as np im…

【项目精选】俄罗斯方块项目(视频+论文+源码)

点击下载源码 俄罗斯方块项目&#xff0c;基本功能包括&#xff1a;游戏主界面显示模块、方块及数据显示模块、方块移动控制模块、游戏界面颜色控制模块、游戏进度、等级控制模块等。本项目结构如下&#xff1a; &#xff08;1&#xff09;游戏主界面显示模块&#xff1a; 显示…

【密码学篇】密码行业标准汇总(GM)

【密码学篇】密码行业标准汇总&#xff08;GM&#xff09; 截止到2023年03月10日&#xff0c;共130个密码行业标准&#xff0c;适用商用密码应用与安全性评估等密码行业&#xff0c;可点击链接预览或下载标准—【蘇小沐】 文章目录【密码学篇】密码行业标准汇总&#xff08;GM…

【洛谷 P1044】[NOIP2003 普及组] 栈 题解(递归+记忆化搜索)

[NOIP2003 普及组] 栈 题目背景 栈是计算机中经典的数据结构&#xff0c;简单的说&#xff0c;栈就是限制在一端进行插入删除操作的线性表。 栈有两种最重要的操作&#xff0c;即 pop&#xff08;从栈顶弹出一个元素&#xff09;和 push&#xff08;将一个元素进栈&#xff…

常见数量关系分析

考点一相遇追及问题&#xff08;一&#xff09;直线型1. 单次相遇&#xff1a;相距两地&#xff0c;同时出发&#xff0c;相向而行。2. 单次追及&#xff1a;同时出发&#xff0c;同向而行。3. 直线型相遇追及问题公式总结&#xff08;二&#xff09;环线型环线型相遇追及问题公…

弹性存储-块存储和文件存储部分

存储通用知识 存储架构发展历程&#xff1a;直连存储-》存储网络-〉分布式存储/云存储 块存储、文件存储及对象存储使用场景 块存储、文件存储及对象存储性能对比 块存储及文件存储-块存储介绍 块存储EBS&#xff08;Elastic Block Storage&#xff09;是为了云服务器提…

CleanMyMac4.20新版本核心功能介绍

CleanMyMac4.20是Mac清理工具&#xff0c;具有很多功能。如‬删除大量不可见的缓存文件&#xff0c;可以批量删除未使用的DMG、不完整的下载以及其余的旧包。 与 CleanMyMac 3相比&#xff0c;新版本 UI设计焕然一新&#xff0c;采用了完全不同的风格。 CleanMyMac X4.20全新版…