大数据:快速入门Scala+Flink

news2024/11/15 10:59:30

一、什么是Scala

Scala 是一种多范式编程语言,它结合了面向对象编程和函数式编程的特性。Scala 这个名字是“可扩展语言”(Scalable Language)的缩写,意味着它被设计为能够适应不同规模的项目,从小型脚本到大型分布式系统。

以下是 Scala 的一些主要特点:

  1. 兼容 Java:Scala 代码可以编译成 Java 字节码,并且可以在任何支持 Java 的平台上运行。这意味着 Scala 可以直接使用大量的 Java 库和框架。

  2. 简洁性:Scala 提供了一种更加简洁的方式来表达复杂的逻辑。通过模式匹配、类型推断等特性,程序员可以用较少的代码完成更多的工作。

  3. 函数式编程:Scala 支持函数作为一等公民,允许高阶函数、不可变数据结构和懒惰求值等函数式编程概念。

  4. 面向对象:Scala 同样支持面向对象编程的所有核心概念,包括类、对象、继承、封装等。

  5. 类型安全:Scala 有一个强大的静态类型系统,这有助于在编译时捕获错误并提供更好的代码质量。

  6. 并发模型:Scala 提供了 Actor 模型来处理并发问题,这是通过 Akka 框架实现的,非常适合构建高并发的应用程序。

  7. 泛型:Scala 对泛型的支持非常强大,提供了更灵活和安全的泛型机制。

  8. 交互性:Scala 有一个 REPL(读取-求值-打印循环)环境,允许开发者快速测试代码片段。

Scala 被广泛用于开发大规模的数据处理应用、Web 应用以及企业级软件。由于其与 Java 的良好集成,很多公司采用 Scala 来增强他们的 Java 生态系统中的应用程序。例如,Apache Spark 就是用 Scala 编写的,它是一个流行的大数据处理框架。

二、什么是Flink

Apache Flink 是一个开源的流处理框架,它为分布式、高性能、随时可用以及准确的流处理应用程序提供支持。Flink 的核心是一个流数据流引擎,它提供了数据分布、通信和状态管理等功能。Flink 可以处理有界数据(如固定大小的数据集)和无界数据(如持续不断的事件流),这使得它既可以作为批处理也可以作为流处理框架来使用。

以下是 Apache Flink 的一些关键特性:

  1. 实时处理:Flink 能够实现低延迟的实时数据处理。
  2. 高吞吐量:设计用于处理大规模数据流,并能够维持高吞吐量。
  3. 容错性:提供强大的容错机制,确保即使在节点故障的情况下也能保证计算结果的正确性和一致性。
  4. 精确一次(Exactly-Once)语义:保证每个事件只被处理一次,这对于需要精确结果的应用非常重要。
  5. 窗口操作:支持基于时间、计数或会话的窗口操作,方便对流数据进行复杂的分析。
  6. 状态管理:允许用户定义和维护应用状态,这对于实现复杂的业务逻辑是必需的。
  7. 可扩展性:可以轻松地部署到各种集群环境中,包括 YARN, Kubernetes, Mesos 等。
  8. API 支持:提供了多种语言的 API,包括 Java 和 Scala,也支持 Python 以及其他语言通过 Table API 或 SQL 接口。

Flink 被广泛应用于实时分析、事件驱动应用、ETL 操作、报警系统等领域。随着大数据和实时数据处理需求的增长,Flink 在业界得到了越来越多的关注和应用。

三、流处理和批处理 的区别

流处理和批处理是数据处理的两种主要方式,它们各自适用于不同的场景,并且有着不同的特点。下面是这两种处理方式的主要区别:

批处理(Batch Processing)

  • 定义:批处理是指对固定大小的数据集进行处理的过程,这些数据通常是一次性加载到系统中的。
  • 数据特性:处理的是静态的、历史的数据集合,数据在处理之前就已经完全可用。
  • 延迟:由于需要收集完整的数据集后才能开始处理,因此批处理通常具有较高的延迟。
  • 应用场景:适合于不需要实时响应的场景,如日志分析、报告生成等。
  • 容错性:可以实现精确一次(Exactly-Once)语义,保证每个数据项被准确处理一次。
  • 资源使用:批处理任务可以在非高峰时段运行,以优化资源使用。

