Flink安装与编程实践

news2025/1/13 10:14:41

系列文章目录

Ubuntu常见基本问题
Hadoop3.1.3安装(单机、伪分布)
Hadoop集群搭建
HBase2.2.2安装(单机、伪分布)
Zookeeper集群搭建
HBase集群搭建
Spark安装和编程实践(Spark2.4.0)
Spark集群搭建

文章目录

  • 系列文章目录
  • 一、安装Flink
    • 1、修改环境变量
    • 2、启动
    • 3、测试
  • 二、编程实现WordCount程序
    • 1、安装Maven
    • 2、编写代码
      • ① WordCountData.java
      • ② WordCountTokenizer.java
      • ③ WordCount.java
      • ④ pom.xml
    • 3、使用Maven打包Java程序
    • 4、通过flink run命令运行程序

一、安装Flink

  1. 先把压缩格式的文件flink-1.9.1-bin-scala_2.11.tgz下载到本地电脑,然后保存在“下载”中
  2. 解压安装包flink-1.9.1-bin-scala_2.11.tgz至路径 /usr/local,命令如下
sudo tar -zxvf ~/下载/flink-1.9.1-bin-scala_2.11.tgz -C /usr/local/
cd /usr/local
sudo mv ./flink-1.9.1 ./flink
sudo chown -R hadoop:hadoop ./flink

1、修改环境变量

使用如下命令添加环境变量:

vim ~/.bashrc

增加如下内容:

export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出.bashrc文件,然后执行如下命令让配置文件生效:

source ~/.bashrc

2、启动

cd /usr/local/flink
./bin/start-cluster.sh

使用jps命令查看进程,成功啦!!!
在这里插入图片描述

3、测试

如果能够看到TaskManagerRunner和StandaloneSessionClusterEntrypoint这两个进程,就说明启动成功。
Flink的JobManager同时会在8081端口上启动一个Web前端,可以在浏览器中输入“http://localhost:8081”来访问。
Flink安装包中自带了测试样例,这里可以运行WordCount样例程序来测试Flink的运行效果,具体命令如下:

cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar

结果如下:
在这里插入图片描述

二、编程实现WordCount程序

1、安装Maven

  1. 先把压缩格式的文件apache-maven-3.6.3-bin.zip下载到本地电脑,然后保存在“下载”中
  2. 解压安装包apache-maven-3.6.3-bin.zip至路径 /usr/local,命令如下
sudo unzip ~/下载/apache-maven-3.6.3-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.6.3/ ./maven
sudo chown -R hadoop ./maven

2、编写代码

在用户主文件夹下创建一个文件夹flinkapp作为应用程序根目录:

cd ~ #进入用户主文件夹
mkdir -p ./flinkapp/src/main/java

① WordCountData.java

sudo vim ./flinkapp/src/main/java/WordCountData.java

修改为:

package cn.edu.xmu;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
 
public class WordCountData {
    public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
    public WordCountData() {
    }
    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
        return env.fromElements(WORDS);
    }
}

② WordCountTokenizer.java

sudo vim ./flinkapp/src/main/java/WordCountTokenizer.java

修改为:

package cn.edu.xmu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
 
public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
 
    public WordCountTokenizer(){}
 
 
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;
 
        for(int i = 0; i<len;i++){
            String tmp = tokens[i];
            if(tmp.length()>0){
                out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
            }
        }
    }
}

③ WordCount.java

sudo vim ./flinkapp/src/main/java/WordCount.java

修改为:

package cn.edu.xmu;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.utils.ParameterTool;
 
 
public class WordCount {
 
    public WordCount(){}
 
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        Object text;
        //如果没有指定输入路径,则默认使用WordCountData中提供的数据
        if(params.has("input")){
            text = env.readTextFile(params.get("input"));
        }else{
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use -- input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }
 
        AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1);
        //如果没有指定输出,则默认打印到控制台
        if(params.has("output")){
            counts.writeAsCsv(params.get("output"),"\n", " ");
            env.execute();
        }else{
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
 
    }
}

④ pom.xml

cd ~/flinkapp
vim pom.xml

修改为:

