第三章 Flink DataStream API

news2025/1/10 1:45:25

Flink 系列教程传送门

第一章 Flink 简介

第二章 Flink 环境部署

第三章 Flink DataStream API

第四章 Flink 窗口和水位线

第五章 Flink Table API&SQL

第六章 新闻热搜实时分析系统


一、DataStream API是什么?

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。可以简单理解为包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

 二、Source 数据源

Source 是程序从中读取其输入的地方。可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到程序。Flink 也自带了许多预先实现的 source functions,也可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口编写自定义的并行 sources。

基于文件的Source

  • readTextFile(path) - 读取文本文件,遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: DataStream[String] = senv.readTextFile("file:///path/to/file")  // env.readTextFile("hdfs://node01:9000/input/words.txt")
data.print()
env.execute("测试Source")

基于套接字的Source

  • socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0) - 从套接字读取。元素可以由分隔符分隔。Netcat官网下载地址
// 在Linux系统中是通过 nc -lk 端口号
// 在Window系统中是通过 nc -l -p 端口号
val data: DataStream[String] = senv.socketTextStream("localhost", 9999)

基于集合的Source

  • fromCollection(Collection) - 从Seq[T]或者Iterator[T]中创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
val data: DataStream[Any] = env.fromElements(1, 100, "hello", 'a')
val data:DataStream[String]=senv.fromCollection(List("hello","flink","scala"))

自定义的Source

  • addSource/fromSource - 实现一个新的 SourceFunctionRichSourceFunctionParallelsourceFunction interface实现类
    • 实现SourcePunction:是非并行的,不能指定并行度,即不能用setParallelism(num)算子,socketTextStreamFunction就是实现的SourceFunction
    • 实现ParallelSourceFunction:是并行化的Source所以能指定并行
    • 实现RichSourceFunction:实现了SourceFunction,还继承了AbstractRichFunction,所以比SourceFunction增加了openclose方法和getRuntimeContext()上下文对象

自定义随机Source

自定义数据源需要实现SourceFunction或者其对应的子接口,主要重写两个关键方法:

  • run():使用运行时上下文对象 SourceContext 向下游发送数据,用于编写产生数据的核心业务逻辑
  • cancel()方法是取消数据读取时调用,用于和run方法配置使用,判断用户使用取消数据读取
object MyRandomEventSource {
  // 模拟用户点击的行为
  // ID、Name、URL、ClickTime
  case class UserClick(name: String, url: String, clickTime: Long)
}

// 在这里模拟生成用户的点击行为数据
class MyRandomEventSource extends RichSourceFunction[UserClick] {

  private val names = Array("张三丰", "张无忌", "赵敏", "貂蝉", "刘皇叔")
  private val urls = Array("/index", "/about", "/pro_detail", "/pro?id=1", "/pro?id=2")
  private var running: Boolean = true // 程序的运行状态

  // 程序执行的时候会调用
  override def run(ctx: SourceFunction.SourceContext[UserClick]): Unit = {
      while (running) {
        val randName = names(Random.nextInt(names.length))
        val randUrl = urls(Random.nextInt(urls.length))
        // 收集产生的数据发送到调用
        ctx.collect(UserClick(randName, randUrl, System.currentTimeMillis()))

        // 每隔1-5秒钟随机产生一条数据
        // [0,5) + 1 [1,6)
        Thread.sleep((Random.nextInt(5) + 1) * 1000)
      }
  }

  // 用户手动关闭程序会调用
  override def cancel(): Unit = running = false
}

自定义MySQL Source

富函数类(Rich Function),是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
  • run()方法是读取数据时调用,编写核心处理业务逻辑。
  • cancel()方法是取消数据读取时调用,用于和run方法配置使用,判断用户使用取消数据读取。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。

自定义MySQL 数据源,读取MySQL中的表数据

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>

 初始化MySQL数据表

drop table tbl_student;
create table tbl_student(id int primary key auto_increment, name varchar(16), age int(3)) default charset = 'utf8';

