Flink第三章:基本操作(二)

news2024/12/23 12:32:43

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)


文章目录

  • 系列文章目录
  • 前言
  • 一、物理分区
    • 1.shuffle(随机分区)
    • 2.Round-Robin(轮询)
    • 3.rescale(重缩放分区)
    • 4.broadcast(广播)
    • 5.Custom(自定义分区)
  • 二、Sink
    • 1.写出到文件
    • 2.写入到Kafka
    • 3.写入到Mysql
  • 总结


前言

上一次博客中我们完成了Flink的Source操作和Transform的一部分,这次我们练习剩下的部分和Sink操作.


一、物理分区

在这里插入图片描述

1.shuffle(随机分区)

将数据随机地分配到下游算子的并行任务中去。
PartitionShuffleTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._

object PartitionShuffleTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val steam: DataStream[Event] = env.addSource(new ClickSource)

    steam.shuffle.print("shuffle").setParallelism(4)

    env.execute()
  }
}

在这里插入图片描述

2.Round-Robin(轮询)

按照先后顺序将数据做依次分发
PartitionReblanceTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._

object PartitionReblanceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val steam: DataStream[Event] = env.addSource(new ClickSource)

    steam.rebalance.print("reblance").setParallelism(4)

    env.execute()
  }
}

在这里插入图片描述

3.rescale(重缩放分区)

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中.
PartitionRescaleTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object PartitionRescaleTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val steam: DataStream[Int] = env.addSource(new RichParallelSourceFunction[Int] {
      override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
        for (i<-0 to 7) {
          if (getRuntimeContext.getIndexOfThisSubtask == (i+1) % 2)
          sourceContext.collect(i+1)
        }
      }

      override def cancel(): Unit = ???
    }).setParallelism(2)

    steam.rescale.print("rescale").setParallelism(4)

    env.execute()
  }

}

在这里插入图片描述
我们可以看到偶数都传到的1,2分区,奇数都传送到了3,4分区.

4.broadcast(广播)

数据会在不同的下游分区都传送一份
PartitionBroadcastTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._

object PartitionBroadcastTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val steam: DataStream[Event] = env.addSource(new ClickSource)

    steam.broadcast.print("broadcast").setParallelism(4)

    env.execute()
  }

}

在这里插入图片描述

5.Custom(自定义分区)

PartitionCustonTest.scala

package com.atguigu.chapter02.Transform

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitionCustonTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val steam: DataStream[Int] = env.fromElements(1,2,3,4,5,6,7,8)

    steam.partitionCustom(new Partitioner[Int] {
      override def partition(k: Int, i: Int): Int = {
        k % 2
      }
    },data=>data).print().setParallelism(4)

    env.execute()
  }
}

在这里插入图片描述

二、Sink

创建需要的
在这里插入图片描述

1.写出到文件

SinkToFileTest.scala

package com.atguigu.chapter02.Sink

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._


object SinkToFileTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Bob", "./cart", 3000L),
      Event("Alice", "./cart", 7000L),
      Event("Bob", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),
    ).map(_.toString)


    val filesink: FileSink[String] = FileSink.forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8")).build()

    stream.sinkTo(filesink)

    env.execute()
  }
}

在这里插入图片描述

2.写入到Kafka

SinkToKafkaTest.scala

package com.atguigu.chapter02.Sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink._
import org.apache.flink.streaming.api.scala._

object SinkToKafkaTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("input/clicks.txt")

    val sink: KafkaSink[String] = KafkaSink.builder()
      .setBootstrapServers("hadoop102:9092")
      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("clicks")
        .setValueSerializationSchema(new SimpleStringSchema()).build()).build()

    stream.sinkTo(sink)

    env.execute()

  }
}

在这里插入图片描述
查看最新的消费数据.

3.写入到Mysql

SinkToMysqlTest.scala

package com.atguigu.chapter02.Sink

import com.atguigu.chapter02.Source.Event
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

import java.sql.PreparedStatement

object SinkToMysqlTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Bob", "./cart", 3000L),
      Event("Alice", "./cart", 7000L),
      Event("Bob", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),
    )

    val jdbcF: SinkFunction[Event] = JdbcSink.sink(
      "INSERT INTO clicks (user,url) VALUES (?,?)",
      new JdbcStatementBuilder[Event] {
        override def accept(t: PreparedStatement, u: Event): Unit = {
          t.setString(1, u.user)
          t.setString(2, u.url)
        }
      },
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/test")
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("root")
        .withPassword("123456")
        .build()
    )
    stream.addSink(jdbcF)

    env.execute()

  }
}

在这里插入图片描述
Flink提供的Sink接口还有很多,这里就随便举几个例子,还需要那些查文档就行了.


总结

Flink的基本操作就练习完成了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/507445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode:203.移除链表元素(两种方法详解)

前言&#xff1a;内容包括-题目&#xff0c;代码实现&#xff08;两种方法&#xff09;&#xff0c;大致思路&#xff0c;代码解读 题目&#xff1a; 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 …

【CSS系列】第五章 · CSS文本属性

写在前面 Hello大家好&#xff0c; 我是【麟-小白】&#xff0c;一位软件工程专业的学生&#xff0c;喜好计算机知识。希望大家能够一起学习进步呀&#xff01;本人是一名在读大学生&#xff0c;专业水平有限&#xff0c;如发现错误或不足之处&#xff0c;请多多指正&#xff0…

QT With OpenGL(SSAO)(Screen-Space Ambient Occlusion)

文章目录 在G_Buffer中加入深度信息使用深度信息得到环境遮蔽的结果1. 新建SSAO帧缓存类2.生成法向半球核心3. 生成随机核心转动纹理为什么要生成随机核心转动创建一个小的随机旋转向量纹理 4.使用G_Buffer渲染SSAO纹理传入参数着色器1. 获取当前像素在纹理中的信息2.计算TBN矩…

SpringBoot -- AOP

一直只听过AOP&#xff0c;但是并不知道AOP的原理使用&#xff0c;参考深入浅出SpringBoot2.x学习一下SpringBoot AOP编程 AOP是基于动态代理实现的 静态代理就是代理类中有一个实现类&#xff0c;和实现类相同名称的方法&#xff0c;调用代理的request方法&#xff0c;执行顺…

RF检测器/控制器MS2351可pin对pin兼容AD8314、MAX4003

MS2351M/MS2351D 是一款对数放大器芯片&#xff0c;主要用于接收信号强度指示 (RSSI) 与控制功率放大器&#xff0c;工作频率范围是50MHz&#xff5e;3000MHz&#xff0c;动态范围可达 35dB 到 45dB。可pin对pin兼容AD8314、MAX4003。 MS2351M/MS2351D 是电压响应器件&#xff…

【c语言】结构体详解 | 结构体数组/指针

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c语言系列专栏&#xff1a;c语言之路重点知识整合 &#x…

【ChirpStack 】如何获取 JWT TOKEN

LoRa App Server 提供了两类 API 接口&#xff0c;其中 RESTful JSON API 提供了一个 API console&#xff0c;在AS地址的基础上使用 /api 即可访问&#xff0c;罗列了 API 端点和文档介绍&#xff0c;测试起来非常方便。 本文主要介绍 如何使用 chirpstack 的API 进行测试以及…

商户查询的缓存——基于逻辑过期方式解决缓存击穿问题

//基于逻辑过期方式解决缓存基穿问题 理论上讲都是可以命中的 public Shop queryWithLogincalExpire(Long id){ //1.从redis中查商铺缓存 String jsonShop stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY id); //2.未命中 if(StringUtils.isBlank(j…

Python程序运行中报Memoryerror的解决方案

在求解某高维时间依赖的PDE中&#xff0c;空间计算域在每一个空间方向均为M256&#xff0c;且快速算法被执行以解决储存与计算速度问题&#xff1b;而时间方向取T2000&#xff0c;时间步长0.01。 在Spyder提前运行过&#xff0c;第一次假如迭代了1468次&#xff0c;即会报Memo…

本地部署 Dolly V2

本地部署 Dolly V2 1. 什么是 Dolly V22. Github 地址3. 安装 Miniconda34. 创建虚拟环境5. 部署 Dolly V26. 编写测试程序7. 运行测试程序 1. 什么是 Dolly V2 Databricks的dolly-v2-12b&#xff0c;是一个在 Databricks 机器学习平台上训练的指令跟随型大型语言模型&#xf…