流处理(Stream Processing)

  • 定义:流处理是对连续不断的数据流进行即时处理的过程,数据项一旦到达就立即被处理。
  • 数据特性:处理的是动态的、实时的数据流,数据是持续产生的。
  • 延迟:能够提供非常低的延迟,甚至接近实时,因为数据一到达就可以被处理。
  • 应用场景:适用于需要快速反应的场景,如实时监控、在线广告投放、欺诈检测等。
  • 容错性:现代流处理框架如 Apache Flink 和 Kafka Streams 也支持精确一次(Exactly-Once)语义,但实现起来比批处理更复杂。
  • 资源使用:流处理通常要求更高的计算资源和更复杂的基础设施来保证低延迟和高吞吐量。

混合模式

近年来,随着技术的发展,出现了一些混合处理模式,比如微批处理(Micro-batching),它将数据流分成小批次进行处理,试图结合流处理和批处理的优点。这种模式既保持了较低的延迟,又简化了处理逻辑和状态管理。

选择哪种处理方式取决于具体的应用需求、数据特性和业务目标。例如,如果应用需要基于最新数据做出决策,那么流处理可能更适合;而对于需要定期生成报表或分析大量历史数据的情况,则批处理可能是更好的选择。

四、安装Scala

1、 首先确保jdk1.8安装成功
  首先在安装之前,确保本地已经安装了JDK1.5以上的版本,在此博主安装的是1.8版本。并且已经设置了JAVA_HOME 环境变量及JDK的bin目录。

2、下载对应的Scala安装文件scala-2.11.8.zip
接着我们从Scala官网地址 https://www.scala-lang.org/download/all.html 上下载Scala二进制的包。

在这里插入图片描述
在这里插入图片描述
3. 解压scala-2.11.8.zip
4. 配置Scala的环境变量

  1. 打开环境变量
      右击我的电脑,单击"属性",进入如图所示页面。下面开始配置环境变量,右击【我的电脑】–【属性】–【高级系统设置】–【环境变量】,如图:
    在这里插入图片描述
  2. 设置 SCALA_HOME 变量
      单击新建,在变量名栏输入:SCALA_HOME: 变量值一栏输入:D:\scala 也就是 Scala 的安装目录,根据个人情况有所不同,如果安装在 C 盘,将 D 改成 C 即可。
    在这里插入图片描述
  3. 设置 Path 变量
      找到系统变量下的"Path"如图,单击编辑。在"变量值"一栏的最前面添加如下的路径: %SCALA_HOME%\bin;

在这里插入图片描述
4. 设置 Classpath 变量
   找到找到系统变量下的"Classpath"如图,单击编辑,如没有,则单击"新建":
变量名: ClassPath
变量值: .D:\scala.;
在这里插入图片描述
5. 检查
   检查环境变量是否设置好了:调出"cmd"检查。单击 【开始】,在输入框中输入cmd,然后"回车",输入 scala,然后回车,如环境变量设置ok,你应该能看到这些信息。
在这里插入图片描述
6. 测试
在这里插入图片描述
Plugins库有很多插件可联网安装,但可以选择离线安装方式,单击红框,然后选择Scala插件所在的路径确认即可。
在这里插入图片描述

注:查看scala插件是否安装成功,这也是第二种查看scala是否安装的方法。
如图所示可在Plugins库列表中搜索到即已完成安装
在这里插入图片描述
安装完scala插件后重启IDEA工具使其生效,单击【Restart】
在这里插入图片描述

五、大数据案例代码

1、批处理

Maven依赖

首先,确保你的pom.xml中包含以下依赖(适用于Maven构建):

<dependencies>
    <!-- Flink Core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.14.0</version>  <!-- 根据需要替换为您使用的Flink版本 -->
    </dependency>
    <!-- Flink Streaming Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Oracle JDBC Driver -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>19.8.0.0</version> <!-- 确保版本与您的Oracle数据库兼容 -->
    </dependency>
</dependencies>
配置Kafka和Oracle

请确保你的Kafka主题已经创建并且你能够通过Kafka消费消息。同时,确保你具有Oracle数据库的访问权限,并且已创建适当的表格以插入数据。

Scala + Flink 程序

下面是一段示例代码,展示了如何从Kafka读取数据并插入到Oracle数据库中。

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.jdbc.JdbcSink
import java.sql.{Connection, PreparedStatement}
import java.util.Properties

