【极数系列】Flink搭建入门项目Demo 秒懂Flink开发运行原理(05)

news2024/12/27 13:20:26

文章目录

    • 引言
    • 1.创建mavenx项目
    • 2.包结构
    • 3.引入pom依赖
    • 4.增加log4j2.properties配置
    • 5.创建主启动类
    • 6.构建打jar包
    • 7.flinkUI页面部署

引言

gitee地址:https://gitee.com/shawsongyue/aurora.git
源码直接下载可运行,模块:aurora_flink
Flink 版本:1.18.0
Jdk 版本:11

1.创建mavenx项目

在这里插入图片描述

2.包结构

在这里插入图片描述

3.引入pom依赖

tips:transformer处写主启动类

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.增加log4j2.properties配置

tips:resource目录下增加该配置,主要用于日志打印

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

5.创建主启动类

tips:编写了一个简单的有界数据流处理demo程序

  • step1:创建flink程序运行所需环境
  • step2:创建数据集
  • step3:把有限数据集转换为数据源
  • step4:简单通过flatmap处理数据
  • step5:输出最终结果
  • step6:启动任务
package com.aurora;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;

/**
 * @author 浅夏的猫
 * @description 主启动类
 * @date 22:46 2024/1/13
 */
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) throws Exception {

        //1.创建flink程序运行所需环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");

        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);

        //4.简单通过flatmap处理数据,
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String record, Collector<String> collector) throws Exception {
                //数据追加随机数
                String uuidRecord=record+ UUID.randomUUID().toString();
                //当前环节处理完需要传递数据给下个环节
                collector.collect(uuidRecord);
            }
        });

        //5.输出最终结果
        flatMap.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) throws Exception {
                logger.info("当前正在处理的数据:{}",value);
            }
        }).setParallelism(1);

        //6.启动任务
        env.execute();
    }
}

6.构建打jar包

在这里插入图片描述

7.flinkUI页面部署

1.点击add new上传对应的应用包

2.主类填写com.aurora.Application

3.检查任务running状态,大概几秒钟跑完
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1415802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构与算法:复杂度

友友们大家好啊&#xff0c;今天开始正式学习数据结构与算法有关内容&#xff0c;后续不断更新数据结构有关知识内容&#xff0c;希望多多支持&#xff01; 数据结构&#xff1a; 数据结构是用于存储和组织数据的方式&#xff0c;以便可以有效地访问和修改数据。不同的数据结构…

python222网站实战(SpringBoot+SpringSecurity+MybatisPlus+thymeleaf+layui)-友情链接管理实现

锋哥原创的SpringbootLayui python222网站实战&#xff1a; python222网站实战课程视频教程&#xff08;SpringBootPython爬虫实战&#xff09; ( 火爆连载更新中... )_哔哩哔哩_bilibilipython222网站实战课程视频教程&#xff08;SpringBootPython爬虫实战&#xff09; ( 火…

Modern C++ std::tuple的size

不知道大家读过《Modern C std::unique_ptr的实现原理》没有&#xff1f; 里面提到了std::tuple<void*, default_delete()>的大小是4&#xff0c;而不是41或者44&#xff0c;是不是很奇怪&#xff0c;本文不会揭晓答案&#xff0c;只是会扩展测试各种情况。 #include<…

打开 IOS开发者模式

前言 需要 1、辅助设备&#xff1a;苹果电脑&#xff1b; 2、辅助应用&#xff1a;Xcode&#xff1b; 3、准备工作&#xff1a;苹果手机 使用数据线连接 苹果电脑&#xff1b; 当前系统版本 IOS 17.3 通过Xcode激活 两指同时点击 Xcode 显示选择&#xff0c;Open Develop…

私人漫画图书馆:分类管理,一目了然 | 开源日报 No.157

tachiyomiorg/tachiyomi Stars: 26.9k License: Apache-2.0 tachiyomi 是一个免费开源的安卓漫画阅读器。 该项目的主要功能、关键特性、核心优势包括&#xff1a; 从多种来源在线阅读本地阅读已下载内容可配置的阅读器&#xff0c;具有多个查看器、翻页方向和其他设置支持追…

Bug: git stash恢复误drop的提交

Bug: git stash恢复误drop的提交 前几天在写ut时突然需要通过本地代码临时出一个包&#xff0c;但是本地ut又不想直接作为一个commit提交&#xff0c;所以为了省事就将ut的代码暂时stash起来。出完包后想apply stash&#xff0c;但是手误操作点成了drop stash&#xff0c;丢失了…

容器化部署 Jenkins,并配置SSH远程操作服务器

目录 一、Jenkins是什么 二、常见的部署Jenkins的方法 三、为什么选择容器化部署 四、容器化部署Jenkins步骤 1、安装 Docker 2、获取 Jenkins 镜像 3、创建并运行容器 4、访问 Jenkins 4.1 查看初始密码问题 5、配置 Jenkins 5.1 安装插件 5.2 创建管理员用户 5.3…

Git安装详细步骤

目录 1、双击安装包&#xff0c;点击NEXT​编辑 2、更改安装路径&#xff0c;点击NEXT 3、选择安装组件 4、选择开始菜单页 5、选择Git文件默认的编辑器 6、调整PATH环境 7、选择HTTPS后端传输 8、配置行尾符号转换 9、配置终端模拟器与Git Bash一起使用 10、配置额外…

