SpringBoot集成flink

news2025/1/15 23:40:48

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

环境搭建:

①、安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

②、安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。
用于测试网络中的端口,发送文件等操作。
进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

yum install -y nc # 安装nc命令

nc -lk 8888 # 启动socket端口

无界流之读取socket文本流

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- 添加 Flink 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.17.0</version>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.et.flink.job.SocketJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

二、SoketJob

public class SocketJob{
	
	public static void main(String[] args)throws Exception{
		
		// 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 指定并行度,默认电脑线程数
        env.setParallelism(3);
        // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);

        // 处理数据: 切换、转换、分组、聚合 得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
//                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);


        // 输出
        sum.print();

        // 执行
        env.execute();
	}
}

测试:

启动socket流:

nc -l 8888

本地执行:直接ideal启动main程序,在socket流中输入

abc bcd cde
bcd cde fgh
cde fgh hij

在这里插入图片描述

集群执行:
执行maven打包,将打包的jar上传到集群中
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1492792.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

永别了,垃圾病毒山寨软件!

在下载软件这一块&#xff0c;电脑与手机最大的不同就是手机有专门的应用商店&#xff0c;而电脑我们却只能使用搜索引擎找寻软件官网下载&#xff0c;这对于大多数对电脑不熟悉的人来说&#xff0c;很可能会下载到恶意病毒软件。 尽管从Win10开始&#xff0c;微软已经在系统中…

【kubernetes】关于k8s集群的存储卷

目录 一、存储卷的分类 二、empty存储卷以及特点 三、hostpath存储卷以及特点 四、nfs存储卷以及特点 五、pvc存储卷 查看pv的定义 查看pvc的定义 实操&#xff1a;静态创建pv的方式 实现pvc存储卷 步骤一&#xff1a;先完成nfs的目录共享&#xff0c;需要准备不同的目…

原油数据处理:1.聚类、盐含量测定与近红外光谱快速评估

一、原油种类的聚类分析 在塔里木盆地塔河油田的原油处理过程中&#xff0c;需要对原油进行地球化学特征研究&#xff0c;以了解其成因和特征。根据地球化学手段的综合研究结果&#xff0c;塔河油田奥陶系原油属于海相沉积环境&#xff0c;成熟度较高&#xff0c;正构烷烃分布…

Python爬虫实战第三例【三】【上】

零.实现目标 爬取视频网站视频 视频网站你们随意&#xff0c;在这里我选择飞某速&#xff08;狗头保命&#xff09;。 例如&#xff0c;作者上半年看过的“铃芽之旅”&#xff0c;突然想看了&#xff0c;但是在正版网站看要VIP&#xff0c;在盗版网站看又太卡了&#xff0c;…

爬虫实战——scrapy框架爬取多张图片

scrapy框架的基本使用&#xff0c;请参考我的另一篇文章&#xff1a;scrapy框架的基本使用 起始爬取的网页如下&#xff1a; 点击每张图片&#xff0c;可以进入图片的详情页&#xff0c;如下&#xff1a; 代码实现&#xff1a; 项目文件结构如下 img_download.py文件代码 im…

探究java反射取值与方法取值性能对比

探究java反射取值与方法取值性能对比 由于我开发框架时&#xff0c;经常需要对象取值。常用的取值方式有&#xff1a; 反射取值方法调用取值 环境 同一台电脑&#xff1a; jdk 21.0.2 idea 2023.3.3 1. 测试代码&#xff08;常用&#xff09; 1.1 反射取值 public stat…

IO流操作大集合

1、分类 File只能对文件进行处理&#xff0c;如果想要处理文件里的内容&#xff0c;则要使用文件流。 文件流可分为字符和字节流&#xff1a; 字节流&#xff08;可读写任何格式的文件&#xff09; inputStream&#xff08;抽象父类&#xff09; FileInputStream outputStre…

STM32CubeIDE基础学习-基础外设初始化配置

STM32CubeIDE基础学习-基础外设初始化配置步骤 前言 前面的文章介绍了基础工程的创建步骤&#xff0c;这篇文章就接着在基础工程的基础上来配置相关外设了&#xff0c;下面以STM32F103C8T6的主芯片为例进行简单配置。 基础工程创建步骤回顾 具体的配置步骤流程如下&#xff1…

[两个栈实现队列]

