FlinkSql读取kafka数据流的方法(scala)

news2024/12/25 23:46:07

我的scala版本为2.12

<scala.binary.version>2.12</scala.binary.version>

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取kafka数据流需要如下依赖:

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
    </dependency>

我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:

package cha01

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import Util._

import scala.util.Random

object FlinkSqlKafkaConnectorProducer {
  def main(args: Array[String]): Unit = {
    val producerConf = new Properties()
    producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")
    producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")
    producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")
    producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")
    producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")
    producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.IntegerSerializer")
    producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val topic = "person_test"
    val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);
    val rand = new Random()

    for(i <- 1 to 2000){
      val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"
      val record: ProducerRecord[Integer, String] =
        new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)
      producer.send(record)
      Thread.sleep(50+rand.nextInt(500))
    }

    producer.flush()
    producer.close()

  }
}

此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:

id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female

随后再在工程中建立一个scala类,内容示例如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CSVKafkaSystem {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)

    tabEnv.executeSql(
      """
         |CREATE TABLE person(
         |id INT,
         |name STRING,
         |age INT,
         |gender STRING
         |) WITH (
         |'connector' = 'kafka',
         |'topic'= 'person_test',
         |'properties.bootstrap.servers' = 'single01:9092',
         |'properties.group.id' = 'person_test_group',
         |'scan.startup.mode' = 'earliest-offset',
         |'format' = 'csv',
         |'csv.ignore-parse-errors' = 'true',
         |'csv.field-delimiter' = ','
         |)
         |""".stripMargin)

    tabEnv.sqlQuery("SELECT * FROM person").execute().print()
  }
}

其中的一些参数解释如下:'

connector' = 'kafka'

指定连接器类型为kafka

'topic'= 'person_test'

指定要读取的kafka主题为person_test

'properties.bootstrap.servers' = 'single01:9092'

指定kafka所在的服务器的ip地址以及端口号

'properties.group.id' = 'person_test_group'

指定 Kafka 消费者组的 ID为person_test_group

'scan.startup.mode' = 'earliest-offset'

指定了控制 Flink 从 Kafka 中读取数据时的起始位置

  • earliest-offset 表示从 Kafka 中每个分区的最早消息开始读取。
  • latest-offset 表示从 Kafka 中每个分区的最新消息开始读取。
  • group-offsets 表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置

'format' = 'csv'

指定了 kafka 消息的格式为csv

'csv.ignore-parse-errors' = 'true'

指定了忽略解析异常的信息

'csv.field-delimiter' = '

指定 CSV 数据中字段的分隔符为,

可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2239467.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker 安装Immich教程

Immich 是一个开源的自托管照片和视频管理平台,专为帮助用户存储、管理、和分享个人媒体库而设计。Immich 的目标是提供一个类似 Google Photos 的替代方案,但不依赖于第三方服务,用户可以完全控制自己的数据。 本章教程,记录如何用Docker部署安装Immich,使用的操作系统的…

【CICD】CICD 持续集成与持续交付在测试中的应用

一、什么是CICD&#xff1f; CI/CD 是指持续集成&#xff08;Continuous Integration&#xff09;和持续部署&#xff08;Continuous Deployment&#xff09;或持续交付&#xff08;Continuous Delivery&#xff09; 1.1 持续集成&#xff08;Continuous Integration&#xf…

交友问题 | 动态规划

描述 如果有n个人&#xff0c;每个人都可以保持单身或与其他人结成一对。每个人只能找一个对象。求总共有多少种保持单身或结对的方式。用动态规划求解。 输入 输入第一行t表示测试用例的数量 对于每一个测试用例, 输入一个整数n表示人数1<n<18 输出 针对每个测试用…

Web开发:ABP框架6——appsetting.json的读取以及实例的注入

目录 一、模块配置服务 二、配置服务的编写&#xff08;配置ORM&#xff09; 三、高层代码的运用&#xff08;ORM实例查询 & 获取字符串&#xff09; 一、模块配置服务 BookStoreHttpApiHostModule 二、配置服务的编写&#xff08;配置ORM&#xff09; (以freesql为例子…

tomcat启动运行乱码,解决方案

tomcat启动运行乱码,解决方案 不要修改系统设置;我们是要让tomcat兼容我们系统,不是让系统兼容tomcat。不要修改系统设置;我们是要让tomcat兼容我们系统,不是让系统兼容tomcat。不要修改系统设置;我们是要让tomcat兼容我们系统,不是让系统兼容tomcat。解决方案 找到你的…

UE5材质篇 3 MaterialFunction

这个可以避免一部分的蜘蛛网&#xff0c;这样就用的时候很多蜘蛛网缩小成为一个节点 https://dev.epicgames.com/documentation/en-us/unreal-engine/creating-and-using-material-functions-in-unreal-engine 首先创建一个&#xff0c;这里这个名字他就是函数名&#xff0c;后…

linux命令详解,文件系统权限相关

文件系统权限相关 linux系统中一切都是文件 查看权限 Is -la /etc/passwd更改文件所有者 chown root file修改文件权限 sudo chmod urwx,grw,o-r file sudo chmod ux,gtw,o-r file chmod 400 <file>一、Linux系统中一切都是文件 在linux系统中&#xff0c;几乎所有的…

基本数据类型和包装类型的区别、缓存池、自动拆箱装箱(面试题)