web前端---------盒子模型

1.内容 盒子的内容可以包含文字、图片等多种类型。 浏览器在加载网页时&#xff0c;会将元素按照内容区分为替换元素与非替换元素。 &#xff08;1&#xff09;替换元素指的是HTML中的一些形如<img>、<input>等非文本元素。 这些元素本身不包含任何内容&#x…

【VB测绘程序设计】案例13——几种常用的角度转换子程序Function功能的使用(附源代码)

【VB测绘程序设计】案例13——几种常用的角度转换子程序Function的使用(附源代码) 文章目录 前言一、界面展示二、程序说明三、程序代码1.角度转换子程序jdzh()四、数据演示总结前言 使用VB编写测绘程序,最基础的对于角度在导线测量计算中频繁需要角度的计算,从度分秒转…

Redis核心技术与实战【学习笔记】 - 3.Redis服务高可靠

1.数据同步&#xff1a;主从库如何实现数据一致&#xff1f; 前面我们学习了 AOF 和 RDB&#xff0c;如果 Redis 发生了宕机&#xff0c;它们可以分别通过回放日志和重新读入 RDB 文件的方式恢复数据&#xff0c;从而保证尽量较少丢失数据&#xff0c;提升可靠性。 不过&…

qemu单步调试arm64 linux kernel

一、背景和目的 qemu搭建arm64 linux kernel环境-CSDN博客 之前介绍了qemu启动kernel的配置步骤和方法&#xff0c;现在开始我们的调试&#xff0c;这篇文章主要讲解如何单步调试内核&#xff0c;所有的实验还是基于ARM64&#xff1b; 二、环境准备 需要准备hostx86 target…

vue2中的watch(侦听器)讲解,以及解决深度监听新值和旧值相同的两种方案(手写深拷贝和JSON.parse())。

目录 一&#xff1a;什么是watch? 二&#xff1a;watch的基础使用 1.最基本的使用 2.简写形式 三&#xff1a;watch中的immediate和deep属性 1.immediate属性 2.deep属性 3.解决深度监听新旧值相同的问题 1&#xff09;使用序列化和反序列化。 2&#xff09;手写深…

DevOps系列文章之 GitLabCI汇总

GitlabCI环境搭建 前提 先安装 docker Docker容器化安装 docker pull gitlab/gitlab-ee:12.4.0-ee.0 创建挂载目录 mkdir -p /srv/gitlab mkdir -p /srv/gitlab/config # 映射到 Glitlab 容器中的配置目录 mkdir -p /srv/gitlab/logs # 映射到 Glitlab 容器中的日志目录 m…

换个思维方式快速上手UML和 plantUML——类图

和大多数朋友一样&#xff0c;Jeffrey 在一开始的时候也十分的厌烦软件工程的一系列东西&#xff0c;对工程化工具十分厌恶&#xff0c;觉得它繁琐&#xff0c;需要记忆很多没有意思的东西。 但是之所以&#xff0c;肯定有是因为。对工程化工具的不理解和不认可主要是基于两个逻…

最新国内GPT4.0使用教程,AI绘画-Midjourney绘画V6 ALPHA绘画模型,GPT语音对话使用,DALL-E3文生图+思维导图一站式解决方案

一、前言 ChatGPT3.5、GPT4.0、GPT语音对话、Midjourney绘画&#xff0c;文档对话总结DALL-E3文生图&#xff0c;相信对大家应该不感到陌生吧&#xff1f;简单来说&#xff0c;GPT-4技术比之前的GPT-3.5相对来说更加智能&#xff0c;会根据用户的要求生成多种内容甚至也可以和…

围棋的气 - 华为OD统一考试

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 围棋棋盘由纵横各19条线垂直相交组成&#xff0c;棋盘上一共19x19361个交点&#xff0c;对弈双方一方执白棋&#xff0c;一方执黑棋&#xff0c;落子时只能将棋子…

智慧文旅:未来旅游业的数字化转型

随着科技的快速发展&#xff0c;数字化转型已经成为各行各业的必然趋势。旅游业作为全球经济的重要组成部分&#xff0c;也正经历着前所未有的变革。智慧文旅作为数字化转型的重要领域&#xff0c;正逐渐改变着旅游业的传统模式&#xff0c;为游客带来更加便捷、个性化的旅游体…

锐龙笔记本Windows 11休眠无法唤醒问题的解决(6800h, 7840H/Hs)

锐龙笔记本运行Windows 11时经常会遇到休眠后无法唤醒的问题&#xff0c;表现为休眠后 按键盘或鼠标无反应&#xff0c;只能长按电源开关关机后再开机。网上有很多说法&#xff0c;比如显卡问题或其它问题。但是本质 上这个是电源管理软硬件不兼容导致的&#xff0c;解决办法如…

2024 年 eBPF 和网络趋势预测

本文地址&#xff1a;2024 年 eBPF 和网络趋势预测 | 深入浅出 eBPF 1. eBPF 1.1 eBPF 将继续呈指数增长1.2 eBPF 应用市场1.3 eBPF 在手机中得到更广泛的应用1.4 eBPF 滥用带来的风险2. 可观测 2.1 最受欢迎的可观测性2.2 降低可观测性开销2.3 上下文感知的 Kubernetes 工作负…