Flink 系列三 Flink 实战

news2024/10/7 10:16:39

目录

​编辑

前言

1、安装flink环境

2、在idea中创建flink的第一个demo

2.1、执行如下maven命令

2.2、填写'groupId'、'artifactId'、'version'、'package'

2.3、选择Yes即可生成创建好的工程

3、开发第一个flink程序

3.1、开发一个简单的统计程序

3.2、直接编译得到jar包

4、启动环境

4.1、启动已经下载好的flink环境

4.2、创建一个服务端的Tcp 监听

4.3、打开计算日志

4.4、在建立nc监听端口中输入text

4.5、在输出日志中就有统计


在这里插入图片描述

前言

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去。详细的介绍我本篇就不做阐述了,感兴趣的同学可以回复往期的文章:Flink 系列二 Flink 状态化流处理概述,Flink 系列一 开发机 安装。本篇作为 Flink 系列的第三篇,咱们尝试在本地安装和实操 Flink。

1、安装flink环境

        首先需要在你的本地环境安装apache-flink,执行如下命令即可,若采用docker安装更加方便。

brew install apache-flink

2、在idea中创建flink的第一个demo

2.1、执行如下maven命令

        执行如下命令创建工程。这个命令的作用是使用Maven构建一个基于Apache Flink的Java快速启动项目模板。执行完后会下载对应的依赖包。

mvn archetype:generate                               \
-DarchetypeGroupId=org.apache.flink              \
-DarchetypeArtifactId=flink-quickstart-java      \
-DarchetypeVersion=1.8.0     \
-DarchetypeCatalog=local

解释一下具体含义:

  • mvn是Maven的命令行工具。
  • archetype:generate表示使用原型机模板生成一个新项目。
  • -DarchetypeGroupId指定了项目模板的组ID,即Apache Flink团队为项目提供的默认模板组ID。
  • -DarchetypeArtifactId指定了项目模板的Artifact ID,即Apache Flink团队为项目提供的默认模板Artifact ID。
  • -DarchetypeVersion指定了项目模板的版本号。
  • -DarchetypeCatalog指定了本地的模板目录。
  • 反斜杠(\)是命令的折行符,它表示这个命令是连续的,但是出于格式上的考虑需要分成多行。

2.2、填写'groupId'、'artifactId'、'version'、'package'

Define value for property 'groupId': com.lly.flink.java
Define value for property 'artifactId': flink-traning
Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0
Define value for property 'package' com.lly.flink.java: : 
Confirm properties configuration:
groupId: com.lly.flink.java
artifactId: flink-traning
version: 1.0.0
package: com.lly.flink.java