insert into tbl_student(name,age) values('张三',18),('李四',28),('王五',22),('赵六',26),('田七',19);

select * from tbl_student;

 自定义非并行MySQL Source

// 定义实体样例类 Student 对接数据库表
object Entity {

  case class Student(id: Int, name: String, age: Int)

}

// 这里使用非并行Source
class MySQLSource extends RichSourceFunction[Student] {
  var conn: Connection = _ // 数据库连接对象,用于接收open方法中的初始化对象
  var ps: PreparedStatement = _
  var rs: ResultSet = _
  var isRunning = true // 是否持续从数据源中读取数据的标识

  // 初始化方法, 用于初始化MySQL数据库连接
  override def open(parameters: Configuration): Unit = {
    // 加载驱动
    Class.forName("com.mysql.jdbc.Driver")
    // 获取连接
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test_db?useSSL=false", "root", "root")
  }

  // 读取数据时调用,用于读取MySQL中的数据
  override def run(sourceContext: SourceFunction.SourceContext[Student]): Unit = {
    // 预处理SQL
    ps = conn.prepareStatement("select id, name, age from tbl_student")
    // 执行SQL获取结果
    rs = ps.executeQuery()
    // 循环读取集合中数据
    while (isRunning && rs.next()) {
      val student = Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"))
      // 把读取到的数据发送出去
      sourceContext.collect(student)
    }
  }

  // 取消数据读取时调用
  override def cancel(): Unit = isRunning = false
}


object TestMySQLSource{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.addSource(new MySQLSource())
    data.print()
    env.execute("MySQL Source")
  }
}

Kafka Source

Kafka Source 提供了构建类来创建 KafkaSource 的实例。FlinkKafkaConsumer 已被弃用并将在 Flink 1.17 中移除,请改用 KafkaSource。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.14.5</version>
</dependency>

Kafka Source 案例演示

object TestSource {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 构建Kafka Source
    val kafkaSource: KafkaSource[String] = KafkaSource.builder().setBootstrapServers("localhost:9092")
      .setTopics("test")
      .setGroupId("my-group")
      // timestamp(1657256176000L)(从时间戳大于等于指定时间戳(毫秒)的数据开始消费)   latest(从最末尾位点开始消费)
      .setStartingOffsets(OffsetsInitializer.earliest()) // 默认值 earliest() 从最早位点开始消费
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
    // 关联Source
//    val props = new Properties()
//    props.setProperty("bootstrap.servers", "localhost:9092")
//    props.setProperty("group.id", "consumer-group")
//    env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))
    // WatermarkStrategy.noWatermarks() 水印策略,不使用水印
    val data = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    data.print()

    env.execute("Flink Kafka Source")
  }
}

三、Transformation 算子

Transformation(转换)算子就是将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流(Dataflow)拓扑。

常用算子

Map算子(DataStream → DataStream):map算子接收一个函数作为参数,并把这个函数应用于DataStream的每个元素,最后将函数的返回结果作为结果DataStream中对应元素的值,即将DataStream的每个元素转换成新的元素。

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromCollection(List(1, 2, 3, 4))
  
  source.map(x => x * 2).print("lambda")

  source.map(new MyMap).print("function")

  source.map(new MapFunction[Int, Int] {
      override def map(t: Int): Int = t * 10
  }).print("function2")
  
  env.execute()
}

class MyMap extends MapFunction[Int, Int] {
  override def map(t: Int): Int = t * 10
}

FlatMap算子(DataStream → DataStream):与map()算子类似,但是每个传入该函数func的DataStream元素会返回0到多个元素,最终会将返回的所有元素合并到一个DataStream

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val source = env.fromCollection(List(1, 2, 3, 4, 5))
source.flatMap(item => List(item,item)).print("lambda1")

val source1 = env.fromElements("flink spark link", "flink spark link", "flink spark link")

source1.flatMap(item => item.split(" ")).print("lambda2")

