flink:java集成flink实现流数据处理(一)

news2025/1/1 23:27:14

文章目录

  • 0. 引言
  • 1. flink安装
  • 2. 流数据处理程序
    • 依赖包简介
    • 流处理4个部分
  • 3. 程序调用
  • 4. 总结

0. 引言

Apache Flink作为一款高性能的流处理框架,已成为企业级流数据处理的优选方案。本文将带领读者深入了解如何利用Java语言集成Flink,实现高效、可靠的流数据处理应用。本文将从Flink的基本原理入手,介绍Java与Flink的集成方法

1. flink安装

首先java的安装不再单独说明,大家可参考其他文章部署。我们讲解下开发环境的flink安装

1、在官网下载安装包,我这里选择的是1.13.3
版本,大家可以根据自己的需要选择对应版本

下载地址:https://flink.apache.org/zh/downloads/

在这里插入图片描述

2、解压安装包,修改conf文件夹下的flink-conf.yaml配置文件

# 将任务槽调大一点,默认是1,否则运行会因为任务槽不足报错
taskmanager.numberOfTaskSlots: 30 
# 如果要调整默认端口,则修改
rest.port: 8081

3、运行bin目录下的start-cluster.sh脚本

bin/start-cluster.sh 

在这里插入图片描述

4、然后访问 对应ip:8081端口,即可进入管理页面
在这里插入图片描述
5、同时为了方面后续直接调用flink指令,而不用跟上路径名,我们配置一个环境变量,这步不是必要,如果你没配置,调用脚本时使用全路径即可

export FLINK_HOME=/Library/software/flink/flink-1.13.3
export PATH=$PATH:$FLINK_HOME/bin

2. 流数据处理程序

1、首先创建一个springboot项目

依赖包简介

2、引入依赖

对应flink相关的依赖需要单独说明下,其jar版本需要根据flink版本来定,flink 1.11之前版本使用的是scala2.11, 之后加入了对scala2.12的支持,不同的版本引入的jar名称不同,比如flink-streaming-scala包有3个,对应不同的scala版本,flink-streaming-scala则为对多个版本的兼容版
在这里插入图片描述
一般我们根据scala版本来定,比如我这里使用flink1.13, 则对应scala2.12

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <flink-verison>1.13.3</flink-verison>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!--        flink start-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink-verison}</version>
        </dependency>

<!--        flink end-->

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.1</version>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

我这里将演示要用的包都添加上了,大家可以根据自己的需求选择,依赖包简介:

  • flink-streaming-java_2.12: 提供java集成flink实现流数据处理的接口
  • flink-clients_2.12:flink客户端与服务端集群交互的接口和工具
  • flink-connector-base:提供flink做各类连接的抽象类、工具等,是flink项目中的一个基础包
  • flink-connector-kafka_2.12:连接kafka的接口和工具
  • flink-connector-jdbc_2.12:连接数据库的jdbc接口和工具
  • flink-jdbc_2.12:与数据库交互的接口,可与flink-connector-jdbc_2.12配合使用
  • flink-runtime-web_2.12:本地模式提供web ui相关功能,不添加不影响运行,但是会报错FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.
  • flink-streaming-scala_2.12:专门为 Scala 2.12 编译的流处理库。这个包提供了 Scala API,允许开发人员使用 Scala 编程语言构建流处理应用程序

3、配置文件application.properties中添加配置项,注意提前启动flink

flink:
  # flink集群的服务器ip和端口
  job-manager-host: localhost
  job-manager-port: 8081

4、接下来我们实现一个简单的案例,来帮助我们理解java集成flink的过程

在官网有关于流数据的介绍和一些简单案例
在这里插入图片描述

流处理4个部分

