文章目录
- 01 引言
- 02 简介概述
- 03 基于socket套接字读取数据
- 3.1 从套接字读取。元素可以由分隔符分隔。
- 3.2 windows安装netcat工具
- (1)下载netcat工具
- (2)安装部署
- (3)启动socket端口监听
- 04 源码实战demo
- 4.1 pom.xm依赖
- 4.2创建socket数据流作业
- 4.3实时cmd窗口输入数据
01 引言
源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkSocketSourceJob(socket请求)
02 简介概述
1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。
2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。
3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。
03 基于socket套接字读取数据
3.1 从套接字读取。元素可以由分隔符分隔。
3.2 windows安装netcat工具
(1)下载netcat工具
下载地址:https://eternallybored.org/misc/netcat/
(2)安装部署
注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。
将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。
(3)启动socket端口监听
注意:该端口需要跟代码中监听的端口一致,否则获取不到数据
nc -l -p 8081
04 源码实战demo
4.1 pom.xm依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsy</groupId>
<artifactId>aurora_flink</artifactId>
<version>1.0-SNAPSHOT</version>
<!--属性设置-->
<properties>
<!--java_JDK版本-->
<java.version>11</java.version>
<!--maven打包插件-->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!--编译编码UTF-8-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--输出报告编码UTF-8-->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!--json数据格式处理工具-->
<fastjson.version>1.2.75</fastjson.version>
<!--log4j版本-->
<log4j.version>2.17.1</log4j.version>
<!--flink版本-->
<flink.version>1.18.0</flink.version>
<!--scala版本-->
<scala.binary.version>2.11</scala.binary.version>
<!--log4j依赖-->
<log4j.version>2.17.1</log4j.version>
</properties>
<!--通用依赖-->
<dependencies>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!--================================集成外部依赖==========================================-->
<!--集成日志框架 start-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<!--集成日志框架 end-->
</dependencies>
<!--编译打包-->
<build>
<finalName>${project.name}</finalName>
<!--资源文件打包-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!--插件统一管理-->
<pluginManagement>
<plugins>
<!--maven打包插件-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!--编译打包插件-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!--配置Maven项目中需要使用的远程仓库-->
<repositories>
<repository>
<id>aliyun-repos</id>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<!--用来配置maven插件的远程仓库-->
<pluginRepositories>
<pluginRepository>
<id>aliyun-plugin</id>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
4.2创建socket数据流作业
package com.aurora.source;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
/**
* @description flink的socket请求的source应用
* @author 浅夏的猫
* @datetime 23:03 2024/1/28
*/
public class FlinkSocketSourceJob {
private static final Logger logger = LoggerFactory.getLogger(FlinkSocketSourceJob.class);
public static void main(String[] args) throws Exception {
//1.创建Flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置Flink运行模式:
//STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//3.基于socket请求的source使用
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",8081);
//4.输出打印
dataStreamSource.print();
//5.启动运行
env.execute();
}
}
4.3实时cmd窗口输入数据
注意:先启动cmd窗口监听再启动程序,否则会报端口连接失败