source1.flatMap(new FlatMapFunction[String, String] {
  override def flatMap(value: String, out: Collector[String]): Unit = {
    for (item <- value.split(" ")) {
      out.collect(item)
    }
  }
}).print("function")

env.execute()

KeyBy算子(DataStream → KeyedStream):KeyedStream实际上是一种特殊的DataStream,因为其继承了DataStream。KeyedStream用来表示根据指定的key进行分组的数据流。

keyBy算子主要作用于元素类型是元组或数组的DataStream上。使用该算子可以将DataStream中的元素按照指定的key(字段)进行分组,具有相同key的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。在逻辑上将流划分为不相交的分区,在内部是通过哈希分区实现的。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = env.fromElements((1, 2), (1, 6), (1, 9), (1, 7), (2, 1), (2, 2), (2, 10), (3, 1))
source.keyBy(item => item._1).print("lambda")

// source.keyBy(new KeySelector[Tuple2[Int, Int], Int] {
source.keyBy(new KeySelector[(Int, Int), Int] {
  override def getKey(value: (Int, Int)): Int = value._1
}).print("function")

source.keyBy(item => "default")  // 所有的数据分为一组(default),求最大的那个元素
      .maxBy(1)
      .print()

env.execute()

Aggregation算子(KeyedStream → DataStream) :常用的聚合算子有sum()max()min()max_by()min_by()等,这些聚合算子统称为Aggregation。Aggregation算子作用于KeyedStream上,并且进行滚动聚合。与keyBy算子类似,可以使用数字或字段名称指定需要聚合的字段

max_by()min_by()min()max()的区别在于,后者的只计算指定字段的值,其他字段会保留最初第一个数据的值,而前者会返回包含计算字段的整条数据。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val source = env.fromElements((1, 2), (1, 6), (1, 9), (1, 7), (2, 1), (2, 2), (2, 10), (3, 1))

source.keyBy(item => item._1)
  // .sum(1)   // 按索引位置
  .sum("_2") // 按字段名
  .print()

keyBy算子会将DataStream转换为KeyedStream,而Aggregation算子会将KeyedStream转换为DataStream。 

 Reduce算子(KeyedStream → DataStream):从MapReduce开始,我们对reduce()操作就不陌生,它可以对已有的数据进行规约处理,把每一个新输入的数据和当前已经规约的数据进行聚合计算。

reduce算子主要作用于KeyedStream上,对KeyedStream数据流进行滚动聚合,即将当前元素与上一个聚合值进行合并,并且发射出新值。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val source = env.fromElements((1, 2), (1, 6), (1, 9), (1, 7), (2, 1), (2, 2), (2, 10), (3, 1))

source.keyBy(item => item._1)
  // 聚合规则:将每一组中元组的第二个字段进行累加,第一个字段保持不变。注意聚合后数据类型与聚合前保持一致(Int, Int)
  .reduce((state, data) => (state._1, state._2 + data._2))
  .print()

source.keyBy(item => "default")  // 一个组中求最大的元素
  .reduce((state,data) => if (state._2 > data._2) state else data)
  .print()

env.execute()

[案例] 读取CSV文件分组统计学生信息

使用流处理API读取指定数据文件students.csv,加载数据转换为样例对象Student,根据对象中的clazz班级字段进行分组,获取每个班级中年龄最小的学生信息。📎student.csv

object TestTransform {

  case class Student(id: Int, name: String, age: Int, clazzNo: Int)

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.readTextFile("D:\\WorkSpace\\IDEA\\flink_demo\\src\\main\\resources\\student.csv", "utf-8")
    
    source
    	.map(line => { val arr = line.split(","); Student(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt) })
      .keyBy(_.clazzNo) 
      .min("age")  // .min(2)
      .print()
    
    env.execute()
  }
}

四、Sink 输出

Flink使用Data Sinks 将DataStream 转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink/sinkTo - 调用自定义 Sink Function。Flink 捆绑了连接到其他系统(例如 JDBC、Apache Kafka)的连接器。

