Flink第二章:基本操作

news2025/1/11 12:36:42

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作


文章目录

  • 系列文章目录
  • 前言
  • 一、Source
    • 1.读取无界数据流
    • 2.读取无界流数据
    • 3.从Kafka读取数据
  • 二、Transform
    • 1.map(映射)
    • 2.filter(过滤)
    • 3.flatmap(扁平映射)
    • 4.keyBy(按键聚合)
    • 5.reduce(归约聚合)
    • 6.UDF(用户自定义函数)
    • 7.Rich Function Classes(富函数类)
  • 总结


前言

Flink的基本操作分为三类,Source,Transform,sink,分别负责数据的读取,转换和写入,现在来学习一些常用的api.


一、Source

添加需要的pom依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

创建文件
在这里插入图片描述

1.读取无界数据流

SourceBoundedTest.scala

package com.atguigu.chapter02.Source

import org.apache.flink.streaming.api.scala._

case class Event(user:String,url:String,timestamp:Long)

object SourceBoundedTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //1.从元素中读取数据
    val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    val stream1: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )

    //2.从集合中读取数据
    val clicks: List[Event] =List(Event("Mary", "./home", 1000L),Event("Bob", "./cart", 2000L))
    val stream2: DataStream[Event] = env.fromCollection(clicks)

    //3.从文本文件中读取
    val stream3: DataStream[String] = env.readTextFile("input/clicks.txt")

    //打印输出
    stream.print("number")
    stream1.print("1")
    stream2.print("2")
    stream3.print("3")

    env.execute()
  }
}

在这里插入图片描述

2.读取无界流数据

自定义数据源
ClickSource.scala

package com.atguigu.chapter02.Source

import org.apache.flink.streaming.api.functions.source.SourceFunction

import java.util.Calendar
import scala.util.Random

class ClickSource extends SourceFunction[Event] {
  //标志位
  var running = true

  override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {
    //随机生成器
    val random = new Random()
    val users: Array[String] = Array("Mary", "Alice", "Cary")
    val urls: Array[String] = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2", "./prod?id=3")

    //标志位结束循环
    while (running) {
      val event: Event = Event(user = users(random.nextInt(users.length)), url = urls(random.nextInt(users.length)), timestamp = Calendar.getInstance.getTimeInMillis)
      sourceContext.collect(event)

      //休眠1s
      Thread.sleep(1000)
    }

  }

  override def cancel(): Unit = running = false
}

读取数据
SourceCustomTest.scala

package com.atguigu.chapter02.Source

import org.apache.flink.streaming.api.scala._

object SourceCustomTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)

    stream.print()

    env.execute()
  }
}

程序会实时打印收到的数据
在这里插入图片描述

3.从Kafka读取数据

package com.atguigu.chapter02.Source

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.streaming.api.scala._

object SourceKafkaTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val source: KafkaSource[String] = KafkaSource.builder()
      .setBootstrapServers("hadoop102:9092")
      .setGroupId("consumer-group")
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .setTopics("clicks")
      .build()

    val stream: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    stream.print()

    env.execute()

  }

}

在这里插入图片描述
运行程序然后生产数据

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic clicks

在这里插入图片描述
Flink的支持的读取源有很多,这里只是随便举个例子,具体信息可以看官方文档.
在这里插入图片描述

二、Transform

在这里插入图片描述

1.map(映射)

TransformMapTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
object TransformMapTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )
    //Map函数

    //1.匿名函数
    stream.map(_.user).print("1")

    //2.MapFunction接口
    stream.map(new UserExtractor).print("2")

    env.execute()
  }

  class UserExtractor extends MapFunction[Event, String] {
    override def map(t: Event): String = t.user
  }

}

![在这里插入图片描述](https://img-blog.csdnimg.cn/6630144c9130481b8ed9ad4cc26aa11b.png

2.filter(过滤)

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object TransformFilterTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )

    //Filter
    stream.filter(_.user == "Mary").print("1")

    stream.filter(new UserFilter).print("2")

    env.execute()
  }

  class UserFilter extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.user == "Bob"
  }
}