目录 1. 八种基本类型及对应包装类型 2. 基本类型和包装类型 区别 3. 自动拆箱装箱 3.1 自动装箱 3.2 自动拆箱 3.3 缓存池 4. 高频面试案例分析 1. 八种基本类型及对应包装类型 基本数据类型类型描述范围&#xff08;指数形式&#xff09;位数包装类型byte整型&#x…

如何在Microsoft Edge中删除已保存的网站密码

目录 前言1. 如何进入Edge的密码管理界面1.1 打开Microsoft Edge的设置菜单1.2 进入个人资料设置1.3 进入密码管理 2. 在Edge中查看和删除已保存的密码2.1 查找需要删除的密码2.2 检查密码安全性2.3 删除特定网站的密码 3. 提升Edge密码管理的安全性3.1 启用Edge的多重身份验证…

Spring Boot框架:电商开发的新趋势

5 系统实现 系统实现部分就是将系统分析&#xff0c;系统设计部分的内容通过编码进行功能实现&#xff0c;以一个实际应用系统的形式展示系统分析与系统设计的结果。前面提到的系统分析&#xff0c;系统设计最主要还是进行功能&#xff0c;系统操作逻辑的设计&#xff0c;也包括…

后端:Aop 面向切面编程

文章目录 1. Aop 初步学习面向切面编程&#xff0c;EnableAspectJAutoProxy2. AOP的核心概念3. 前置通知&#xff08;Before&#xff09;4. 后置通知&#xff08;After&#xff09;5. 返回通知&#xff08;AfterReturning&#xff09;6. 异常通知&#xff08;AfterThrowing&…

三周精通FastAPI:41 部署:FastAPI版本和HTTPS概念

官方文档&#xff1a;https://fastapi.tiangolo.com/zh/deployment/versions/ 关于 FastAPI 版本 FastAPI 已在许多应用程序和系统的生产环境中使用。 并且测试覆盖率保持在100%。 但其开发进度仍在快速推进。 经常添加新功能&#xff0c;定期修复错误&#xff0c;并且代码仍…

Pinpoint(APM)进阶--Pinot指标采集(System Metric/Inspector)

接上文 Pinpoint使用Pinot进行指标数据存储&#xff0c;Pinot流摄入需要Kafka 本文详解Kafka和Pinot的安装部署&#xff0c;以及Pinpoint的指标采集 Pinot 简介 Apache Pinot是一个实时分布式OLAP数据存储&#xff0c;专为低延迟、高吞吐量分析而构建&#xff0c;非常适合面…

ReactPress:重塑内容管理的未来

ReactPress Github项目地址&#xff1a;https://github.com/fecommunity/reactpress 欢迎提出宝贵的建议&#xff0c;欢迎一起共建&#xff0c;感谢Star。 ReactPress&#xff1a;重塑内容管理的未来 在当今信息爆炸的时代&#xff0c;一个高效、易用的内容管理系统&#xff0…

uniapp路由与页面跳转详解:API调用与Navigator组件实战

UniApp路由与页面跳转详解&#xff1a;API调用与Navigator组件实战 路由 uniapp页面路由为框架统一管理&#xff0c;开发者需要在page.json里面配置每个路由页面的路径及页面样式。 路由跳转 uniapp有两种页面路由跳转方式&#xff0c;调用API跳转和navigator组件跳转。 调…

35.Redis 7.0简介

2022 年 2 月初&#xff0c;Redis 7.0 迎来了首个候选发布&#xff08;RC&#xff09;版本。这款内存键值数据库迎来了“重大的性能优化”和其它功能改进&#xff0c;性能优化包括降低写入时复制内存的开销、提升内存效率&#xff0c;改进 fsync 来避免大量的磁盘写入和优化延迟…

MySQL技巧之跨服务器数据查询:基础篇-如何获取查询语句中的参数

MySQL技巧之跨服务器数据查询&#xff1a;基础篇-如何获取查询语句中的参数 上一篇已经描述&#xff1a;借用微软的SQL Server ODBC 即可实现MySQL跨服务器间的数据查询。 而且还介绍了如何获得一个在MS SQL Server 可以连接指定实例的MySQL数据库的连接名: MY_ODBC_MYSQL 以…

连续15年霸榜“双11”行业第一,九牧做对了什么?

文 | 螳螂观察&#xff08;TanglangFin&#xff09; 作者 | 余一 随着“双十一”的落幕&#xff0c;各类销售榜单再次成为热门话题。 天猫“双11”全周期589个品牌成交额破亿&#xff0c;其中苹果、海尔、美的、小米、九牧等45个品牌成交额突破10亿。 值得注意的是在绝大多…

【网页设计】HTML5 和 CSS3 提高

目标 能够说出 3~5 个 HTML5 新增布局和表单标签能够说出 CSS3 的新增特性有哪些 1. HTML5 的新特性 注&#xff1a;该部分所有内容可参考菜鸟教程菜鸟教程 - 学的不仅是技术&#xff0c;更是梦想&#xff01; (runoob.com) HTML5 的新增特性主要是针对于以前的不足&#xf…

Linux手动安装nginx

本次以安装nginx-1.12.2为例 1、首先说明一下,安装nginx之前需要安装如下素材: 2、开始安装 第一步,安装依赖yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel第二步,下载并安装nginx安装包(nginx官网:http://nginx.org/)# 下载 wget http://nginx…