Flink学习之旅:(二)构建Flink demo工程并提交到集群执行

news2025/1/3 2:12:33

1.创建Maven工程

        在idea中创建一个 名为 MyFlinkFirst 工程

2.配置pom.xml

  <properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
  </properties>
  <dependencies>
  <!-- 引入 Flink 相关依赖-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
  <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- 引入日志管理相关依赖-->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-to-slf4j</artifactId>
      <version>2.14.0</version>
    </dependency>
  </dependencies>

3.配置日志管理

        在目录 src/main/resources 下添加文件:log4j.properties,内容配置如下:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

4.编写代码

        编写 StreamWordCount 类,单词汇总

package com.qiyu;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-18 14:45
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文本流
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
                7777);
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
    }
}

5.测试

在hadoop102 服务器中 执行:

nc -lk 7777

再运行 StreamWordCount java类

在命令行随意疯狂输出

idea 控制台 打印结果:

测试代码正常

6. 打包程序提交到集群中运行

        在pom.xml添加打包插件

<build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

直接使用 maven 中的 package命令,控制台显示 BUILD SUCCESS 就是打包成功!

选择 MyFlinkFirst-1.0-SNAPSHOT.jar 提交到 web ui 上

上传 jar 后,点击 jar 包名称 ,填写 主要配置程序入口主类的全类名,任务运行的并行度。完成后 点击 submit 

查看 任务运行列表 

点击任务

点击“Task Managers”,打开 Stdout,并且在 hadoop102 命令行 疯狂输出 

Stdout 就会显示 结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1106545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java构建Web项目

对无底线服务型的系统&#xff0c;业务代码和界面代码脚本化是及其重要的。一是脚本化能确保部署本地就是再用的代码&#xff0c;不存在为每个项目管理代码的问题。然后脚本化不需要人为编译和投放程序库。极大的简化维护难度和成本。能不能脚本化直接决定了能否全面铺开运维&a…

流量新玩法:微信问一问了解一下

来自一位不断探索的营销人的问题&#xff1a;微信“问一问”引流&#xff0c;一个问答引流1000精准粉&#xff0c;是不是真的&#xff1f;如果是真的&#xff0c;那该怎么做呢&#xff1f; 微信的问一问功能&#xff0c;支持图文回答&#xff0c;也支持用视频去回答&#xff0c…

Java使用javah命令:‘javah‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。

前提是已安装jdk&#xff0c;配置JDK环境变量&#xff0c;并成功输出下图&#xff1a; 但在命令行窗口使用javah&#xff0c;找不到该命令&#xff1a; 原因&#xff1a;新版的Java不使用javah的命令了&#xff0c;而是使用javac -h 用法&#xff1a; javac -h <directory&…

低代码系列——可视化编辑器

前端社区里&#xff0c;低代码/无代码是被讨论的火热赛道。它通过用最少量的编程代码去开发应用程序&#xff0c;从而提高效率。由此&#xff0c;许多企业都在使用低代码平台进行业务的开发和升级。低代码平台可以大幅简化编码过程&#xff0c;并且可以快速构建定制化的应用程序…

C++ - 类型转换 - static_cast - reinterpret_cast - const_cast - dynamic_cast

目录 类型转换 C语言当中的类型转换 为什么C需要四种类型转换 &#xff08;讲解volatile关键字&#xff09; C强制类型转换 static_cast reinterpret_cast const_cast dynamic_cast&#xff08;动态转换&#xff09; RTTI 类型转换 C语言当中的类型转换 其实在 C语言当…

从理论到实践,实时湖仓功能架构设计与落地实战

在上篇文章中&#xff0c;我们向大家解释了为什么实时湖仓是当前企业数字化转型过程中的解决之道&#xff0c;介绍了实时计算和数据湖结合的应用场景。&#xff08;“数据驱动”时代&#xff0c;企业为什么需要实时湖仓&#xff1f;&#xff09; 在这篇文章中&#xff0c;我们…

使用Gitlab构建简单流水线CI/CD

什么是Gitlab Gitlab实质上是一套DevOps工具 目前看起来&#xff0c;Gitlab属于是内嵌了一套CI/CD的框架&#xff0c;并且可以提供软件开发中的版本管理、项目管理等等其他功能。 这里需要辨别一下Gitlab和Github Gitee的区别。 GIthub大家都很熟悉了&#xff0c;一般大家都会…

探索DeFi世界,MixGPT引领智能金融新时代

随着区块链技术的迅猛发展&#xff0c;DeFi&#xff08;去中心化金融&#xff09;正成为金融领域的新宠。在这个充满活力的领域里&#xff0c;MixTrust站在创新的前沿&#xff0c;推出了一款引领智能金融新时代的核心技术——MixGPT。 MixGPT&#xff1a;引领智能金融体验的大型…

Rust逆向学习 (1)

