flink的几种source来源

news2025/1/22 19:04:01

 简单的总结了flink的几种source来源,可以参考下

package com.atguigu.apitest


import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random


/**
  * 功能:演示 flink的source来源
  *
  */

// 定义样例类,温度传感器
case class SensorReading(id:String, timestamp: Long,temmperature:Double )


object SourceTest {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

   // 1 从集合中读取数据
    val dataList=List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    )

    val stream1=env.fromCollection(dataList)

    //val stream1=env.fromElements(1.0,35,"hello") // 直接创建集合的方法

    // 2  从文件中读取数据
    val inputPath="F:\\FlinkTutorial\\src\\main\\scala\\com\\atguigu\\apitest\\Sensor.txt"
    val stream2=env.readTextFile(inputPath)

    // 3 从kafka中读取数据
    val properties = new Properties()
   properties.setProperty("bootstrap.servers", "hadoop:9092")
    properties.setProperty("group.id", "consumer-group")
    // 没有调试出来,可能是因为版本的问题, 以前的kafka参数是 zookeeper   现在的kafka参数是 bootstrap.servers
    // Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    val stream3=env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties) )
    // 4. 自定义Source
    val stream4 = env.addSource( new MySensorSource() )


    stream3.print() //  setParallelism(1)


    // 执行
    env.execute("source test")
  }
}

class MySensorSource() extends SourceFunction[SensorReading]{
  // 定义一个标识位flag, 用来表示数据源是否正常运行发出数据
  var  running:Boolean =true

  override def cancel(): Unit = {
    running=false
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 定义一个随机数发生器
   val rand=new Random()

    // 随机生成一组(10个)传感器的初始温度: (id,temp)
    var curTemp = 1.to(10).map( i => ("sensor_" + i, rand.nextDouble() * 100) )

    // 定义无限循环,不停地产生数据,除非被cancel
    while (running) {
      // 在上次数据基础上微调,更新温度值
      curTemp=curTemp.map(
        data=>(data._1,data._2+rand.nextGaussian())
      )
      // 获取当前时间戳,加入到数据中,调用ctx.collect发出数据
     val curTime=System.currentTimeMillis()
      curTemp.foreach(
        data=>ctx.collect(SensorReading(data._1,curTime,data._2))
      )

      // 间隔 500 ms
      Thread.sleep(500)

    }



  }
}


 集合,文件以及自定义source 相对简单,重点演示kafka的对接

 kafka作为生产者进行数据的输入

 flink的数据产出

总结点:  

bootstrap.servers 属于kafka版本>= v2.2时的参数
旧版的kafka版本(< v2.2) 用的参数依然是 zookeeper node1:2181

如果kafka的版本过低,比如 v0.8.2.1 时,flink会一致等待中,最后报错

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

建议kafka版本: v2.8.1  

直接从官网下载即可  https://kafka.apache.org/downloads

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/593649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

涨点技巧:注意力机制---Yolov8引入Resnet_CBAM,CBAM升级版

1.计算机视觉中的注意力机制 一般来说,注意力机制通常被分为以下基本四大类: 通道注意力 Channel Attention 空间注意力机制 Spatial Attention 时间注意力机制 Temporal Attention 分支注意力机制 Branch Attention 1.1.CBAM:通道注意力和空间注意力的集成者 轻量级…

版本控制系统有哪些推荐? - 易智编译EaseEditing

以下是几个常用的版本控制系统&#xff08;Version Control System&#xff09;推荐&#xff0c;并对它们进行简单介绍&#xff1a; Git&#xff1a; Git是目前最流行的分布式版本控制系统。它具有高效、灵活和强大的功能&#xff0c;支持快速的代码提交、分支管理、合并操作…

高频面试八股文原理篇(五)索引相关

目录 索引的优缺点 MySQL索引类型 索引原理 常见索引类型 MySQL数据库要⽤B树存储索引⽽不⽤红⿊树、B树、 Hash的原因 怎么验证 MySQL 的索引是否满足需求 聚簇索引和非聚簇索引 索引的优缺点 索引的优点 可以大大加快数据的检索速度&#xff0c;这也是创建索引的最主…

蚂蚁Ant Design组件库的免费在线资源

Ant Design&#xff08;蚂蚁组件&#xff09;是蚂蚁集团体验技术部经过大量项目实践和总结&#xff0c;逐步打磨出的一个设计系统&#xff0c;内含带有 React 的 UI 库。它是为企业级产品设计而创建的。Ant Design 提供了高质量的交互界面设计组件和演示。作为 UI 设计师&#…

Spring:Spring框架结构 ②