计算机图形学 | 实验七:完成摄像机类的创建

计算机图形学 | 实验七&#xff1a;完成摄像机类的创建 计算机图形学 | 实验七&#xff1a;完成摄像机类的创建摄像机/观察空间Look At 矩阵自由移动视角移动鼠标输入缩放 华中科技大学《计算机图形学》课程 MOOC地址&#xff1a;计算机图形学&#xff08;HUST&#xff09; 计…

【自然语言处理】自然语言处理 --- NLP入门指南

文章目录 一、什么是NLP二、NLP任务类型三、NLP的预处理英文 NLP 语料预处理的 6 个步骤中文 NLP 语料预处理的 4 个步骤第1步&#xff1a;收集您的数据---语料库第2步&#xff1a;清理数据 --- 文本清洗第3步&#xff1a;分词第4步&#xff1a;标准化第5步&#xff1a;特征提取…

花式玩转二叉树层序遍历——实现二叉树Z字输出

文章目录 题目介绍二叉树层序遍历——队列实现Java完整代码 分析Java完整代码实现总结 题目介绍 这个题目是在做一个测试里面遇到的&#xff0c;大致描述如下&#xff1a; 现在有一棵二叉树&#xff0c;需要实现如图所示的交叉来回遍历&#xff1a; 即相较于普通的层序遍历&a…

基于目标级联法的微网群多主体分布式优化调度(已更新)

目录 一、主要内容 1.1 上层微网群模型 1.2 下层微网模型 二、部分程序 三、实现效果 四、下载链接 一、主要内容 本文复现《基于目标级联法的微网群多主体分布式优化调度》文献的目标级联部分&#xff0c; 建立微网群系统的两级递阶优化调度模型: 上层是微网群能量调度中…

Jvm --java虚拟机(下)

目录 执行引擎 什么是执行引擎&#xff1f; 什么是解释器&#xff1f;什么是 JIT 编译器&#xff1f; 为什么 Java 是半编译半解释型语言&#xff1f; JIT 编译器执行效率高为什么还需要解释器&#xff1f; 垃圾回收 垃圾回收概述 什么是垃圾&#xff1f; 为什么需要GC&a…

Redis持久化--RDB

一. RDB是什么 在指定的时间间隔内将内存中的数据集快照写入磁盘&#xff0c; 也就 Snapshot 快照&#xff0c;恢复时将快照文件读到内存二. RDB持久化的流程 解读&#xff1a; redis 客户端执行 bgsave 命令或者自动触发 bgsave 命令&#xff1b;主进程判断当前是否已经存在…

【开源之夏 2023】欢迎报名 SOFAStack 社区项目!

开源之夏是由“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动&#xff0c;旨在鼓励在校学生积极参与开源软件的开发维护&#xff0c;促进优秀开源软件社区的蓬勃发展&#xff0c;培养和发掘更多优秀的开发者。 活动联合国内外各大开源社区&#xff0c;针对重要开…

荔枝派Zero(全志V3S)驱动开发之RGB LCD屏幕显示bmp图片

文章目录 前言一、如何在 linux 下驱动 LCD1、什么是 Framebuffer 设备2、如何确保 Framebuffer 设备已存在3、Frame_buffer 设备结构体<1>、fb_info 详解<2>、struct fb_fix_screeninfo 详解<3>、struct fb_var_screeninfo 详解 4、设备树中有关 framebuffe…

使用 Appium 进行 WPF 自动化

文章目录 关于1 环境准备2 集成单元测试3 新增基本测试代码4 测试 WPF 程序5 启动测试 关于 参考链接&#xff1a;Get Your WPF Apps Automated With Appium Appium官网&#xff1a;http://appium.io/docs/en/2.0/quickstart 1 环境准备 一、下载 Windows Application Driv…

JVM 虚拟机栈

虚拟机栈概述 背景: 由于跨平台性的设计&#xff0c;Java 的指令都是根据栈来设计的。不同平台 CPU 架构不同&#xff0c;所以不能设计为基于寄存器的优点是跨平台, 指令集小&#xff0c;编译器容易实现&#xff0c;缺点是性能下降&#xff0c;实现同样的功能需要更多的指令 …