Flink的环境搭建及使用

news2024/11/6 1:12:12

在idea中创建一个Maven项目,导入Flink的依赖,在代码中创建Flink环境,编写代码.

如果不想去找flink依赖,就去flink官网,提供了一个mvn的命令,快速下载在本地构建一个flink的项目,可以直接从这个项目的pom.xml文件中拿到依赖配置

一、环境搭建

pom.xml文件的依赖导入

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>


二、使用Flink

以WordCount为例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo1WordCount {

    public static void main(String[] args) throws Exception {
        //1、创建flink的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度,一个并行度对应一个task
        env.setParallelism(2);

        //修改数据从上游发送到下游的缓存时间
        env.setBufferTimeout(2000);


        /*
         * 无界流
         */
        //2、读取数据
        //nc -lk 8888
        DataStream<String> linesDS = env.socketTextStream("master", 8888);


        //一行转换成多行
        DataStream<String> wordsDS = linesDS
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        for (String word : line.split(",")) {
                            //将数据发送到下游
                            out.collect(word);
                        }
                    }
                });

        //转换成kv格式
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        //返回一个二元组
                        return Tuple2.of(word, 1);
                    }
                });

        //按照单词进行分组
        //底层是hash分区
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> kv) throws Exception {
                        return kv.f0;
                    }
                });

        //统计数量
        DataStream<Tuple2<String, Integer>> countDS = keyByDS
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,
                                                          Tuple2<String, Integer> kv2) throws Exception {
                        int count = kv1.f1 + kv2.f1;
                        return Tuple2.of(kv1.f0, count);
                    }
                });

        //打印结果
        countDS.print();

        //3、启动flink
        env.execute("wc");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2233952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝牙资讯|苹果AirPods Pro 2推出听力测试、助听器和听力保护等功能

苹果推送iOS 18.1 系统版本更新&#xff0c;AirPods Pro 2 用户也在 iOS 18.1 中获得了强大的新功能。 运行固件 7B19 的 AirPods Pro 2 用户&#xff0c;搭配 iOS 18.1 系统的 iPhone&#xff0c;将获得三项强大的听力健康功能&#xff1a;听力测试、助听器和听力保护。 听力…

【学习日记】Anaconda的安装与使用-小白大学生

目录 日记说明 解压安装&#xff1a; 配置 使用 日记说明 作者是个大学生 这个专栏主要收集课时常用的软件 以及女朋友上课用的软件的教程 所有安装包可以私聊我获取 免费 提前清除已有python环境 windows11 Anaconda-2024.02 垃圾话&#xff1a; Anaconda 是全球领先的数…

内网项目,maven本地仓库离线打包,解决Cannot access central in offline mode?

背景&#xff1a; 内网项目打包&#xff0c;解决Cannot access central in offline mode? 1、修改maven配置文件&#xff1a; localRepository改为本地仓库位置 <localRepository>D:\WorkSpace\WorkSoft\maven-repository\iwhalecloud-repository\business</loca…

教你怎样搭建自动化测试框架?

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 最近好多小伙伴都在说接口自动化测试&#xff0c;那么究竟什么是接口自动化测试呢&#xff1f;让我们一起往下看就知道了&#xff0c;首先我们得先弄清楚下面这…

反转链表.

给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1]示例 3&#xff1a; 输入&am…

数列分块入门

本期是数列分块入门。其中的大部分题目来自hzwer在LOJ上提供的数列分块入门系列。 Blog:here (其实是对之前分块的 blog 的整理补充) sto hzwer orz %%% [转载] ---------------------------------------------------------------------------------…

基于SpringBoot+Gpt个人健康管家管理系统【提供源码+答辩PPT+参考文档+项目部署】

作者简介&#xff1a;✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容&#xff1a;&#x1f31f;Java项目、Python项目、前端项目、PHP、ASP.NET、人工智能…

苍穹外卖day-01

后端环境搭建 创建git仓库 提交代码 创建gitee远程仓库 开始连接远程仓库 运行sql文件&#xff0c;创建数据库。这里选取的可视化工具是navicat 编译一下项目 运行项目 登录的账号和密码在数据库中的emploee表中 退出前端界面登录后再重新登录&#xff0c;可以从后台清晰看到前…

【Docker故障处理】Ubuntu系统下tab键无法补全问题解决

【Docker故障处理】Ubuntu系统下tab键无法补全问题解决 一、环境介绍1.1 本地环境规划1.2 本次实践说明二、故障现象三、故障分析3.1 可能的原因3.2 排错思路四、故障处理4.1 安装bash-completion4.2 下载补全脚本4.3 配置永久生效五、测试tab键补全六、总结一、环境介绍 1.1 …