2.3、选择Yes即可生成创建好的工程

         特别注意,这里一定要选择 “Y”,保证项目顺利生产。

 Y: : Y
 [INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: flink-quickstart-java:1.8.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: packageInPathFormat, Value: com/lly/flink/java
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[WARNING] CP Don't override file /Users/liluyang/flink-traning/src/main/resources
[INFO] Project created from Archetype in dir: /Users/liluyang/flink-traning
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:17 min
[INFO] Finished at: 2020-11-05T12:42:42+08:00
[INFO] ------------------------------------------------------------------------

3、开发第一个flink程序

3.1、开发一个简单的统计程序

package com.lly.flink.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lly
 * @date 2020-11-05
 **/
public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        //计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

        我这里简单解释一些这段代码,希望刚开始学习的同学可以理解的更深刻。这段代码使用 Flink 实现了通过网络流读取数据,统计单词出现次数的功能。具体实现细节如下:

  1. 声明一个 SocketTextStreamWordCount 类,定义该类的 main 方法作为程序的入口。

  2. main 方法中,首先对传入的命令行参数进行检查,如果参数个数不为 2,则输出使用说明后直接返回。

  3. 然后获取主机地址和端口号,用于后续建立套接字连接。

  4. 接着创建 Flink 流处理的环境对象 StreamExecutionEnvironment,用于设置执行环境和创建数据流。

  5. 调用 socketTextStream 方法,获取一个 DataStreamSource<String> 对象,用于从套接字连接中获取数据流。

  6. 对获取的数据流进行 flatMap 操作,使用 LineSplitter 类作为转换器将每行文本数据切分成单词,并将单词转化为 "单词,1" 的元组格式,用于后续统计。

  7. 对转换后的数据流使用 keyBy 方法,按照第一个字段(即单词)进行分组。

  8. 对分组后的数据使用 sum 方法,对第二个字段(即出现次数)进行求和,返回一个 SingleOutputStreamOperator<Tuple2<String, Integer>> 类型的结果流。

  9. 最后调用 print 方法,将结果打印输出到控制台。

  10. 最后调用 execute 方法,传入一个字符串 "Java WordCount from SocketTextStream Example" 作为任务名称,开始执行整个 Flink 应用程序。

  11. 声明了一个静态内部类 LineSplitter,实现了 Flink 的 FlatMapFunction 接口,并重写了 flatMap 方法。该方法将输入的文本行按照非单词字符(如空格、逗号等)进行切分,并将每个单词转化为一个元组,其中第一个字段为单词,第二个字段为 1,表示该单词出现了 1 次。 

3.2、直接编译得到jar包

4、启动环境

4.1、启动已经下载好的flink环境

flink run -c 业务类包路径 jar包路径 IP 端口 示例

flink run -c 业务类包路径 jar包路径 IP 端口
示例:
flink run -c com.lly.flink.SocketTextStreamWordCount /Users/liluyang/flink-traning/target/original-flink-traning-1.0.0.jar 127.0.0.1 9000

启动成功之后会生成Job ID

Job has been submitted with JobID b04bad9f4c05efd67344179ee676b513

启动成功之后访问:http://localhost:8081/就可以直接当问flink的的操作后台,操作后台可以直观的看到Job的执行情况和基本的操作

 4.2、创建一个服务端的Tcp 监听

创建一个server监听并接受链接

nc -l 9000

4.3、打开计算日志

cd /usr/local/Cellar/apache-flink/1.10.0/libexec/log

 4.4、在建立nc监听端口中输入text

liluyang@liluyangdeMacBook-Pro ~ % nc -l 9000



cda
cda
dsas
assgasg
nihao 
nihao 
nihao
nihao
1
1
1
1
1
1
1

4.5、在输出日志中就有统计

liluyang@liluyangdeMacBook-Pro log % tail -100f flink-liluyang-taskexecutor-0-liluyangdeMacBook-Pro.local.out
(cda,1)
(cda,2)
(dsas,1)
(assgasg,1)
(nihao,1)
(nihao,2)
(nihao,3)
(nihao,4)
(1,1)
(1,2)
(1,3)
(1,4)
(1,5)
(1,6)

至此:Mac 电脑上安装 Flink,并且运行它。接着通过一个简单的 Flink 程序来介绍如何构建及运行Flink 程序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/743841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java基础】AQS (AbstractQueuedSynchronizer) 抽象队列同步器

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读1.1 CLH锁 二、概览三、使用场景3.1 AQS 对资源的共享方式…

【自我提升】JPA从搭建到CRUD快速入门(IDEA+MAVEN)

写在前面&#xff1a;今天又学习一点新的东西&#xff0c;方面日后查询和巩固学习&#xff0c;下面将学习过程记录下来。 一、创建MAVEN工程 1. 打开IDEA创建一个纯净的maven工程项目 2. 打开pom文件&#xff0c;导入maven坐标 注意&#xff1a;我使用的postgres数据库&#x…

【微服务】搭建项目以及子模块

目录 方法一&#xff1a;搭建远程仓库新建idea 项目创建子模块创建父模块的pom父模块的gitignore文件修改查看远程代码仓库 方法二 方法一&#xff1a; 搭建远程仓库 选择 下载地址 新建idea 项目 复制 项目远程仓库的下载地址 下载完成 创建子模块 导入微服务的必要组件…

Apache Pulsar 分布式部署

1.Pulsar 简介 Pulsar 是一个支持多租户的、高性能的消息中间件&#xff1b;最初是由 Yahoo 研发的开源&#xff0c;分布式pub-sub系统&#xff0c;现在是Apache的一个顶级开源项目 Pulsar 提供了四种订阅类型&#xff0c;它们可以共存在同一个主题上&#xff0c;以订阅名进行区…

【html页面引入vue3语法模板】在html页面中使用vue3语法和elementul-plus组件库的简单模板

前言 这是最近在看这些东西&#xff0c;因为看别的地方是用脚手架直接用的。 我这个项目想要在html上直接使用。 所以我就试了下如何在html上使用vue3语法 目前摸索出来的是这样可以使用。 记录下来供参考&#xff0c;如果有不好的地方后续改进 效果图 这里就是简单的试了一…

修改npm路径

npm config ls如果是第一次使用NPM安装包的话&#xff0c;在配置中只会看到prefix的选项&#xff0c;就是NPM默认的全局安装目录。但是如果有多次使用NPM安装包的话&#xff0c;就会看到cache和prefix两个路径。 新建两个文件夹node_global_modules和node_cache npm config s…

mac android studio设置跟mac系统一样的快捷键

mac版的android studio 跟mac系统的快捷键不一样,主要修改了下面几组操作,为了跟mac系统快捷键相同 setting->Keymap 搜索bottom 修改3个快捷键: cmd↓ 设置让鼠标移动到屏幕最后面 shiftcmd↓ 选中从当前位置到屏幕最下面 option↓. 或者 end 滚动到屏幕最下方 // 因为默认…

详解 HTTPS、TLS、SSL、HTTP区别和关系

一、什么是HTTPS、TLS、SSL HTTPS&#xff0c;也称作HTTP over TLS。TLS的前身是SSL&#xff0c;TLS 1.0通常被标示为SSL 3.1&#xff0c;TLS 1.1为SSL 3.2&#xff0c;TLS 1.2为SSL 3.3。下图描述了在TCP/IP协议栈中TLS(各子协议)和HTTP的关系。 二、HTTP和HTTPS协议的区别 …

【开源项目】中后台开发框架vue-next-admin

vue-next-admin 基本介绍 基于 vue3.x CompositionAPI setup 语法糖 typescript vite element plus vue-router-next pinia 技术&#xff0c;适配手机、平板、pc 的后台开源免费模板&#xff0c;希望减少工作量&#xff0c;帮助大家实现快速开发。 在线预览 账号: adm…

Braindecode系列 (1):在BCIC IV 2a数据集上进行试验

Braindecode系列&#xff1a;在BCIC IV 2a数据集上进行试验 0. 引言1. 环境介绍1.1 环境配置1.2 运行环境 2. Python实现2.1 加载和预处理数据集2.2 创建模型2.3 模型训练2.4 结果输出图像 3. 结果展示4. 总结 0. 引言 最近在看运动想象相关的论文时&#xff0c;找到了一个很好…

在线培训系统的保障措施带来安全、可靠的学习环境

在今天的数字时代&#xff0c;越来越多的人选择在线培训系统作为学习的方式。然而&#xff0c;随着在线教育市场的不断增长&#xff0c;安全和可靠性成为消费者普遍关心的问题。因此&#xff0c;在线培训系统需要采取一系列保护措施以确保学生的数据和隐私得到保护&#xff0c;…

Python 运算符(二)

文章目录 Python逻辑运算符Python成员运算符Python身份运算符Python运算符优先级后记 Python逻辑运算符 Python语言支持逻辑运算符&#xff0c;以下假设变量 a 为 10, b为 20: 运算符逻辑表达式描述实例andx and y 布尔"与" - 如果 x 为 False&#xff0c;x and y …

php周练

前言&#xff1a;博主个人小练&#xff08;纯小白&#xff09;。 目录 1.[SWPUCTF 2021 新生赛]gift_F12已解决2.[SWPUCTF 2021 新生赛]jicao3.[ZJCTF 2019]NiZhuanSiWei4.[SWPUCTF 2021 新生赛]no_wakeup5.[SWPUCTF 2021 新生赛]ez_unserialize 1.[SWPUCTF 2021 新生赛]gift_…

Ae 效果:CC RepeTile

风格化/CC RepeTile Stylize/CC RepeTile CC RepeTile&#xff08;CC 重复拼贴&#xff09;效果可对整个图层进行复制并扩展&#xff0c;通过重复拼贴来创建平铺效果。 ◆ ◆ ◆ 效果属性说明 Expand Right 向右扩展 设置图层向右扩展的距离。 Expand Left 向左扩展 设置图层…

VMware vCenter Server 7.0 Update 3n 下载 - 集中管理 vSphere 环境

VMware vCenter Server 7.0 Update 3n 下载 - 集中管理 vSphere 环境 请访问原文链接&#xff1a;https://sysin.org/blog/vmware-vcenter-7-u3/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org VMware vCenter Server 是一款高…

【菜菜丸的菜鸟教程】制作带闹铃和振动功能的仿真闹钟

一、准备闹钟模型 (一)下载模型 从Unity资源商店和其他模型网站可以下载到各种各样的闹钟模型。为了帮助大家了解机械钟表的设置原理&#xff0c;建议使用带有时针、分针和秒针的钟表&#xff0c;如下图。 注意&#xff1a;时针、分针和秒针最好是挂在闹钟父物体下的三个独立的…

【数据结构】--二叉树

注&#xff1a;本文树和二叉树的概念及结构部分有部分参考了别的文章&#xff0c;其他的二叉树的实现和性质习题等等都是自己一点点写的&#xff0c;创作不易&#xff0c;希望支持&#xff01; ————————————————————— 目录 一. 树概念及结构 1、树概念…

springboot家具商城系统

开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven…

Spring Boot 系列2 -- 配置文件

目录 1. 配置文件的作用 2. 配置文件的格式 3. properties 配置文件说明 3.1 properties 基本语法 3.2 读取配置文件 3.3 properties 缺点 4.yml 配置文件说明 4.1 yml 基本语法 4.2 yml 使用进阶 4.2.1 yml 配置不同数据类型及 null 4.2.2 yml 配置读取 4.2.3 注意…