在这里插入图片描述

3.flatmap(扁平映射)

TransformFlatmapTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TransformFlatmapTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )

    stream.flatMap(new MyFlastMap).print()

    env.execute()
  }

  class MyFlastMap extends FlatMapFunction[Event, String] {
    override def flatMap(t: Event, collector: Collector[String]): Unit = {
      if (t.user == "Mary") {
        collector.collect(t.user)
      }
      else if (t.user == "Bob") {
        collector.collect(t.user)
        collector.collect(t.url)
      }
    }
  }
}

在这里插入图片描述

4.keyBy(按键聚合)

TransformAggTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object TransformAggTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Alice", "./cart", 7000L),
      Event("Bob", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),

    )

    stream.keyBy(_.user).max("timestamp").print("1")

    stream.keyBy(new MyKeySelector()).max("timestamp")
      .print("2")
    env.execute()

  }

  class MyKeySelector() extends KeySelector[Event, String] {
    override def getKey(in: Event): String = in.user
  }
}

在这里插入图片描述

5.reduce(归约聚合)

TransformReduceTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object TransformReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Bob", "./cart", 3000L),
      Event("Mary", "./cart", 7000L),
      Event("Mary", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),
    )

    // reduce 聚合

    stream.map(data => (data.user, 1L))
      .keyBy(_._1)
      .reduce(new Mysum()) //统计用户活跃度
      .keyBy(data => true) //将所有数据合并到同一个组
      .reduce((state, data) => if (data._2 >= state._2) data else state)
      .print()

    env.execute()
  }

  class Mysum() extends ReduceFunction[(String, Long)] {
    override def reduce(t: (String, Long), t1: (String, Long)): (String, Long) = (t._1, t._2 + t1._2)
  }

}

在这里插入图片描述

6.UDF(用户自定义函数)

TransformUDFTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object TransformUDFTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Bob", "./cart", 3000L),
      Event("Alice", "./cart", 7000L),
      Event("Bob", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),
    )

    //测试UDF的用法,筛选url中包含摸个关键字 /home
    //1.实现自定义函数类

    stream.filter(new MyFilterFunction()).print("1")

    //2.使用匿名类
    stream.filter(new FilterFunction[Event] {
      override def filter(t: Event): Boolean = t.user.contains("Alice")
    }).print("2")

    //3.使用lambda表达式
    stream.filter(_.url.contains("cart")).print("3")

    env.execute()


  }

  class MyFilterFunction extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.url.contains("prod")
  }

}

在这里插入图片描述

7.Rich Function Classes(富函数类)

可以理解为自定义函数的升级版,可以在线程开始之前和结束之后做更多的工作,常用于链接数据库.

TransformRichFunctionTest.scala

package com.atguigu.chapter02.Transform

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object TransformRichFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
      Event("Bob", "./cart", 3000L),
      Event("Alice", "./cart", 7000L),
      Event("Bob", "./prod?id=1", 4000L),
      Event("Bob", "./prod?id=2", 8000L),
      Event("Bob", "./prod?id=4", 4000L),
      Event("Bob", "./prod?id=6", 3000L),
    )
    //自定义RichMapFunction
    stream.map(new MyRich).print("1")

    env.execute()
  }

  class MyRich extends RichMapFunction[Event,Long]{
    override def open(parameters: Configuration): Unit = {
      println("索引号为"+getRuntimeContext.getIndexOfThisSubtask+"的任务开始")
    }

    override def map(in: Event): Long = in.timestamp

    override def close(): Unit = {
      println("索引号为"+getRuntimeContext.getIndexOfThisSubtask+"的任务结束")

    }
  }
}

在这里插入图片描述


总结

这里的内容比较多,下次博客继续写.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/505777.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4个ChatGPT拓展出来的工具