<project>
    <groupId>cn.edu.xmu</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>jboss</id>
            <name>JBoss Repository</name>
            <url>http://repository.jboss.com/maven2/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</project>

3、使用Maven打包Java程序

为了保证Maven能够正常运行,先执行如下命令检查整个应用程序的文件结构:

cd ~/flinkapp
find .

文件结构应该是类似如下的内容:

.
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCount.java
./src/main/java/WordCountTokenizer.java
./src/main/java/WordCountData.java

然后进行打包:

cd ~/flinkapp    #一定把这个目录设置为当前目录
/usr/local/maven/bin/mvn package

成功啦!!!
在这里插入图片描述
如果打包很慢,可以进行更换为国内源:Ubuntu常见基本问题

4、通过flink run命令运行程序

最后,可以将生成的JAR包通过flink run命令提交到Flink中运行(请确认已经启动Flink),命令如下:

/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar

成功啦!!!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/684832.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mongoDB相关知识

目录 常用操作删除数据库 启动问题集如何远程访问mongDB数据库由于widows安全策略&#xff0c;linux访问不到windows的mongDB 常用操作 删除数据库 windows下mongDB通过下面命令行进入 D:\mongodb\mongodb-win32-x86_64-2008plus-ssl-3.6.23-8-gc2609ed3ed\bin>mongod.exe…

Unity开发前的一些建议1_设置脚本的编码格式,设置IDE的编码格式

Unity开发前的一些建议1_设置脚本的编码格式&#xff0c;设置IDE的编码格式 乱码之后是是不可以撤回的哦。 这么做的理由&#xff0c;Unity右侧的Inspector面板看代码是UTF-8格式的。可以在Inspector中速览代码&#xff0c;且如果修改IDE&#xff0c;UTF-8比其他编码格式用的…

K8S复习

本文原文出自本人自己复习时整理&#xff0c;原文非常系统&#xff0c;建议拜师#yyds干货盘点# 手把手教你玩转 Kubernete 集群搭建(03)_wzlinux的博客-CSDN博客 1.docker的优势 在某一段时期内&#xff0c;大家一提到 Docker&#xff0c;就和容器等价起来&#xff0c;认为 Doc…

【架构】后端服务架构高性能设计方法

文章目录 前言1、无锁化1.1、串行无锁1.2、结构无锁 2、零拷贝2.1、内存映射2.2、零拷贝 3、序列化3.1、分类3.2、性能指标3.3、选型考量 4、池子化4.1、内存池4.2、线程池4.3、连接池4.4、对象池 5、并发化5.1、请求并发5.2、冗余请求 6、异步化6.1、调用异步化6.2、流程异步化…

【跟晓月学数据库】使用MySQLdump 对数据导入导出

前言 大家好&#xff0c;我是沐风晓月&#xff0c;今天给大家介绍MySQLdump的数据导出导入&#xff0c;希望对你有用。 &#x1f3e0;个人主页&#xff1a;我是沐风晓月 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是沐风晓月&#xff0c;阿里云社区专家博主&…

vue3+antd-design-vue+vite项目总结

代码热更新能力失效&#xff0c;每次都需要手动刷新&#xff0c;开发体验极差 1、先看看是否开启了热更新 2、再看看引入模块文件名是否正确。当前的项目部分人可以更新&#xff0c;部分不能&#xff0c;所以和1没什么关系&#xff0c;网上搜索发现vite对文件名大小写十分敏感&…

2-3查找树

2-3查找树 为了保证查找树的平衡性&#xff0c;我们需要一些灵活性&#xff0c;因此在这里我们允许树中的一个结点保存多个键。确切的说&#xff0c;我 们将一棵标准的二叉查找树中的结点称为2-结点(含有一个键和两条链)&#xff0c;而现在我们引入3-结点&#xff0c;它含有两…

Java版本企业招投标采购管理系统源码 +支持二开+spring cloud

一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标立项申请入口、用户可以保存为草稿、提交。 3、采购立项列表 功能点&#xff1a;对草稿进行编辑&#x…

如何访问NetApp E系列存储的CLI命令行

NetApp存储的E系列&#xff08;e-series&#xff09;是收购LSI存储而来的&#xff0c;所以这个产品的install base&#xff0c;也就是安装量其实是很大的&#xff0c;因为早期LSI的商业模式就是OEM&#xff0c;给很多的IT公司做过OEM&#xff0c;比较典型的就是IBM的早期的DS存…