若依框架-添加测试类-最新

1、在【ruoyi-admin】的pom.xml下添加依赖 <!-- 单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId><scope>test</scope></dependency><dependency>…

CSS基础概念:什么是 CSS ? CSS 的组成

什么是 CSS&#xff1f; CSS&#xff08;层叠样式表&#xff0c;Cascading Style Sheets&#xff09;是一种用于控制网页外观的样式表语言。通过定义样式规则&#xff0c;CSS 可以指定 HTML 页面中各个元素的显示方式&#xff0c;包括颜色、布局、字体、间距等。 与 HTML 专注…

解密RFID技术提升应急消防管理效率的过程

一、部署RFID消防应急解决策略的具体步骤 &#xff08;1&#xff09;需求探讨与战略规划阶段 深入探究&#xff1a;全面、深刻地理解消防领域在资源分配、人员跟踪、应急救援等方面的实际需求。与消防机构紧密合作&#xff0c;共同确定RFID技术的应用提升和具体实施范围。 细…

国内短剧源码短剧系统搭建小程序部署H5、APP打造短剧平台

​在当今的互联网时代&#xff0c;短剧作为一种新兴的娱乐形式&#xff0c;受到了越来越多用户的喜爱。为了提供更好的用户体验和满足用户需求&#xff0c;一个好的短剧系统需要具备多元化的功能和优质的界面设计。 本文将介绍国内短剧源码短剧系统搭建小程序部署H5、APP所需的…

使用docker安装zlmediakit服务(zlm)

zlmediakit安装 zlmediakit安装需要依赖环境和系统配置&#xff0c;所以采用docker的方式来安装不容易出错。 docker pull拉取镜像(最新) docker pull zlmediakit/zlmediakit:master然后先运行起来 sudo docker run -d -p 1935:1935 -p 80:80 -p 8554:554 -p 10000:10000 -p …

qt QDragEnterEvent详解

1、概述 QDragEnterEvent是Qt框架中用于处理拖放进入事件的一个类。当用户将一个拖拽对象&#xff08;如文件、文本或其他数据&#xff09;拖动到支持拖放操作的窗口部件&#xff08;widget&#xff09;上时&#xff0c;系统会触发QDragEnterEvent事件。这个类允许开发者在拖拽…

HarmonyOS Next星河版笔记--界面开发(3)

属性 1.1.设计资源-svg图标 需求&#xff1a;界面中展示图标→可以使用的svg图标(任意放大缩小不失真、可以改变颜色) 使用方式&#xff1a; ①设计师提供&#xff1a;基于项目的图标&#xff0c;拷贝到项目目录使用 Image($r(app.media.ic_dianpu)) .width(40) fillColor…

查找连表的倒数第k个节点

居安思危 何解&#xff1f; 1、假如有1、2、3三个节点&#xff0c;找倒数第二个&#xff0c;实际是整数第几个&#xff1f; 3-21 2 &#xff1a; 及 length - k 1 ,所以先遍历找节点长度&#xff0c;在遍历找所需节点 // 今天这不是力扣的var findNode function(head , k){…

陪玩系统源码APP中的语音聊天直播房间有哪些功能?

陪玩系统源码APP通常采用Springboot、MybatisPlus和MySQL等后端技术栈来构建后端服务。这些技术提供了强大的数据处理能力和灵活的扩展性&#xff0c;能够满足高并发、低延迟的业务需求。 陪玩系统源码线上线下家政游戏陪玩前端开发框架如uniapp&#xff08;针对Web和小程序&am…

【python】OpenCV—findContours(4.3)

文章目录 1、功能描述2、代码实现3、完整代码4、结果展示5、涉及到的库函数5.1、cv2.Canny5.2 cv2.boxPoints 6、参考 1、功能描述 找出图片中的轮廓&#xff0c;拟合轮廓外接椭圆和外接矩阵 2、代码实现 导入必要的库&#xff0c;固定好随机种子 import cv2 as cv import …

介绍目标检测中mAP50和mAP50-95的区别

在目标检测任务中&#xff0c;mAP&#xff08;mean Average Precision&#xff09;是一个常用的性能评估指标&#xff0c;用于衡量模型在不同类别和不同IoU&#xff08;Intersection over Union&#xff09;阈值下的平均精度。mAP50和mAP50-95是mAP的两个特定版本&#xff0c;它…