object KafkaToOracle {
  def main(args: Array[String]): Unit = {
    // 创建StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 配置Kafka消费者
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")  // Kafka Broker 地址
    kafkaProps.setProperty("group.id", "test")                     // 消费者组
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)

    // 从Kafka读取数据
    val stream = env.addSource(kafkaConsumer)

    // 处理和插入数据到Oracle
    stream.map(record => {
      // 假设Kafka传来的数据是以逗号分隔的字符串
      val fields = record.split(",")
      (fields(0), fields(1)) // 返回元组(字段1,字段2)
    }).addSink(new JdbcSink[(String, String)](
      "jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", // Oracle JDBC URL
      (statement: PreparedStatement, t: (String, String)) => {
        statement.setString(1, t._1) // 设置字段1
        statement.setString(2, t._2) // 设置字段2
      },
      new JdbcStatementBuilder[(String, String)] {
        override def accept(t: (String, String), preparedStatement: PreparedStatement): Unit = {
          preparedStatement.setString(1, t._1)
          preparedStatement.setString(2, t._2)
        }
      }
    ))

    // 执行任务
    env.execute("Kafka to Oracle Example")
  }
}
表结构

假设你在Oracle中有一个名为your_table的表,结构为:

CREATE TABLE your_table (
    field1 VARCHAR2(255),
    field2 VARCHAR2(255)
);

确保表结构与上面代码中的插入逻辑相匹配。

补充说明
  1. Kafka的Topic: 修改your_topic为您实际使用的Topic名称。
  2. JDBC URL: 确保jdbc连接字符串和凭据是正确的。
  3. 性能优化: 在生产环境中,可能需要对Flink配置进行调整,例如并行度、检查点设置等。

确保所有依赖项正确并且可以访问Kafka和Oracle数据库后,编译并运行这个程序。它将从Kafka主题读取数据,进行处理后再插入到Oracle表中。

2、流处理

Maven依赖

确保你的pom.xml中有以下依赖:

<dependencies>
    <!-- Flink Streaming -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.14.0</version>  <!-- 根据需要替换为您使用的Flink版本 -->
    </dependency>
    <!-- Flink Streaming Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Oracle JDBC Driver -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>19.8.0.0</version>
    </dependency>
</dependencies>
Scala + Flink 程序

以下是从Kafka读取数据并实时插入Oracle数据库的流处理示例代码:

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

object KafkaToOracleStreaming {
  def main(args: Array[String]): Unit = {
    // 创建 StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka配置
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092") // Kafka Broker 地址
    kafkaProps.setProperty("group.id", "test")                    // 消费者组

    // 创建Kafka消费者
    val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)

    // 从Kafka读取数据流
    val stream = env.addSource(kafkaConsumer)

    // 处理数据并插入Oracle
    stream.map(record => {
      // 假设Kafka传来的数据是以逗号分隔的字符串
      val fields = record.split(",")
      (fields(0), fields(1)) // 返回元组 (字段1, 字段2)
    }).addSink(new OracleSink)

    // 执行任务
    env.execute("Kafka to Oracle Streaming Example")
  }

  // 自定义Sink向Oracle插入数据
  class OracleSink extends RichSinkFunction[(String, String)] {
    var connection: Connection = _
    var insertStmt: PreparedStatement = _

    override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
      // 初始化JDBC连接
      connection = java.sql.DriverManager.getConnection("jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", "username", "password")
      // 创建插入语句
      insertStmt = connection.prepareStatement("INSERT INTO your_table (field1, field2) VALUES (?, ?)")
    }

    override def invoke(value: (String, String), context: Context): Unit = {
      // 设置参数值
      insertStmt.setString(1, value._1)
      insertStmt.setString(2, value._2)
      // 执行插入
      insertStmt.executeUpdate()
    }

    override def close(): Unit = {
      // 关闭连接和语句
      if (insertStmt != null) insertStmt.close()
      if (connection != null) connection.close()
    }
  }
}
  1. Kafka消费者: 使用FlinkKafkaConsumer从Kafka主题获取数据。
  2. 数据处理: 每条从Kafka获取的记录在此处被转换为一个元组(字段1, 字段2),假设它们是通过逗号分隔的。
  3. 自定义Sink: OracleSink类继承自RichSinkFunction,负责与Oracle数据库的连接和数据插入。
    • open方法中,建立与Oracle的连接。
    • invoke方法中,执行插入操作。
    • close方法中,确保正确关闭连接和语句。
  4. 执行环境: 最后,通过env.execute("Kafka to Oracle Streaming Example")来启动Flink流处理任务。

