快速灵敏的 Flink1

news2024/11/15 21:29:15

一、flink单机安装

1、解压
tar -zxvf ./flink-1.13.2-bin-scala_2.12.tgz -C /opt/soft/
2、改名字
mv ./flink-1.13.2/ ./flink1132
3、profile配置
#FLINK
export FLINK_HOME=/opt/soft/flink1132
export PATH=$FLINK_HOME/bin:$PATH
4、查看版本
flink --version
5、启动关闭flink
start-cluster.sh
stop-cluster.sh
6、登录网页   http://192.168.91.11:8081

二、flink开发

1、步骤

创建运行环境--> 加载数据源--> 转换--> 下沉

2、案例

(1)学习数据源加载
package nj.zb.kb23.source

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
    val stream: DataStream[Any] = env.fromElements(1,2,3,3,4,"hello",3.1415)
    //3、下沉
    stream.print()
    env.execute("sourcetest")
  }
}
(2)样例类加载数据源
package nj.zb.kb23.source

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random
//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
    val stream: DataStream[SensorReading] = env.fromCollection(List(
      SensorReading("sensor_1", 1698731530, 26.3),
      SensorReading("sensor_2", 1698731530, 26.5),
      SensorReading("sensor_3", 1698731531, 26.7),
      SensorReading("sensor_4", 1698731530, 26.9),
    ))
    //3、输出,又叫下沉
    stream.print()
    env.execute("sourcetest")
  }
}

(3)指定文件加载数据
package nj.zb.kb23.source

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
    val stream: DataStream[String] = env.readTextFile("D:\\caozuo\\ideal\\flinkstu\\resources\\sensor")
    //3、输出,又叫下沉
    stream.print()
    env.execute("sourcetest")
  }
}

(4)指定端口,实时处理数据源
package nj.zb.kb23.source

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
    //(1)真实时处理 nc -lk 7777
        val stream: DataStream[String] = env.socketTextStream("192.168.91.11",7777)
        stream.print()
    //3、转换拼接
        val stream1: DataStream[(String, Int)] = stream
          .map(x=>x.split(","))
          .flatMap(x=>x)
          .map(x=>(x,1))
        stream1.print()
    //①sum
        val value: DataStream[(String, Int)] = stream
          .map(x=>x.split(","))
          .flatMap(x=>x).map(x=>(x,1))
          .keyBy(x=>x._1)
          .sum(1)
        value.print()
    //   ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇相等
    //②reduce
        val value: DataStream[(String, Int)] = stream
          .map(x => x.split(","))
          .flatMap(x => x).map(x => (x, 1))
          .keyBy(x => x._1)
          .reduce((x, y) => (x._1 + "#" + y._1, x._2 + y._2))
        value.print()
    //4、输出,又叫下沉
    env.execute("sourcetest")
  }
}
(5)kafka加载数据
package nj.zb.kb23.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
        val prop = new Properties()
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092")
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
        val stream: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), prop)
        )
        val value: DataStream[(String, Int)] = stream.flatMap(x => x.split(" "))
          .map(x => (x, 1))
          .keyBy(x => x._1)
          .reduce((x: (String, Int), y: (String, Int)) => (x._1, x._2 + y._2))
    //4、输出,又叫下沉
    stream.print()
    env.execute("sourcetest")
  }
}
(6)自定义数据源加载数据
package nj.zb.kb23.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random

//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object AA {
  def main(args: Array[String]): Unit = {
    //1、创建环境变量
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行步 1
    env.setParallelism(1)
    //2、加载数据源
    val stream: DataStream[SensorReading] = env.addSource(new MySensorSource)
    //4、输出,又叫下沉
    stream.print()
    env.execute("sourcetest")
  }
}
//模拟自定义数据源
class MySensorSource extends SourceFunction[SensorReading]{
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    //(1)随机数,true一直生成随机数
    val random = new Random()
    while (true){
      val d: Double = Math.random()
      ctx.collect(SensorReading("随机数:"+random.nextInt(),System.currentTimeMillis(),d))
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
  }
}

三、flink运行四大组件

1、作业管理器jobmanager

应用程序执行的主过程中,执行应用程序会被jobmanager最先接收,这个应用程序会包括:作业图(jobGraph),逻辑数据流图(logical dataflow graph)和打包了所有的类, 库和其他资源的jar包。jobmanager会向资源管理器请求执行任务必要的资源,也就是任务管理器上的插槽(slot)。一旦它获取了足够的资源,就会将执行图分发到真正运行它们的taskmanager上。在实际运行中,由jobmanager负责协调各项中央操作。