注意,DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 StreamingFileSink构建。此外,通过 .addSink(...) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。 

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements("a", "b", "c", "d", "e")
  .map(_.concat("-hello\n"))
  .writeToSocket("localhost", 6666, new SimpleStringSchema())
  env.execute()

输出到文件

FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统且保证精确一次的状态一致性,大大改进了之前流式文件输出的方式。

它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如Parquet)格式。这两种不同的方式都有各自的构建器(builder),可以使用以下静态方法创建:

  • 行编码接收器FileSink.forRowFormat(basePath, rowEncoder)
  • 批量编码接收器FileSink.forBulkFormat(basePath, bulkWriterFactory)

在创建行或批量编码Sink时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或bulkWriterFactory)。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>${flink.version}</version>
</dependency>
case class Student(id: Int, name: String, age: Int, clazzNo: Int)

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val source = env.readTextFile("input/student.csv")

  val result = source
    .map(line => { val arr = line.split(","); Student(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt) })
    .keyBy(_.clazzNo)
    .minBy("age")  // .min(2)

  // 文件系统
  val fileSink: FileSink[String] = FileSink
        .forRowFormat(new Path("output/sink"), new SimpleStringEncoder[String]())
        .withRollingPolicy( // 文件滚动策略,用于在以下 3 个条件中的任何一个下滚动进行中的部件文件
          DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15)) // 至少包含15分钟的数据
            .withInactivityInterval(Duration.ofMinutes(5)) // 最近5分钟没有收到数据
            .withMaxPartSize(MemorySize.ofMebiBytes(1024)) // 文件大小已达到 1 MB(写入最后一条记录后)
            .build())
        .build()

  // 结果转为字符串写到文件中
  result.map(item=>item.toString).sinkTo(fileSink)

  env.execute()
}

批量编码接收器的创建方式与行编码接收器类似,但是需要指定的是BulkWriter.Factory。 逻辑定义如何添加和刷新新元素,以及如何添加一批记录以便进一步编码。Flin内置了四个BulkWriterFactory:

  • ParquetWriterFactory:用于为 Avro 数据创建 Parquet 编写器工厂。若要在应用程序中使用 Parquet 批量编码器,需要添加flink-parquet_2.12依赖。
  • AvroWriterFactory:将数据写入 Avro 文件,若要在应用程序中使用 Avro 编写器,需要添加flink-avro依赖。
  • SequenceFileWriterFactory:在应用程序中使用SequenceFile批量编码器,需要添加flink-sequence-file依赖。
  • OrcBulkWriterFactory:将数据能够以ORC格式进行批量编码,需要添加flink-orc_2.12依赖。

输出到Kafka

KafkaSink 可将数据流写入一个或多个 Kafka topic。

在构建 KafkaSink 时是必须指定的:

  • Bootstrap servers, setBootstrapServers(String)
  • 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.14.5</version>
</dependency>
case class Student(id: Int, name: String, age: Int, clazzNo: Int)

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val source = env.readTextFile("E:\\WorkSpace\\IDEA\\demo\\src\\main\\resources\\student.csv")

  val result = source
    .map(line => { val arr = line.split(","); Student(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt) })
    .keyBy(_.clazzNo)
    .minBy("age")  // .min(2)

  // 定义KafkaSink
  val kafkaSink = KafkaSink.builder()
    .setBootstrapServers("192.168.157.130:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
      .setTopic("test-topic")
      .setValueSerializationSchema(new SimpleStringSchema())
      .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)  // 语义保证
    .build()
  
  // 写出数据到Kafka中
  result.map(item=>item.toString).sinkTo(kafkaSink)

  env.execute()
}

输出到Redis

Redis是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。

Flink没有直接提供官方的Redis连接器,不过Bahir 项目还是担任了合格的辅助角色,为我们提供了Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.l,支持的Scala版本最新到2.11。由于我们的测试不涉及到Scala的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。