六、项目部署

Scala+Flink 打包以后依旧是jar 通过Java程序的方式部署即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2158036.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue2 中使用 Tinymce 富文本编辑器详解

vue2.x使用Tinymce富文本 项目中Tinymce效果图安装依赖包/创建依赖文件创建skins文件夹汉化文件-zh_CN.js 封装组件Tinymce.vue组件中使用封装组件tinymce.vueTinymce 扩展插件集合 项目中Tinymce效果图 如果想先了解一下&#xff0c;可以浏览一博主整理的的TinyMCE中文文档&am…

【揭秘大脑与AI的鸿沟:电化学信号与非线性动态交互的奥秘】

目录 【揭秘大脑与AI的鸿沟:电化学信号与非线性动态交互的奥秘】 1. 信息传递的奇迹:电化学信号的奥秘 2. 非线性动态交互:大脑的智慧之源 3. 结构与功能的鸿沟:从并行分布到有限层次 结语:探索未知的边界 【揭秘大脑与AI的鸿沟:电化学信号与非线性动态交互的奥秘】…

【深度学习】【TensorRT】【C++】模型转化、环境搭建以及模型部署的详细教程

【深度学习】【TensorRT】【C】模型转化、环境搭建以及模型部署的详细教程 提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 文章目录 【深度学习】【TensorRT】【C】模型转化、环境搭建以及模型部署的详细教程前言模型转换--pytorch转engineWindows平台搭…