现在ChatGPT 相关 的方向非常的多&#xff0c;各个大厂一个一个推出了自己的大模型&#xff0c;从国外到国内&#xff0c;ChatGPT 相关 也有几十个&#xff0c;这是大厂的方向。 对于比较小的团队&#xff0c;很多都是在ChatGPT 的基础上进行的开发&#xff0c;下面罗列出4个在…

ASO优化之应用内活动的投放策略

我们可以在“落地页”&#xff0c;“搜索结果页”&#xff0c;“详情页”&#xff0c;“today标签页”等各个版面展示应用的活动投放&#xff0c;这不仅能够快速被用户浏览到&#xff0c;自然能带来更多的流量&#xff0c;还能促进用户的活跃度。 那我们该如何进行投放呢&…

哪一本书让你逢人就推荐的?

小编逢人就推荐的程序员经典书目&#xff1a; 1、【樊登推荐】浪潮之巅 第四版 作者&#xff1a;吴军 这不是一本科技产业发展历史集&#xff0c;而是在这个数字时代&#xff0c;一本IT人非读不可&#xff0c;而非IT人也应该拜读的作品。 《浪潮之巅 第四版》是一本介绍互联…

【算法与数据结构】链表——题目详解

题目详解 Leetcode-206. 反转链表 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1] 示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#x…

《Andorid开源》greenDao 数据库orm框架

一 前言&#xff1a;以前没用框架写Andorid的Sqlite的时候就是用SQLiteDatabase &#xff0c;SQLiteOpenHelper ,SQL语句等一些东西&#xff0c;特别在写SQL语句来进行 数据库操作的时候是一件很繁琐的事情&#xff0c;有时候没有错误提示的&#xff0c;很难找到错误的地方&a…

C#--使用Quartz实现定时任务

C#小技巧–使用Quartz实现定时任务 Quartz.net 简介 Quartz.NET是一个开源的作业调度框架&#xff0c;非常适合在平时的工作中&#xff0c;定时轮询数据库同步&#xff0c;定时邮件通知&#xff0c;定时处理数据等。 Quartz.NET允许开发人员根据时间间隔&#xff08;或天&…

js - 原型和原型链的简单理解

前言 有一个概念需要清楚&#xff0c;只有构造函数才有.prototype对象&#xff0c;对象是没有这个属性的&#xff0c;__proto__只是浏览器提供的非标准化的访问对象的构造函数的原型对象的一种方式; prototype(原型对象) 函数即对象&#xff0c;每个函数都有一个prototype属…

代码随想录 数组篇 螺旋矩阵II Java实现

文章目录 &#xff08;中等&#xff09;59. 螺旋矩阵II&#xff08;中等&#xff09;54. 螺旋矩阵&#xff08;简单&#xff09;JZ29 顺时针打印矩阵 &#xff08;中等&#xff09;59. 螺旋矩阵II 因为我是先做的JZ29&#xff0c;所以看到这题的时候&#xff0c;几乎就是一样的…

最适合家用的洗地机哪个牌子好?2023洗地机推荐

洗地机是目前众多清洁工具中的热门之选&#xff0c;我身边很多朋友都选择了洗地机来处理家居清洁&#xff0c;一说一&#xff0c;洗地机可以处理干湿垃圾&#xff0c;还都有一键自清洁功能&#xff0c;用起来确实方便简单。不过&#xff0c;市面上的洗地机参差不齐&#xff0c;…

RabbitMQ 详解

文章目录 RabbitMQ 详解一、MQ 简介1. MQ优缺点2. MQ应用场景3. AMQP 和 JMS4. 常见的 MQ 产品 二、RabbitMQ 工作原理三、Linux环境安装RabbitMQ1. 安装 Erlang2. 安装 RabbitMQ3. 管控台 四、RabbitMQ 工作模式1. 简单模式(Hello World)2. 工作队列模式(Work Queue)3. 发布订…

PMP项目管理-[第十三章]相关方管理