百度网盘下载地址:Redis-x64-3.2.100.zip 密码:6666 AnotherRedisDesktopManager

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
case class Student(id: Int, name: String, age: Int, clazzNo: Int)

def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val source = env.readTextFile("E:\\WorkSpace\\IDEA\\demo\\src\\main\\resources\\student.csv")

    val result = source
    .map(line => {val arr = line.split(","); Student(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt)})
    .keyBy(_.clazzNo)
    .minBy("age") // .min(2)

    // Redis Sink
    val conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.157.130").build()
    result.addSink(new RedisSink[Student](conf, new MyRedisMapper))

    env.execute()
  }

class MyRedisMapper extends RedisMapper[Student] {
    override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"stu")

    override def getKeyFromData(t: Student): String = t.id.toString

    override def getValueFromData(t: Student): String = t.age.toString
  }

输出到MySQL

Flink 提供了 JDBC Connector。该连接器可以向 JDBC 数据库写入数据。添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动)。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.12</artifactId>
  <version>1.14.5</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

create table student(id int,name varchar(25),age int(3),clazz int(2))default charset=utf8; 

case class Student(id: Int, name: String, age: Int, clazzNo: Int)

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val source = env.readTextFile("E:\\WorkSpace\\IDEA\\demo\\src\\main\\resources\\student.csv")

  val result = source
    .map(line => { val arr = line.split(","); Student(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt) })
    .keyBy(_.clazzNo)
    .minBy("age")  // .min(2)

  // 创建JDBC Sink
  val jdbcSink = JdbcSink.sink[Student](
    "insert into tbl_student(id,name,age,clazz_no) values(?,?,?,?)",
    // (ps: PreparedStatement, t: Student) => {
    // 这里不能使用箭头函数,否则会报:The implementation of the RichOutputFormat is not serializable. The object probably contains or references non serializable fields.
    new JdbcStatementBuilder[Student] {
      override def accept(ps: PreparedStatement, t: Student): Unit = {
        ps.setInt(1, t.id)
        ps.setString(2, t.name)
        ps.setInt(3, t.age)
        ps.setInt(4, t.clazzNo)
      }
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://localhost:3306/test_db?useSSL=false")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("root")
      .withPassword("root")
      .build()
  )

  // 数据写入到JDBC Sink中
  result.addSink(jdbcSink)

  env.execute()
}

 在Scala中,输入插入ps不能使用箭头lambda的方式设置占位数据,需要通过原始的匿名接口类实现 JdbcStatementBuilder

与Hadoop依赖冲突,导致jar缺少,需要手动加入缺少jar commons-compress

[案例] Flink整合Kafka计算实时单词数量

Flink集成了通用的Kafka连接器,用于消费Kafka中的数据或向Kafka中写入数据。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.14.5</version>
</dependency>

案例代码 

object StreamKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 从kafka中读取数据
    val kafkaSource = KafkaSource.builder()
      .setBootstrapServers("localhost:9092")
      .setTopics("test01")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest()) // 默认值 earliest() 从最早位点开始消费
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
    // 添加数据源,不指定水印策略
    val dataStream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(), "Kafka Source")
    // 转换Kafka流数据
    val result = dataStream.flatMap(line => line.split(" "))
      .filter(word=>word.nonEmpty)
      .map(word=> (word,1))
      .keyBy(item=>item._1)
      .sum(1)
    // 数据结果到控制台
    result.print()
    // 触发任务执行
    env.execute("StreamKafkaWordCount")
  }
}

启动服务-数据测试 

root@LenovoX:~ $ zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
root@LenovoX:~ $ kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
root@LenovoX:~ $ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test01 1
Created topic test01.
root@LenovoX:~ $ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test01
>flink flink spark java hive
>flink spark java hive

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/146494.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 深入系统完全讲解(2)