[C#]winform 使用opencvsharp实现玉米粒计数

【算法介绍】 这段代码是使用OpenCvSharp库&#xff08;OpenCV的C#封装&#xff09;对图像进行处理&#xff0c;主要流程包括图像的二值化、腐蚀操作、距离变换、轮廓检测&#xff0c;并在原图上标出检测到的轮廓位置及数量。下面是对代码的详细解读&#xff1a; 初始化&…

网络通信——路由器、交换机、集线器(HUB)

注意&#xff1a;传输层&#xff0c;应用层没有网路设备 一.路由器&#xff08;网络层设备&#xff09; 1.分割广播域 2.一个接口就是一个广播域 3.一般接口位4&#xff0c;8&#xff0c;12。 4.数据转发 &#xff08;由路由表转发数据&#xff09; 5.根据路由表来进行路径选…

基于微信小程序的美食外卖管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

面试速通宝典——1

1. 内存有哪几种类型&#xff1f; ‌‌‌‌  内存分为五个区&#xff0c;堆&#xff08;malloc&#xff09;、栈&#xff08;如局部变量、函数参数&#xff09;、程序代码区&#xff08;存放二进制代码&#xff09;、全局/静态存储区&#xff08;全局变量、static变量&#…

米壳AI:自媒体如何获取高清原画质!真8K视频是这样下载的!

作为一名新手自媒体博主&#xff0c;你是不是也在各种短视频平台上疯狂搜索保存外网视频的方法和软件呢&#xff1f;&#x1f603;然而&#xff0c;真正能下载真 4K 视频的却寥寥无几。 别苦恼啦&#xff01;今天我就来给大家分享一个小编亲测过后真实好用的工具 —— 米壳 AI。…

网页通知设计灵感:CSS 和 JS 的 8 大创意实现

文章目录 前言正文1.霓虹灯风格的通知系统2.垂直时间轴通知3.动画徽章通知4.项目式通知5.多种状态通知&#xff1a;成功、错误、警告6.信息、警告、提示组件7.扁平化风格通知8.社交媒体风格弹出通知 总结 前言 网页通知如今已成为电商、社交平台等网站的常见功能&#xff0c;它…

Pandas -----------------------基础知识(二)

dataframe读写数据操作 import pandas as pd# 准备数据(字典) data [[1, 张三, 1999-3-10, 18],[2, 李四, 2002-3-10, 15],[3, 王五, 1990-3-10, 33],[4, 隔壁老王, 1983-3-10, 40] ]df pd.DataFrame(data, columns[id, name, birthday, age]) df写到csv文件中 &#xff0c;…

SOLIDWORKS 2025 重点新功能大放送(壹)

SOLIDWORKS 2025涵盖全新以用户为中心的增强功能&#xff0c;致力实现更智能、更快速地与团队和外部合作伙伴协同工作。 小索是设计部负责人&#xff0c;SOLIDWORKS资深使用者&#xff0c;使用SOLIDWORKS软件多年&#xff0c;喜欢分享&#xff0c;正在体验SOLIDWORKS 2025版本…

tensorboard展示不同运行的曲线结果

运行tensorboard曲线如下&#xff1a; tensorboard --logdir .有时候&#xff0c;曲线图会展示多条曲线&#xff0c;以至于我们想分辨哪条线来自哪次训练都做不到了。如下图是设置smoothing-0.6的结果&#xff1a; smoothing可以在页面找到设置按钮&#xff0c;呼出设置侧边…

【算法笔记】二分查找 红蓝染色法

目录 二分查找 红蓝染色法&#xff08;感谢灵神&#xff09;闭区间[left, right]左闭右开区间[left, right)开区间(left, right)变式 二分查找 红蓝染色法&#xff08;感谢灵神&#xff09; 这里是灵神的教学视频&#xff1a;二分查找 红蓝染色法_哔哩哔哩_ bilibili 学了二分…

玩转RabbitMQ声明队列交换机、消息转换器

♥️作者&#xff1a;小宋1021 &#x1f935;‍♂️个人主页&#xff1a;小宋1021主页 ♥️坚持分析平时学习到的项目以及学习到的软件开发知识&#xff0c;和大家一起努力呀&#xff01;&#xff01;&#xff01; &#x1f388;&#x1f388;加油&#xff01; 加油&#xff01…

中兴交换机三层配置

中兴交换机三层配置 目的&#xff1a;将1-10端口划分到3001vlan&#xff0c;11-20端口划分到3002vlan中去 客户端客户端IPvlan网关主机A88.88.1.1203001192.168.1.254主机B192.168.100.1303002192.168.100.254 1、通过Console线登录设备 **********************************…

导出导入Oracle数据库使用黑框命令方式exp、imp【亲测】

下载工具 根据自己数据库的版本下载&#xff0c;以v19为例&#xff1a; 下载基础包Basic Package和工具包Tools Package 两个压缩包中的文件夹一样&#xff0c;但内容不一样&#xff0c;将两个压缩包中的文件解压合并到一起 https://www.oracle.com/database/technologies/inst…

TLV解码 - 华为OD统一考试(E卷)

2024华为OD机试&#xff08;E卷D卷C卷&#xff09;最新题库【超值优惠】Java/Python/C合集 题目描述 TLV编码是按 [Tag Length Value] 格式进行编码的&#xff0c;一段码流中的信元用Tag标识&#xff0c;Tag在码流中唯一不重复&#xff0c;Length表示信元Value的长度&#xff…

Zotero(7.0.5)+123云盘同步空间+Z-library=无限存储文献pdf/epub电子书等资料

选择123云盘作为存储介质的原因 原因1&#xff1a; zotero个人免费空间大小&#xff1a;300M&#xff0c;如果zotero云端也保存文献pdf资料则远远不够 原因2&#xff1a; 百度网盘同步文件空间大小&#xff1a;1G123云盘同步文件空间大小&#xff1a;10G 第一台电脑实施步骤…

微服务--Gateway网关

在微服务架构中&#xff0c;Gateway&#xff08;网关&#xff09;是一个至关重要的组件&#xff0c;它扮演着多种关键角色&#xff0c;包括路由、负载均衡、安全控制、监控和日志记录等。 Gateway网关的作用 统一访问入口&#xff1a; Gateway作为微服务的统一入口&#xff0c…

DNF Decouple and Feedback Network for Seeing in the Dark

DNF: Decouple and Feedback Network for Seeing in the Dark 在深度学习领域&#xff0c;尤其是在低光照图像增强的应用中&#xff0c;RAW数据的独特属性展现出了巨大的潜力。然而&#xff0c;现有架构在单阶段和多阶段方法中都存在性能瓶颈。单阶段方法由于域歧义&#xff0c…