我想搭建一个商城?有哪些流程?

近年来&#xff0c;我国电子商务发展迅速。淘宝、京东、亚马逊等一大批电子商务巨头受到越来越多消费者的青睐。互联网普及率大大提高&#xff0c;消费者也逐渐形成了网上购物的习惯。在支付体验、物流服务和售后服务不断提升的过程中&#xff0c;越来越多的消费者依赖网络购物…

保险信创 数据领航|GBASE南大通用亮相2023年保险行业信息技术应用创新大会

2023年6月&#xff0c;2023年保险行业信息技术应用创新大会在京召开&#xff0c;会议重点围绕保险核心业务系统的改造及终端的选型和应用展开探讨学习&#xff0c;分享行业成功实践经验&#xff0c;着力解决行业信创发展痛点难点&#xff0c;助力行业加快信创生态建设&#xff…

Linux用户权限和认证

linux公钥登录 useradd test -m -g root创建名为test的用户&#xff0c;生成相应目录&#xff0c;同时加入root组passwd test重置密码 分配test用户sudo权限&#xff0c;sudo vim etc/sudoers 编辑SSH文件 vim /etc/ssh/sshd_config PermitRootLoginyes改为no关闭ROOT登录…

基于全卷积神经网络(FCN)实现图像分割

目录 1、作者介绍2、网络及数据集介绍2.1 FCN算法2.2 VOC_2012数据集2.3 制作自己的语义分割数据集2.3.1 标注方式一&#xff1a;多边形标注2.3.1.1 labelMe安装与数据标注2.3.1.2 数据格式转换2.3.1.3 数据集分类 2.3.2 标注方式二&#xff1a;像素级涂抹 3、基于RESNet50骨干…

Linux 学习记录39(C高级篇)

Linux 学习记录39(C高级篇) 本文目录 Linux 学习记录39(C高级篇)一、gdb调试工具1. gdb工具的使用 二、枚举类型 enum1. 定义 三、Makefile1. 什么是makefile2. 什么是Make3. Make用途4. Makefile的工作过程(1. Makefile分步编译的目的 5. Makefile的语法规则6. Makefile变量的…

【unity每日一记】那些动作基础你还记得吗—移动和旋转相关

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

农村电商APP软件开发 电商新农村紧跟时代脚步

互联网技术的深入发展让电子商务走向了高潮&#xff0c;不过在一些交通闭塞的乡村地区&#xff0c;电商发展还处于初期阶段。大量农产品从生产到流通渠道的变革&#xff0c;让互联网农产品成为新型农业发展的重要手段&#xff0c;不仅增加了农业产值提高了农民收入&#xff0c;…

《kafka 核心技术与实战》课程学习笔记(六)

生产者消息分区机制原理剖析 为什么分区&#xff1f; Kafka 有主题&#xff08;Topic&#xff09;的概念&#xff0c;它是承载真实数据的逻辑容器&#xff0c;而在主题之下还分为若干个分区&#xff0c;也就是说 Kafka 的消息组织方式实际上是三级结构&#xff1a;主题 - 分区…

[CKA]考试之四层负载均衡service

由于最新的CKA考试改版&#xff0c;不允许存储书签&#xff0c;本博客致力怎么一步步从官网把答案找到&#xff0c;如何修改把题做对&#xff0c;下面开始我们的 CKA之旅 题目为&#xff1a; Task 重新配置一个已经存在的front-end的deployment&#xff0c;在名字为nginx的容…

spring boot 整合EasyPoi导入导出,下载模版功能

引入依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></…

【算法题】链表系列之从尾到头打印链表、重建二叉树、用两个栈实现队列

【算法题】链表系列 一、从尾到头打印链表1.1、题目描述1.2、递归法1.3、栈&#xff08;stack&#xff09; 二、重建二叉树2.1、题目描述2.2、前置知识&#xff1a;2.3、分治算法2.4、小结 三、用两个栈实现队列3.1、题目描述3.2、双栈法3.3、小结 总结 一、从尾到头打印链表 …