[两个栈实现队列] 一、题目二、思路三、代码 一、题目 二、思路 //思路:两个栈实现队列&#xff0c;栈是先入后出&#xff0c;队列是队尾入&#xff0c;对头出&#xff0c;&#xff08;先入先出&#xff09;&#xff0c;那么可以这样干&#xff0c;假设一个栈Pushst&#xff0c…

Java基础 - 8 - 算法、正则表达式

一. 算法 什么是算法&#xff1f; 解决某个实际问题的过程和方法 学习算法的技巧&#xff1f; 先搞清楚算法的流程&#xff0c;再直接去推敲如何写算法 1.1 排序算法 1.1.1 冒泡排序 每次从数组中找出最大值放在数组的后面去 public class demo {public static void main(S…

如何使用 ArcGIS Pro 统计四川省各市道路长度

在某些时候&#xff0c;我们需要进行分区统计&#xff0c;如果挨个裁剪数据再统计&#xff0c;不仅步骤繁琐、耗时&#xff0c;还会产生一些多余的数据&#xff0c;这里教大家如何在不裁剪数据的情况下统计四川各市的道路长度&#xff0c;希望能对你有所帮助。 数据来源 教程…

web前端之uniApp实现选择时间功能

MENU 1、孙子组件1.1、html部分1.2、JavaScript部分1.3、css部分 2、子组件2.1、html部分2.2、JavaScript部分2.3、css部分 3、父组件3.1、html部分3.2、JavaScript部分 4、效果图 1、孙子组件 1.1、html部分 <template><view><checkbox-group change"ch…

HashSet在添加元素时,是如何判断元素重复的? (原理详解 | 易错案例)

前言&#xff1a;我们知道Set中所存储的元素是不重复的&#xff0c;那么Set接口的实现类HashSet在添加元素时是怎么避免重复的呢&#xff1f; HashSet在添加元素时&#xff0c;是如何判断元素重复的? ● 在底层会先调用hashCode()&#xff0c;注意&#xff0c;Obje…

分享一个完全免费的GPT4站点,gpts也可以用

给大家分享一个完全免费的GPT4站点&#xff0c;gpts也可以用点击链接可用

板级PDN(电源分配网络)设计要点综述

目录 目标阻抗去耦方法 确定目标阻抗 确定目标频点 VRM 去耦电容 安装电感 平面电容 总结 去耦电容 PCB叠层设计 扩展阅读 目标阻抗去耦方法 确定PCB去耦方案的策略是使用频域目标阻抗法&#xff0c;通过层间电容和分立电容器组合的使用&#xff0c;保证电源轨阻抗在…

【MySQL】视图 -- 详解

视图 是一个虚拟表&#xff0c;其内容由查询定义。同真实的表一样&#xff0c;视图包含一系列带有名称的列和行数据。视图的数据变化会影响到基表&#xff0c;基表的数据变化也会影响到视图。 一、基本使用 1、创建视图 create view 视图名 as select 语句; 好处&#xff1a;…

基于springboot+vue实现电子商务平台管理系统项目【项目源码+论文说明】

基于springboot实现电子商务平台管理系统演示 研究的目的和意义 据我国IT行业发布的报告表明&#xff0c;近年来&#xff0c;我国互联网发展呈快速增长趋势&#xff0c;网民的数量已达8700万&#xff0c;逼近世界第一&#xff0c;并且随着宽带的实施及降价&#xff0c;每天约有…

Node.js最准确历史版本下载(以下载Node.js16.17.1版本为例)

先进入官网:Node.js https://nodejs.org/en 括号中LTS代表稳定版本. 嫌其他冗余博客帖子多&#xff0c;找起来费眼睛,可以到/release下载:Node.js,在blog后面加/release https://nodejs.org/en/blog/release/ 点击next翻页,跟上面同样的步骤

吴恩达深度学习笔记:深度学习引言1.1-1.6

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第一周&#xff1a;深度学习引言(Introduction to Deep Learning)1.1 欢迎(Welcome)1.2 什么是神经网络&#xff1f;(What is a Neural Network) 第一门课&#xff1a;神经网络和深度学习 (Ne…

leetcode 3.5

普通数组 1.最大子数组和 最大子数组和 前缀和pre 动态规划 pre保留的是当前包含了当前遍历的最大的前缀和&#xff0c;如果之前的pre 对结果有增益效果&#xff0c;则 pre 保留并加上当前遍历, 如果pre 对结果无增益效果&#xff0c;需要舍弃&#xff0c;则 pre 直接更新为…