我们从官网的实例可以知道,flink流数据处理实际上主要分成4个部分:
在这里插入图片描述

  • 1、StreamExecutionEnvironment.getExecutionEnvironment()执行环境创建,这是固定的书写
  • 2、输入数据的声明,这里是固定取的自定义对象来作为输入流,也可以引入固定字符串、socket流、数据库、kafka等作为输入数据源
  • 3、流数据的处理,这里只是作为一个简单的过滤,还有其他更多的方法来做复杂的数据处理操作, 实际上就是定义flink任务的算子,大家具体可以参考官方文档,后续再带大家通过案例来熟悉
    https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/etl/
    在这里插入图片描述
    5、下面我们来实现一个简单的词频统计的程序,数据输入就取固定文本,后续我们再讲解其他输入源

(1)先创建环境,指定并行数,输入流数据指定为固定字符串

// 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 指定并行度,默认电脑线程数
            env.setParallelism(3);

             DataStream<String> stream = env.fromElements(
                    "Flink is a powerful framework",
                    "flink 是 一个 强大的 框架"
            );

(2)数据处理上稍微麻烦一旦,我们先对字符串用空格分组,然后作为key, 词频为1放入Tuple2对象中,这里Tuple2的<key, value>类型与我们要放入的要保持一致

然后调用returns方法显示声明结果值类型,否则系统只能得到Tuple2泛型,而无法知道具体的Tuple2<String, Integer>类型

然后使用keyBy进行分组,条件为key,即单词名,然后调用sum做个数汇总,其参数positionToSum是指做合计的字段位置,0 是key, 1就是value,即前面放入的词频数1,或者也可以传入字段名

SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream
                    .flatMap(
                            (String value, Collector<Tuple2<String, Integer>> out) -> {
                                String[] words = value.split(" ");
                                for (String word : words) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                    )
                    .returns(new TypeHint<Tuple2<String, Integer>>() {
                    })
                    .keyBy(value -> value.f0)
                    .sum(1);

(3)然后我们将其做一个打印输出,其结果输出是通过添加不同的SinkFunction来实现的

result.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                    String world = value.getField(0);
                    Integer count = value.getField(1);
                    // 打印
                    System.out.println("单词:"+world + ",次数:"+count);
                }
            });

(4)最后不要忘记,执行任务

// 执行
env.execute("fixed text stream job");

完整代码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

/**
 * @author benjamin_5
 * @Description 固定文本输出:统计词频
 * @date 2024/9/24
 */
public class FixedStringJob {

    private static final Log logger = LogFactory.getLog(FixedStringJob.class);

    // 启动本地flink ./bin/start-cluster.sh

    public static void main(String[] args) {
        try {
            // 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 指定并行度,默认电脑线程数
            env.setParallelism(3);

            DataStream<String> stream = env.fromElements(
                    "Flink is a powerful framework",
                    "flink 是 一个 强大的 框架"
            );

            // 处理数据: 切换、转换、分组、聚合 得到统计结果
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream
                    .flatMap(
                            (String value, Collector<Tuple2<String, Integer>> out) -> {
                                String[] words = value.split(" ");
                                for (String word : words) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                    )
                    // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
                    .returns(new TypeHint<Tuple2<String, Integer>>() {
                    })
                    .keyBy(value -> value.f0)
                    .sum(1);

            result.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                    String world = value.getField(0);
                    Integer count = value.getField(1);
                    // 打印
                    System.out.println("单词:"+world + ",次数:"+count);
                }
            });

            System.out.println("执行完成");
            // 执行
            env.execute("fixed text stream job");

        }catch (Exception e){
            e.printStackTrace();
            logger.error("流任务执行失败:", e);
        }
    }
}

3. 程序调用

1、通过flink的官方文档我们可以知道,其脚本的运行方式是通过flink指令调用jar脚本,然后将任务提交到flink服务端集群,在集群中执行对应任务
在这里插入图片描述
2、在idea中,我们可以直接运行类来进行调试调用,工作台可以看到打印信息
在这里插入图片描述
3、如果我们要放到服务器上运行时,就需要将其打包为jar,首先我们要调整下部署方式,并声明这里书写的class为mainClass,否则运行时会找不到主类

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.flink_demo.job.FixedStringJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4、打包项目

mvn clean package -DskipTests

5、运行指定类

flink run -sae -c com.example.flink_demo.job.FixedStringJob flink_demo-0.0.1.jar