1 系统启动过程、嵌入式系统启动过程 这是我之前画的启动过程的图&#xff0c;这个主要就是给大家讲明白&#xff0c;启动过程整个的流程。 第一个阶段&#xff0c;bootloader 系统在上电的时候&#xff0c;系统会从固定的地方加载一段代码进入内部 ram 进行运行。这段代码 通…

【数学思维】Quasi-convex and quesi-concave

【数学思维】Quasi-convex and quesi-concaveConvex function 定义如下 f(λx(1−λ)y)≤λf(x)(1−λ)f(y)f(\lambda x(1-\lambda)y)\le \lambda f(x)(1-\lambda)f(y) f(λx(1−λ)y)≤λf(x)(1−λ)f(y)Quasi-convex function 定义如下 f(λx(1−λ)y)≤max⁡{f(x),f(y)}f(\l…

【阶段三】Python机器学习01篇:机器学习概念、机器学习类别、机器学习应用场景与机器学习基本技术:特征、标签、模型

本篇的思维导图: 机器学习概念 机器学习是AI人工智能的分支技术,而深度学习是机器学习的重要分支。 人工智能、机器学习、深度学习三者的关系 机器学习就是从数据中发现规律,机器学习的关键内涵之一在于利用计算机的运算能力从大量的数据中发现一个“函数”或…

Linux——VMware Tools的介绍及安装方法

一、VMware Tools的作用 1.最大的好处是可以直接把windows界面的文件拖进linux虚拟机内。 2.鼠标可以直接从虚拟机移动到windows等等好处。 二、VMware Tools的安装步骤 1.首先把linux虚拟机关机或退出&#xff0c;然后点击“编辑虚拟机设置”。 2.点击CD/DVD&#xff0c;…

电子、半导体废水深度除氟、除重金属的技术详解

电子半导体行业废水来源及水质特点电子废水主要是印刷线路板中每个环节产生的废水&#xff0c;如线路板上的赋铜线路、电子元器件、二极管、三极管、电容等&#xff0c;日常生活中常见的就是电脑元器件&#xff0c;如内存条、CPU、主板等。这些东西在生产成型的过程中主要通过电…

968. 监控二叉树

题目 分析 首先明确把摄像头放在叶子节点的父节点位置&#xff0c;才能充分利用摄像头的覆盖面积。 贪心算法&#xff1a; 从下到上看局部最优&#xff0c;因为下面的节点最多&#xff0c;让叶子节点的父节点安摄像头最合理&#xff0c;所用摄像头最少。 整体全局最优&#xf…

基于融合SPD+BIFPN+CBAM改进YOLOv5的奶牛检测识别分析系统开发

在我之前的几篇文章中分别应用了不同的tricks来改进yolov5模型如下&#xff1a;加入针对小目标的SPD-Conv模块《yolov5s融合SPD-Conv用于提升小目标和低分辨率图像检测性能实践五子棋检测识别》加入BIFPN特征融合模块《基于yolov5sbifpn实践隧道裂缝裂痕检测》加入注意力机制模…

[数据结构]栈和队列

目录[数据结构]栈和队列一.栈1.栈的基本概念1.2 栈的常见基本操作1.3 栈的实现1.3.1 入栈1.3.2 出栈1.3.3获取栈顶元素1.3.4 判断栈为空1.3.5 栈实现二.队列2.1 入队2.2 出队2.3 获取队首元素2.4 实现队列[数据结构]栈和队列 一.栈 1.栈的基本概念 栈&#xff08;Stack&#…

你了解真正的数字孪生吗?

数字孪生的目的是在虚拟空间构建数字化的复杂系统“镜像”&#xff0c;可以低成本、反复的从多个视角观察、控制、分析、验证和推演&#xff0c;从而帮助人们更好的在现实世界中完成设计、生产、运营等活动。 近年来&#xff0c;数字孪生技术在航空航天、工业制造、交通物流等多…

hbase2.x HBCK Report Region Holes Overlaps问题修复

Region Holes 查看该问题可通过master UI界面的HBCK Report查看 最下方RegionInfo展示了哪两个region之间存在空洞 也可以通过master日志查看 解决步骤 解决方法很简单&#xff0c;直接使用 hbck2 提供的 fixMeta 操作即可。 在 Usage 说明中能看到 fixMeta 能够修复 ‘…

【Linux】make/Makefile的使用

本文目录 背景简介 使用方法 为什么执行的指令是make和make clean呢&#xff1f; gcc如何判断文件是否需要重新执行&#xff1f; 背景简介 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefifile定义了一系列的规则来指定…

16万字智慧医疗-医院信息化大数据建设 方案

【版权声明】本资料来源网络&#xff0c;知识分享&#xff0c;仅供个人学习&#xff0c;请勿商用。【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间删除&#xff01;完整资料领取见文末&#xff0c;部分资料内容&#xff1a; 目 录 1 概述 1.1 现状分析 …

人工智能-网络编程、TCP

目录1、网络编程1.1网络介绍1.2IP地址1.3 ifconfig和ping命名1.4端口和端口号1.5端口号的分类1.6 socket介绍1.7 TCP介绍&#xff12;、TCP的网络应用程序开发2.1 python&#xff13;编码转换2.2 TCP客户端程序开发流程2.3 TCP客户端程序开发2.4 TCP服务端程序开发流程2.5 TCP…

[Android]序列化原理Parcelable

Parcelable是Android为我们提供的序列化的接口&#xff0c;Parcelable相对于Serializable的使用相对复杂一些&#xff0c;但Parcelable的效率相对Serializable也高很多&#xff0c;这一直是Google工程师引以为傲的&#xff0c;Parcelable和Serializable的效率对比Parcelable vs…

CesiumLab中对输入人工模型的格式要求 CesiumLab系列教程

人工模型数据&#xff08;或者手工模型数据&#xff09;是三维 GIS 行业发展的最早的需求来源&#xff0c;通过3dsmax&#xff0c;maya 等建模工具人工建模的数据。 ​ 编辑 人工模型 这张图是cesiumlab上对人工模型来源的流程&#xff0c;在上面我们只罗列了四个建模工具&a…

Rabbitmq消息队列详解(二)——消息模式API

官网提供的消息模式&#xff1a; 依赖&#xff1a; <!-- 加入rabbitmq --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>hello模型 没有交换机&…

电商物流第三方云仓到底是什么?

仓库能够简单地定义为一个规划空间&#xff0c;通常是一个用于处置和贮存货物的大型商业建筑。因而&#xff0c;仓储是指在这样一个规划空间中存储和处置货物所触及的一切过程。仓库中常见的货物包括&#xff1a;;机械零配件、建筑资料、废品农产品、家具和电子产品。仓库中的一…

Git、GitHub、Gitee、GitLab

Git、GitHub、Gitee、GitLab 文章目录前言一、Git1.1 Git概述1.2 版本控制1.3 为什么需要版本控制1.4 版本控制工具1.5 Git简史1.6 Git工作机制1.7 Git和代码托管中心二、Git安装三、 Git常用命令3.1 设置用户签名3.2 初始化本地库3.3 查看本地库状态3.3.1 首次查看&#xff08…

【c++】 仿函数的概念详解与基础实现

四、仿函数 函数对象(仿函数)是一个类&#xff0c;不是一个函数。 函数对象(仿函数)重载了”() ”操作符使得它可以像函数一样调用。分类:假定某个类有一个重载的operator()&#xff0c;而且重载的operator()要求获取一个参数&#xff0c;我们就将这个类称为“一元仿函数”&…

【算法笔记】最近公共祖先(LCA)算法求解——倍增算法

0. 前言 最近公共祖先简称 LCA&#xff08;Lowest Common Ancestor&#xff09;。两个节点的最近公共祖先&#xff0c;就是这两个点的公共祖先里面&#xff0c;离根最远的那个。 这种算法应用很广泛&#xff0c;可以很容易解决树上最短路等问题。 为了方便&#xff0c;我们记…