相关方管理知识体系&#xff1a; 识别相关方&#xff1a; 规划相关方参与&#xff1a; 管理相关方参与&#xff1a; 监督相关方参与 &#xff1a; 13.1 识别相关方 定义&#xff1a;定期识别项目相关方&#xff0c;分析和记录他们的利益、参与度、相互依赖性、影响力和对项目成…

rk3568 修改开机logo

rk3568 修改开机显示logo Android 显示 logo 的作用是为了标识应用程序或设备的品牌和身份。在应用程序中&#xff0c;logo 可以帮助用户快速识别应用程序&#xff0c;并与其他应用程序区分开来。在设备中&#xff0c;logo 可以帮助用户识别设备的品牌和型号&#xff0c;以及与…

抽象轻松js

全新声明类型2.0版本 var、let、const 三者的区别 用我的世界来区别三者关系 特别的本质关系是一样&#xff0c;都是有木头&#xff08;声明&#xff09;钻石&#xff08;赋值&#xff09;组成 木头&#xff08;声明&#xff09;钻石&#xff08;赋值&#xff09; 钻石剑(…

鸿蒙Hi3861学习六-Huawei LiteOS-M(软件定时器)

一、简介 软件定时器&#xff0c;是基于系统Tick时钟中断且由软件来模拟的定时器。当经过设定的Tick时钟计数值后&#xff0c;会触发用户定义的回调函数。定时精度与系统Tick时钟周期有关。 硬件定时器受硬件的限制&#xff0c;数量上不足以满足用户的实际需求。因此&#xff0…

如何监控软件定义的数据中心(SDDC)

网络管理不仅要防止网络停机&#xff0c;还要优化网络性能&#xff0c;最终增强最终用户体验。当今的网络变得如此先进&#xff0c;以至于传统模型已经过时&#xff0c;无法满足现代动态需求。用日益敏捷、安全、可扩展和可靠的现代可部署解决方案取代传统的遗留系统至关重要。…

企业遭受勒索攻击后,要支付赎金吗?

企业遭受勒索攻击后&#xff0c;要支付赎金吗&#xff1f; 针对这个问题的答案&#xff0c;一些企业选择了“不要”。例如&#xff1a;意大利法拉利公司拒绝向黑客支付赎金&#xff1b;蔚来汽车老板拒绝支付1500万的赎金&#xff1b;澳洲最大医保公司在被盗取970万客户信息后&a…

关于《永恒之塔私服》收费模式的大胆猜想

我们都知道从第一个网络游戏走进中国时就已经使用了点卡模式的收费方式,但是随着游戏行业的快速发展,这种点卡模式的游戏也渐渐快要退出游戏收费的平台,也随着越来越多的游戏加入到中国的游戏市场也导致了游戏的竞争也越来越大,游戏公司也挖空心事来吸引玩家,为了吸引玩家2004年…

景23转债,海能转债上市价格预测

景23转债 基本信息 转债名称&#xff1a;景23转债&#xff0c;评级&#xff1a;AA&#xff0c;发行规模&#xff1a;11.54亿元。 正股名称&#xff1a;景旺电子&#xff0c;今日收盘价&#xff1a;22.52元&#xff0c;转股价格&#xff1a;25.71元。 当前转股价值 转债面值 / …

汉诺塔+小青蛙跳台阶---《递归》

目录 前言&#xff1a; 1.汉诺塔&#xff1a; 1.1分析盘子数从1-3的情况 1.2盘子移动的规律总结 2.青蛙跳台阶&#xff1a; 2.1跳一个台阶或跳两个台阶 2.2扩展 ❤博主CSDN:啊苏要学习 ▶专栏分类&#xff1a;C语言◀ C语言的学习&#xff0c;是为我们今后学习其它语言打…

asp.net+C#教育机构高校教务管理系统

1.1用户类别 本系统分为3个角色&#xff1a;管理员、教师、学生&#xff1b; 1、管理员权限最大&#xff0c;排课、调课、汇总各类考试成绩、管理各类用户基本信息&#xff0c;以及各类查询统计、发布公告、收发邮件等功能&#xff1b; 2、教师查看自己的信息、修改登陆密码…