【极数系列】Flink集成DataSource读取Socket请求数据(09)

news2025/1/10 16:09:15

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于socket套接字读取数据
    • 3.1 从套接字读取。元素可以由分隔符分隔。
    • 3.2 windows安装netcat工具
      • (1)下载netcat工具
      • (2)安装部署
      • (3)启动socket端口监听
  • 04 源码实战demo
    • 4.1 pom.xm依赖
    • 4.2创建socket数据流作业
    • 4.3实时cmd窗口输入数据

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkSocketSourceJob(socket请求)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于socket套接字读取数据

3.1 从套接字读取。元素可以由分隔符分隔。

3.2 windows安装netcat工具

(1)下载netcat工具

下载地址:https://eternallybored.org/misc/netcat/

(2)安装部署

注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。

将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。

(3)启动socket端口监听

注意:该端口需要跟代码中监听的端口一致,否则获取不到数据

nc -l -p 8081

04 源码实战demo

4.1 pom.xm依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2创建socket数据流作业

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

/**
 * @description flink的socket请求的source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkSocketSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkSocketSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //3.基于socket请求的source使用
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",8081);

        //4.输出打印
        dataStreamSource.print();

        //5.启动运行
        env.execute();
    }

}

4.3实时cmd窗口输入数据

注意:先启动cmd窗口监听再启动程序,否则会报端口连接失败

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1421653.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回归预测 | Matlab实现CPO-SVR冠豪猪优化支持向量机的数据多输入单输出回归预测

回归预测 | Matlab实现CPO-SVR冠豪猪优化支持向量机的数据多输入单输出回归预测 目录 回归预测 | Matlab实现CPO-SVR冠豪猪优化支持向量机的数据多输入单输出回归预测预测效果基本描述程序设计参考资料 预测效果 基本描述 1.Matlab实现CPO-SVR冠豪猪优化支持向量机的数据多输入…

深度强化学习(王树森)笔记11

深度强化学习&#xff08;DRL&#xff09; 本文是学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。本文在ChatGPT辅助下完成。 参考链接 Deep Reinforcement Learning官方链接&#xff1a;https://github.com/wangshusen/DRL 源代码链接&#xff1a;https://github.c…

部署PXE高效批量网络装机

部署PXE高效批量网络装机 因在Cisco3850核心交换机中已开启DHCP 服务&#xff0c;因此不需要在配置DHCP服务。如果您的网络环境中也已有DHCP服务&#xff0c;也不用再配置DHCP服务了&#xff0c;直接部署PXE相关服务即可。 找一台linux系统的服务器&#xff0c;这本次试验用的是…

华为云codeArts使用操作流程

一、开启服务 什么是华为云CodeArts&#xff1f; 本实验将在华为云CodeArts平台上搭建一个凤凰商城开发项目&#xff0c;并完成需求管理、代码仓库、代码检查、编译构建、发布、部署、流水线等软件开发操作。 1)新建项目 进入华为云“控制台”&#xff0c;鼠标移动到页面左侧菜…

kettle通过severice_name连接oracle数据源踩坑

最近在研究kettle做数据抽取核对&#xff0c;按照官网安装kettle后无法连接oracle 坑1&#xff1a;kettle 连接oracle的数据库名指的是sidname 而非severicename&#xff0c;前期一直使用severicename 如下始终报错 注意区分下&#xff1a; SID:一个数据库可以有多个实例&…

Docker容器引擎镜像创建

目录 一、镜像的创建 &#xff08;一&#xff09;基于现有镜像创建 1.启动一个镜像&#xff0c;在容器里做修改 2.将修改后的容器提交为新的镜像 &#xff08;二&#xff09;基于本地模板创建 &#xff08;三&#xff09;基于Dockerfile 创建 1.联合文件系统&#xff08…

【Node.js基础】http模块的使用

文章目录 前言一、客户端与服务器是什么二、http模块的使用2.1 导入http模块2.2 创建web服务器服务器的相关概念创建最基本的web服务器req对象解决发送中文乱码根据不同的url 响应不同的 html内容 总结 前言 Node.js 提供了一个强大的内置模块——http 模块&#xff0c;用于创…

力扣 55.跳跃游戏

思路&#xff1a; 从后往前遍历&#xff0c;遇到元素为0时&#xff0c;记录对应的下标位置&#xff0c;再向前遍历元素&#xff0c;看最大的跳跃步数能否跳过0的位置&#xff0c;不能则继续往前遍历 代码&#xff1a; class Solution { public:bool canJump(vector<int>…

linux --中断管理 -- irq的自动探测机制

irq自动探测机制 如果一个设备的驱动程序无法确定它说管理的设备的软件中断号irq&#xff0c;此时设备驱动程序可以使用irq的自动探测机制来获取其正在使用的irq。 使用自动探测机制的条件 内核与驱动&#xff0c;必须共同努力才能完成只限于非共享中断的情况 探测前&#…

[AG32VF407]国产MCU+FPGA Verilog编写控制2路gpio输出不同频率方波实验

视频讲解 [AG32VF407]国产MCUFPGA Verilog编写控制2路gpio输出不同频率方波实验 实验过程 根据原理图&#xff0c;选择两个pin脚作为输出 修改VE文件&#xff0c;clk选择PIN_OSC&#xff0c;使用内部晶振8Mhz&#xff0c;gpio使用PIN_51和52&#xff0c;pinout是数组 添加pll…

并发编程之线程

一、并发、并行、串行 并发: 多个任务在同一时间段内同时执行&#xff0c;如果是单核计算机&#xff0c;CPU会不断地切换任务来完成并发操作 并行:多任务在同一时刻同时执行&#xff0c;计算机需要有多核心&#xff0c;每个核心独立执行一个任务&#xff0c;多个任务同时执行…

C++文件操作(1)

C文件操作 1.文本的写入及读取文本文件写入文本文件读取 2.二进制文件的写入及读取二进制文件写入二进制文件读取 3.小结 C也有处理文件的能力&#xff0c;其功能实现依赖文件流。文件流是C中用来处理文件输入输出的一种流类。文件流可以用于从文件中读取数据或将数据写入到文件…

洛谷P8599 [蓝桥杯 2013 省 B] 带分数

[蓝桥杯 2013 省 B] 带分数 题目描述 100 100 100 可以表示为带分数的形式&#xff1a; 100 3 69258 714 100 3 \frac{69258}{714} 100371469258​。 还可以表示为&#xff1a; 100 82 3546 197 100 82 \frac{3546}{197} 100821973546​。 注意特征&#xff1a;带分…

ASTORS国土安全奖:ManageEngine AD360荣获银奖

美国安全今日&#xff08;AST&#xff09;的年度“ASTORS”国土安全奖计划是一个备受瞩目的活动&#xff0c;致力于突显国土安全领域的创新与进步。这一奖项旨在表彰在保护国家免受安全威胁方面做出卓越贡献的个人和组织。该计划汇聚了执法、公共安全和行业领袖&#xff0c;不仅…

【Web前端实操20】商城官网_黑色导航

今天继续着上一篇博客的内容进行编写,本次主要实现的是商场官网实战里面的黑色导航栏部分,也就是广告图片下面的部分。 本次除了每篇都要通用的样式CSS代码之外,还增添了一个引用矢量图标的样式,如果有兴趣的友友们,可以看看我的这篇博客,上面详细介绍了如何下载矢量图标…

live2D学习:做好让图片动起来的准备

做好让图片动起来的准备https://www.bilibili.com/video/BV1JE411Y7Te?p2&vd_source124076d7d88eee393a1d8bf6fc787efa 把psd文件通过菜单栏的“打开文件”进行导入或直接把psd文件拖到Live2D Cubism Editor 4.0的面板中 网格 我们在点击图像的一部分时&#xff0c;会出现…

Boosting semantic human matting with coarse annotations

前向推理在modelscope中开源了&#xff0c;但是训练没开源&#xff0c;且是基于TensorFlow的&#xff0c;复现起来是比较麻烦的。 1.Introduction 分割技术主要集中在像素级二元分类&#xff0c;抠图被建模为前景图像F和背景图像B的加权融合&#xff0c;大多数matte方法采用指…

RK3568平台 热插拔机制

一.热插拔的基本概念 热插拔是指在设备运行的情况下&#xff0c;能够安全地插入或拔出硬件设备&#xff0c;而无需关闭或重启系统。这意味着你可以在计算机或其他电子设备上插入或拔出硬件组件&#xff08;比如USB设备&#xff0c;扩展卡&#xff0c;硬件驱动器等&#xff09;…

001集—shapefile(.shp)格式详解——arcgis

一、什么是shapefile Shapefile 是一种用于存储地理要素的几何位置和属性信息的非拓扑简单格式。shapefile 中的地理要素可通过点、线或面&#xff08;区域&#xff09;来表示。包含 shapefile 的工作空间还可以包含 dBASE 表&#xff0c;它们用于存储可连接到 shapefile 的要…

mysql入门到精通003-基础篇-SQL

1、目录 2、SQL通用语法及分类 2.1 SQL通用语法 2.2 SQL分类 3、SQL DDL数据库操作 3.1 SQL DDL表操作-创建&查询 3.1.1 表操作-查询 3.1.2 表操作-创建 create table tb_user(id int comment 编号,name varchar(50) comment 用户名,age int comment 用户名,gender varch…