2、任务管理器taskmanager

taskmanager是指工作进程。Flink中包含了多个taskmanager,每个taskmanager中又存在着一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。开始运行后,taskmanager中的插槽会被注册给资源管理器,在收到指令后,taskmanager会提供多个插槽任jobmanager调用。jobmanager通过给插槽分配tasks来执行。运行同一应用程序的taskmanager可以子啊执行过程中互相交换数据。

3、资源管理器resourcemanager

资源管理器在作业管理器申请插槽资源时,会将空闲插槽的任务管理器分配给作业管理器。如果没有足够的插槽来满足作业管理器的请求时,它会向资源提供平台发起会话,以提供启动taskmanager进程的容器。

4、分发器 dispatcher
  1. 提供了REST接口,在应用提交时可以跨作业运行。
  2. 在应用被提交执行的情况下,分发器启动将应用提交给jobmanager。
  3. Webui会由dispatcher启动,以便展示和监控作业的执行信息。
  4. 这取决于应用提交运行的方式取决于是否需要dispatche

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1163988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[GitLab] 安装Git 指定版本

卸载旧版本 检查是否已经安装 git --version如果已经安装,先卸载 yum -y remove git安装新版本 在GitHub上选择需要下载的版本 Git版本 在/usr/local/目录下新建文件夹:git,并在/usr/local/git/文件夹内下载压缩包 wget https://github…

llava1.5模型安装、预测、训练详细教程

引言 本博客介绍LLava1.5多模态大模型的安装教程、训练教程、预测教程,也会涉及到hugging face使用与wandb使用。 源码链接:点击这里 demo链接:点击这里 论文链接:点击这里 一、系统环境 ubuntu 20.04 gpu: 2*3090 cuda:11.6 二、LLava环境安装 1、代码下载…

数据库的增删查改(一)

Mysql的基本操作 一. 新增1.单行数据全列插入 2.多行数据指定列插入二.查询1.全列查询2.指定列查询3.查询字段为表达式4.别名5.去重6.排序7.条件查询 一. 新增 1.单行数据全列插入 表示在SQL语句中一行一行插入. 2.多行数据指定列插入 二.查询 1.全列查询 全列查询就是将val…

linux查看文件夹使用情况以及查看文件大小

1、ls ls 命令是 Linux 中最常用的文件和目录列表命令之一。它可以显示文件的各种属性&#xff0c;包括文件大小。 ls -l <文件名>上述命令会显示文件的详细信息&#xff0c;其中包括文件的大小。文件大小以字节为单位显示&#xff0c;并且在输出中的第 5 列。4096 表示…

JavaScript基础之BOM与DOM

文章目录 BOM操作window对象window的子对象之navigator对象&#xff08;了解即可&#xff09;window的子对象之screen对象&#xff08;了解即可&#xff09;window的子对象之history对象&#xff08;了解即可&#xff09;window的子对象之location对象 弹出框警告框确认框提示框…

【MySQL】MVCC机制(undo log,read view)

文章目录 前言一. 预备知识二. 模拟MVCC三. Read View四. RC与RR的本质区别结束语 前言 MVCC&#xff08;多版本并发控制&#xff09;是一种用来解决读-写冲突的无锁并发控制 MVCC为事务分配单向增长的事务ID&#xff0c;为每个修改保存一个版本&#xff0c;版本与事物ID相关联…

043-第三代软件开发-第三方串口库使用

第三代软件开发-第三方串口库使用 文章目录 第三代软件开发-第三方串口库使用项目介绍第三方串口库使用示例代码 关键字&#xff1a; Qt、 Qml、 QextSerialPort、 QThread、 高频 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这个项目结合了 QML&#xff08;Qt…

Websocket传输协议是什么

WebSocket 是一种网络通信协议&#xff0c;属于 HTML5 规范的一部分。它提供了在单个长期连接上进行全双工通信的能力&#xff0c;使得数据可以从客户端发送到服务器&#xff0c;也可以从服务器发送到客户端&#xff0c;这与传统的 HTTP 请求和响应模型不同。 WebSocket 协议定…

ROS Hello World

万物始于Hello World&#xff0c;为了体验ROS&#xff0c;使用Hello World介绍ROS的简单使用。 一、Hello World工程简介 首先需要创建工程&#xff0c;流程为&#xff1a; 创建工作空间目录&#xff08;即工程根目录&#xff0c;注意此时还不是ROS工作空间&#xff0c;只是…