在这里插入图片描述
在flink的日志目录log下可以看到打印信息
在这里插入图片描述

在这里插入图片描述

4. 总结

如上我们实现了一个简单的flink处理流数据的案例,下一期我们继续讲解实现数据库输入输出、kafka接入流数据等案例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2171223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Tomcat CVE-2017-12615 靶场攻略

漏洞描述 当 Tomcat运⾏在Windows操作系统时&#xff0c;且启⽤了HTTP PUT请求⽅法&#xff08;例如&#xff0c;将 readonly初始化参数由默认值设置为false&#xff09;&#xff0c;攻击者将有可能可通过精⼼构造的攻击请求数据包向服务器上传包含任意代 的 JSP ⽂件&#xf…

Kubernetes 资源详解

1. Namespace Namespace 是 kubernetes 系统中的一种非常重要资源&#xff0c;它的主要作用是用来实现多套环境的资源隔离或者多租户的资源隔离。 默认情况下&#xff0c;kubernetes集群中的所有的Pod都是可以相互访问的。但是在实际中&#xff0c;可能不想让两个Pod之间进行…

鸡汤,你在生活中有哪些观人术和识人技巧?

看有没有边界感吧&#xff0c;npd一般没有边界感。还有就是三角测量 吧&#xff0c;可能就是把你和其他人对比看你对他好不好了。还有就是看他有没有情感吧&#xff0c;npd情感这方面可以说一无所有。我是npd不过npd也就那样吧&#xff0c;不过别人怎么看我并不重要&#xff0…

AlphaFold3 | 详解 AlphaFold3 的模型结构及其在不同类型的预测实验中的表现

Jumper 本文将介绍 24 年 5 月发布的 Alaphafold3&#xff0c;其以“使用 AlphaFold 3 进行生物分子相互作用的精确结构预测”为标题发表在《nature》上&#xff0c;通讯作者为 Jumper。 Jumper 具有物理、化学、生物和计算方面的丰富背景。Jumper 本科学的是物理和数学&#…

excel数据常用函数学习记录

1、VLOOKUP查询函数&#xff0c;匹配数据返回 vlookup(查找值,查找范围,要查找的值在第几列,0表示精确查找) enter键返回 例如&#xff1a;在E列中返回&#xff0c;A列的值与D列一致的对应的B值。只会返回查找到的第一个 如果需要把查找到的匹配的数据都返回到单元格中&…

Power Automate 设置流Owner不生效的bug

在查找某个功能没生效时&#xff0c;定位到是一个Power automate的流停了&#xff0c;查看原因是因为创建流的owner被disable了 但是当把流的owner更新为可用的用户时&#xff0c;流依旧没被触发&#xff0c;触发的条件很简单&#xff0c;某个表的记录创建时&#xff0c;因为是…

白嫖10个免费视频剪辑素材网站,新手必备。

视频剪辑不知道去哪里找素材&#xff0c;那就看看这10个网站吧&#xff0c;免费下载&#xff0c;赶紧收藏&#xff01; 视频素材 1、菜鸟图库 视频素材下载_mp4视频大全 - 菜鸟图库 菜鸟图库网素材非常丰富&#xff0c;网站主要还是以设计类素材为主&#xff0c;高清视频素材…

51单片机应用开发(进阶)---数码管显示按键“加”“减”计数

实现目标 1、巩固按键操作 2、巩固数码管显示操作 一、内容描述 功能描述&#xff1a;1、开机显示10&#xff0c;每按一次K1 &#xff0c;数码管计数值加1&#xff0c;当加到20,&#xff0c;再按K1&#xff0c;数值一直停留在20&#xff1b;2、每按一次K2&#xff0c;数码管计…

Pencils Protocol上线 Vaults 产品,为 $DAPP 深入赋能

Pencils Protocol 是 Scroll 生态一站式综合收益平台&#xff0c;该平台以 DeFi 功能作为抓手&#xff0c;基于 Farming、Vaults、Auction 等功能不断向 LRT、LaunchPad、AI、FHE、RWA 等领域深入的拓展。 近期 Pencils Protocol 生态不断迎来重磅进展&#xff0c;一个是 $DAPP…