文章目录 Hello, Rust Reverse0x01. main函数定位0x02. main函数分析line 1line 2line 3line 4~9 0x03. IDA反汇编0x04. 总结 近年来&#xff0c;Rust语言的热度越来越高&#xff0c;很多人都对Rust优雅的代码和优秀的安全性赞不绝口。对于开发是如此&#xff0c;对于CTF也是如…

208. 开关问题 - 异或方程组

208. 开关问题 - AcWing题库 我们可以找每一个开关由哪些开关掌控&#xff0c;每一个开关的值设为动过为1&#xff0c;没动过为0 再看当前开关的状态与结果的状态是否一致&#xff0c;一致为0&#xff0c;说明掌控这个开关的开关门的异或值为0&#xff0c;不一致则为1&#xf…

彻底理解操作系统与内核的区别!

通用底盘技术 Canoo公司有一项核心技术专利&#xff0c;这就是它们的通用电动底盘技术&#xff0c;长得是这个样子&#xff0c;非常像一个滑板&#xff1a; 这个带轮子、有电池、能动的滑板已经包含了一辆车最核心的组件&#xff0c;差的就是一个外壳。这个看起来像滑板的东西…

【MATLAB源码-第50期】基于simulink的BPSK调制解调仿真,输出误码率。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 1. Bernoulli Binary: 这个模块生成伯努利二进制随机数&#xff0c;即0或1。这些数字表示要传输的原始数字信息。 2. Unipolar to Bipolar Converter: 此模块将伯努利二进制数据从0和1转换为-1和1&#xff0c;这是BPSK调制的标…

AN动画基础——缓动动画

【AN动画基础——影片剪辑滤镜】 基础动画缓动动画缓动原理实例应用 本篇内容&#xff1a;了解曲线原理 重点内容&#xff1a;缓动动画 工 具&#xff1a;Adobe Animate 2022 基础动画 我们先做一个非缓动的效果的动画。 绘制一个矩形设置成元件—图形&#xff0c;30帧插入关…

SpringMVC源码分析(四)请求流程分析

a、http请求是怎么被Controller接受处理&#xff0c;然后返回结果的&#xff1f; 发出HTTP请求后&#xff0c;跳过网络层的东西&#xff0c;当被应用服务器Tomcat接受的时候。在Tomcat中存在一个servlet容器&#xff0c;它负责管理所有的servlet&#xff0c;包括SpringMVC的核…

MySQL绕过WAF实战技巧

一、前言 本人喜欢遇到好的东西&#xff0c;乐于分享&#xff0c;关注freebuf有段时间了&#xff0c;写过两篇文章&#xff0c;每次写写文章&#xff0c;不仅仅是为了挣点稿费。而是通过此平台能够认识一些安全圈的小伙伴&#xff0c;互相学习&#xff0c;共同进步。在安全行业…

为什么不可大张旗鼓地推动“汉字编程”?

为什么不可大张旗鼓地推动“汉字编程”&#xff1f; 没有不可。 我之前看到过一个vscode插件&#xff0c;是给一个不知道叫什么名字的编程语言用的&#xff0c;从代码到注释全是西里尔字母写的&#xff0c;反正就只有东欧那片区域用。最近很多小伙伴找我&#xff0c;说想要一些…

c++实现最大堆

前言 在写leetcode的时候&#xff0c;看到一道优先队列的题目&#xff0c;复习了一下最大堆&#xff0c;用c实现了一下。以前听网课的时候&#xff0c;根本看不懂实现&#xff0c;现在自己也能实现了。 参考文献 这个我觉得讲得挺好的&#xff0c;图很生动形象 代码 #incl…

2023年京东洗发护发行业增长趋势分析:头皮清洁或成小风口

如今&#xff0c;随着消费观念的转变&#xff0c;越来越多的消费者愈加重视头部的洗护&#xff0c;无论是女性还是男性&#xff0c;都开始积极寻找头部洗护用品&#xff0c;以更好地呵护头发及头皮。在用户需求的推动下&#xff0c;洗发护发行业已经逐渐发展成为成熟行业。 根据…

linux性能分析(二)如何从日志分析 PV、UV

一 如何从日志分析 PV、UV 本文是从业务侧来衡量整个应用系统的性能,区别与上篇的网络性能分析备注&#xff1a; 这里的日志不仅指的是业务类型日志,也包括系统日志等各种类型的日志关键&#xff1a; 掌握PV和UV的概念和度量方式 "以下是关于埋点的科普文章" 埋…

不会代码循环断言如何实现?只要6步!

对于使用jmeter工具完成接口测试的测试工程师而言。在工作中&#xff0c;或者在面试中&#xff0c;都会遇到一个问题—— “CSV文档做了一大笔测试数据后&#xff0c;怎么去校验这个结果呢&#xff1f;” 现在大部分测试工程师可能都是通过人工的方法去查看结果&#xff0c;十几…