一、结构体现的价值 1、可读性强。 2、可维护性。 3、优秀的框架均具有分而治之的思想。清晰的设计、合理的归类、模块化是走向优秀框架的基础性武器。 二、Spring框架的模块划分 1、整体轮廓 Spring框架包含的功能大约由20个小模块组成。这些模块按组可分为核心容器(Core Co…

独立、相关、正交

文章目录 【1. 独立】【2. 相关】【3.正交】【4. 相互关系】相关和独立相关和正交独立和正交独立、不相关和正交小结 【5. 参考文献】 【1. 独立】 独立&#xff1a;对于两个随机变量 y 1 y_1 y1​ 和 y 2 y_2 y2​&#xff0c;若 y 1 y_1 y1​ 的有关信息不给出 y 2 y_2 …

基于AT89C52单片机的无线温度监测设计

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87848530?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; 设计一个温度监测器&#xff0c;温度异常时警报器能够响起&#xff0c;设定初始温度&#xff0…

0基础学习VR全景平台篇第33章:场景功能-嵌入标尺

功能位置示意 一、本功能将用在哪里&#xff1f; 嵌入功能可对VR全景作品嵌入【图片】【视频】【文字】【标尺】四种不同类型内容&#xff1b; 本次主要带来标尺类型的介绍&#xff0c;可对VR全景作品中&#xff0c;位置信息较多的场景进行标注&#xff0c;在单场景中植入更多…

R-Meta分析与【文献计量分析、贝叶斯、机器学习等】多技术融合实践

Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析的方法&#xff0c;最早出现于“循证医学”&#xff0c;现已广泛应用于农林生态&#xff0c;资源环境等方面。…

linux守护进程简单创建

1.什么是守护进程&#xff1f; 守护进程(daemon)是一类在后台运行的特殊进程&#xff0c;用于执行特定的系统任务。很多守护进程在系统引 导的时候启动&#xff0c;并且一直运行直到系统关闭。另一些只在需要的时候才启动&#xff0c;完成任务后就自动结束。 用户使守护进程独立…

平行云X火山引擎:探索XR观展的极致体验

5月20日&#xff0c;素有艺术界“奥林匹克”之称的第18届威尼斯国际建筑双年展&#xff08;以下简称“威尼斯双年展”&#xff09;中国国家馆展览正式开幕。 威尼斯双年展为当今世界规模最大、最具影响力的国际艺术盛事之一&#xff0c;中国文化和旅游部自2005年起主办中国国家…

六一儿童节海外网红营销指南:出海品牌的增长秘诀

六一儿童节作为全球范围内备受关注的节日之一&#xff0c;为孩子们提供了欢乐和庆祝的机会。对于出海品牌来说&#xff0c;利用六一儿童节进行海外网红营销不仅可以吸引年轻消费者的关注&#xff0c;还能够增加品牌的知名度和影响力。根据行业研究机构Statista的数据&#xff0…

第十三章行为型模式—模板模式

文章目录 模板模式解决的问题结构实例存在的问题适用场景 JDK源码 - InputStream 行为型模式用于描述程序在运行时复杂的流程控制&#xff0c;即描述多个类或对象之间怎样相互协作共同完成单个对象无法单独完成的任务&#xff0c;它涉及算法与对象间职责的分配。行为型模式分为…

hive如何实现oracle的connect by prior函数

Hive中如何实现层级查询 类似oracle中 connect by prior 实现的效果&#xff1f; - 知乎 大佬写的很详细&#xff0c;有兴趣自己看&#xff0c;但是存在一个问题 create table test.emp ( empno string, ename string, job string, mgr strin…

如何使用MapStruct优雅的告别get,set

我们开发过程中会遇到很多bean拷贝的过程&#xff0c;最简单粗暴得方法就是set/get方法&#xff0c;当然这也是最臃肿的方法&#xff0c;代码显得过于冗长和笨重&#xff0c;其次还有框架BeanUtils在使用反射的时候都会影响到性能。虽然我们可以进行反射信息的缓存来提高性能。…

网络安全实用篇—iptables防火墙学习总结

iptables防火墙学习总结 目录 iptables简介&#xff1a; iptables题目练习&#xff1a; 题目包含&#xff08;市赛、省赛、国赛&#xff09;覆盖所有比赛面&#xff01; iptables简介&#xff1a; Iptables是Linux系统中的一个防火墙工具&#xff0c;它可以对进出本机的…

用Photoshop软件制作法线图以及查看效果细节

这里是在windows系统下用PS2020做演示。 第一步、在Photoshop软件中打开一张图(最好是正方形&#xff0c;边长是2的n次方大小的像素&#xff0c;例如宽和高都是512像素)&#xff0c;如下图所示&#xff1a; 第二步、在菜单栏选择滤镜然后再选择3D接着再选择生成法线图&#xf…

【漏洞复现】DedeCMS存在文件包含漏洞导致后台getshell(CVE-2023-2928)

复现环境下载 https://updatenew.dedecms.com/base-v57/package/DedeCMS-V5.7.106-UTF8.zip 影响版本 DedeCMS V5.7.106 CNVD编号&#xff1a;CNVD-2023-40504 漏洞分析 漏洞文件: uploads/dede/article_allowurl_edit.php存在缺少对该文件中写入内容的任何过滤是导致该漏洞的…

程序员的新型开发工具——低代码平台

低代码的热潮至今未消停&#xff0c; 从阿里钉钉跨平台协作方式&#xff0c;再到飞书上的审批流程&#xff0c;以及目前我们接触到的表单审批、投票的模板&#xff0c;这些都是关于低代码的实现方式。 一、低代码平台概述 按维基百科的说法&#xff0c;低代码这个称呼是 Forres…

大数据时代,Python实现API调用的步骤及示例代码;

Python是一种非常流行的编程语言&#xff0c;可以用于实现各种各样的应用程序&#xff0c;其中包括通过API对各种服务进行调用。API是应用程序接口的缩写&#xff0c;它提供了一种编程接口&#xff0c;允许软件开发者使用其他服务的功能&#xff0c;包括访问数据库、发送电子邮…