>LINK : fatal error LNK1561和LINK : fatal error LNK1168:解决方法

>LINK : fatal error LNK1561和LINK : fatal error LNK1168:解决方法 >LINK : fatal error LNK1561和LINK : fatal error LNK1168:解决方法_fatal link error-CSDN博客 如果无法解决&#xff1a;只能试试在之前的项目中能否运行 这个错误提示表明在编译连接时&#xff…

PubScholar-可检索1.7亿篇科技文献的公益平台来了!

可检索1.7亿篇科技文献的 公益平台来了&#xff01; 11月1日 由中国科学院等单位联合建设的 PubScholar公益学术平台 正式对社会公众开放 网址&#xff1a;https://pubscholar.cn/ “公益学术平台”旨在为我国科技界和全社会提供高质量的公益性学术资源&#xff0c;提供学…

安全第一!速卖通测评补单稳定的系统注意事项大盘点

对新卖家而言&#xff0c;测评并非可耻之事&#xff0c;反而是无法起步、耗费自身时间才是真正的可耻。由于速卖通新店几乎无法获得任何活动的支持&#xff0c;流量也基本没有&#xff0c;因此要在90天内达成60单的业绩对于许多卖家来说都是一项挑战。因此&#xff0c;通过快速…

算法训练 第五周

一、多数元素 本题给了我们一个数组&#xff0c;要求我们找出这个数组中出现次数大于这个数组元素总量一半的那个元素&#xff0c;也可以理解为找出数组中出现次数最多的那个元素&#xff0c;本题的解决方法有很多&#xff0c;在此我们主要讨论三种解决思路。 1.Hash表 我们可…

Linux驱动——并发与竞态

并发 并发指多个执行单元同时、并行被执行&#xff0c;而并发执行单元对共享资源&#xff08;硬件资源和软件上的全局变量、静态变量等&#xff09;的访问很容易导致竞态。 如下列AB写&#xff0c;C读&#xff0c;AB在写的过程中&#xff0c;C读的话就会出错。 对称多处理器…

Lightdb23.4 Client 包含ecpg可执行程序及相关库文件

功能介绍 部分客户在使用Lightdb client绿色包时需要ecpg程序和ecpg相关的头文件和库文件&#xff0c;所以在Lightdb 23.4版本client绿色包中新增了ecpg的程序和相关头文件和库文件&#xff0c;以方便用户的使用。 Client包目录结构 bin目录是可执行程序和脚本&#xff0c;i…

windows wsl使用,安装ubuntu

windows wsl使用 环境配置windows 家庭版 打开 hyper11 安装ubuntuwsl 命令 环境配置 搜索 启动和关闭 Windows功能 打开下面组件 windows 家庭版 打开 hyper11 解决Windows11 Home 没有 Hyber-v 创建 hyber-v.cmd 并管理员身份执行 pushd "%~dp0" dir /b %Sy…

使用QEMU启动uboot引导linux内核

上篇文章中实现了使用qemu启动uboot&#xff0c;本文实现使用qemu启动uboot引导内核的过程。 一、环境准备 主机系统&#xff1a;WSL-ubuntu20.04 uboot版本&#xff1a;u-boot-2023.10 Kernel版本&#xff1a;linux-5.4.18 二、制作sd卡 qemu支持模拟sd卡&#xff0c;可以…

Linux C语言进阶-D11多级指针、void指针及const

多级指针 多级指针变量&#xff1a;指向指针变量的指针变量 在下图中&#xff0c;定义一个a数组&#xff0c;再定义一个指针数组p[2]&#xff0c;其中p数组中存储的是地址&#xff0c;再定义一个二级指针q指向p[0]&#xff0c;即&p[0]&#xff0c;又由于&p[0]就是p&am…

SpringCloud(七) Feign远程调用

目录 一, RestTemplate远程调用存在的问题 二, Feign的远程调用 2.1 什么是Fegin 2.2 Feign的使用(代替RestTemplate) 1. 引入依赖 2. 添加注解 3. 编写Feign的客户端 4. 测试 5. 总结 2.3 自定义配置 1. 配置文件方式 2. Java代码方式 三, Feign使用优化 3.…

keepalived与nginx与MySQL

keepalived VRRP介绍 集群&#xff08;cluster&#xff09;技术是一种较新的技术&#xff0c;通过集群技术&#xff0c;可以在付出较低成本的情况下获得在性能、可靠性、灵活性方面的相对较高的收益&#xff0c;其任务调度则是集群系统中的核心技术。 集群组成后&#xff0c;可…