基于二分查找的动态规划 leetcode 300.最长递增子序列

如题&#xff1a; https://leetcode.cn/problems/longest-increasing-subsequence/description/ 其实常规动态规划的解法就没什么好说的了&#xff0c;有意思的是官方放出了一个二分查找的动态规化解法&#xff0c;时间复杂度能降到O(nlog(n))&#xff0c;但是为什么这样能解&…

PPT 快捷键使用、技巧

前言&#xff1a; 本文操作是以office 2021为基础的&#xff0c;仅供参考&#xff1b;不同版本office 的 ppt 快捷键 以及对应功能会有差异&#xff0c;需要实践出真知。 shift 移动 水平/垂直 移动 &#xff1b; shift 放大/缩小 等比例放大 缩小 &#xff1b; 正圆 正…

Python编程:01-基本数据类型-数值字符串,列表与元组,字典,集合set

python的数据类型有如下&#xff1a; 1、数字 数字类型是python中常用的类型&#xff0c;她是不可变的&#xff0c;创建一个数字很简单可以用一个变量来接收它 num12 在这里插入代码片 #创建变量num1 num29 #创建变量num2数字的类型分为如下几类&#xff1a; 整型&#x…

再谈QT的界面开发 - QT的社区版本的获取 - 2024-09

前言&#xff1a; QT的跨平台特性&#xff0c;赋予了QT的生命。2024年&#xff0c;因为项目的原因&#xff0c;重新开启了一个基于QT的跨平台项目。 QT有付费的版本和社区的版本。 1 获取社区的版本&#xff1a; 1.1 社区的版本的软件授权说明&#xff1a; Qt - Obligation…

Spring Boot 2.x基础教程:实现文件上传

​ 博客主页: 南来_北往 系列专栏&#xff1a;Spring Boot实战 前言 文件上传的功能实现是我们做Web应用时候最为常见的应用场景&#xff0c;比如&#xff1a;实现头像的上传&#xff0c;Excel文件数据的导入等功能&#xff0c;都需要我们先实现文件的上传&#xff0c;然…

【含文档】基于Springboot+微信小程序 的高校心理咨询系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定…

WordPress最佳恶意软件扫描插件:入门级指南

在现代互联网环境中&#xff0c;网站安全已经成为每个网站管理员必须重视的问题。特别是对于使用WordPress的用户来说&#xff0c;由于其普及度高&#xff0c;WordPress网站常常成为黑客的首要攻击目标。幸运的是&#xff0c;有许多优秀的恶意软件扫描插件可以帮助我们保护网站…

案例精选 | 海门北部新城医学综合体智能化日志管理系统部署

海门北部新城医学综合体&#xff0c;即海门中医院新院区&#xff0c;坐落于江苏省南通市海门区北部新城的核心地带&#xff0c;是一座全新的现代化三级甲等中医医院。医院于2024年初正式启用&#xff0c;占地约64710平方米&#xff0c;拥有超过12万平方米的建筑面积&#xff0c…

【Python】The Algorithms:开源算法的宝库

The Algorithms 是一个开源项目&#xff0c;旨在为开发者提供各种编程语言的算法实现。该项目汇集了数千种算法的实现&#xff0c;涵盖了数据结构、排序算法、数学算法、机器学习、密码学等领域。通过该平台&#xff0c;开发者可以学习、理解并应用不同编程语言中的算法&#x…

企业安全策略制定

如今&#xff0c;网络安全是所有组织的必需品&#xff0c;而不是奢侈品。现代企业面临着针对其数据、网络和系统的复杂且不断演变的威胁。 即使一个漏洞也可能导致严重违规、财务损失和声誉受损。正如堡垒依靠多层防御共同作用一样&#xff0c;公司的安全措施必须作为一个整体…

MAC M1 安装brew 配置环境变量,安装dart

一. 下载 brew 1. 终端输入 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 2. 如遇到下载失败情况&#xff0c;需要VPN/代理 curl: (7) Failed to connect to raw.githubusercontent.com port 443 after 8 m…