Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!

news2024/11/21 0:35:39

代码仓库

会同步代码到 GitHub
https://github.com/turbo-duck/flink-demo

在这里插入图片描述

当前章节

继续上一节的内容:https://blog.csdn.net/w776341482/article/details/139875037

上一节中,我们需要使用 nc 或者 telnet 等工具来模拟 Socket 流。这节我们写一个 ServerSocket 来模拟这些 操作,让流自动的写入不用我们手动去操作了。

POM.xml

与上一节一致,不需要修改

编写代码

还是和上一节一样的 Socket 流,这里略去其他的代码

DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer

继承 Thread 启动一个线程来进行Flink的服务

package icu.wzk.demo03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkServer extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator
                .keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return stringLongTuple2.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .sum(1);
        System.out.println("wait word print()");
        word.print();
        try {
            streamExecutionEnvironment.execute("stream!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

NumRandom

使用 ServerSocket 实现一个持续的流输出

package icu.wzk.demo03;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class RandomNumClient extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        try {
            ServerSocket serverSocket = new ServerSocket();
            InetSocketAddress address = new InetSocketAddress(ip, port);
            serverSocket.bind(address);
            Socket socket = serverSocket.accept();
            OutputStream output = socket.getOutputStream();
            PrintWriter writer = new PrintWriter(output, true);
            Random random = new Random();
            for (int i = 0; i < 500; i ++) {
                int randomNumber = random.nextInt(10) + 1;
                writer.println(randomNumber);
                System.out.println("ServerSocket Send To Flink: " + randomNumber);
                Thread.sleep(200);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

StartApp

将上述的两个类组装起来

请添加图片描述

package icu.wzk.demo03;


public class StartApp {

    public static void main(String[] args) throws Exception {
        RandomNumClient randomNumClient = new RandomNumClient();
        FlinkServer flinkServer = new FlinkServer();
        flinkServer.start();
        randomNumClient.start();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1855064.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于YOLOv5的火灾检测系统的设计与实现(PyQT页面+YOLOv5模型+数据集)

基于YOLOv5的火灾检测系统的设计与实现 概述系统架构主要组件代码结构功能描述YOLOv5检测器视频处理器主窗口详细代码说明YOLOv5检测器类视频处理类主窗口类使用说明环境配置运行程序操作步骤检测示例图像检测视频检测实时检测数据集介绍数据集获取数据集规模YOLOv5模型介绍YOL…

python输入、输出和变量

一、变量 变量是存储数据的容器。在 Python 中&#xff0c;变量在使用前不需要声明数据类型&#xff0c;Python 会根据赋值自动推断变量类型。 定义变量&#xff1a; 二、输入&#xff08;Input&#xff09; input() 函数用于获取用户输入。默认情况下&#xff0c;input() 会…

在阿里云使用Docker部署MySQL服务,并且通过IDEA进行连接

阿里云使用Docker部署MySQL服务&#xff0c;并且通过IDEA进行连接 这里演示如何使用阿里云来进行MySQL的部署&#xff0c;系统使用的是Linux系统 (Ubuntu)。 为什么使用Docker? 首先是因为它的可移植性可以在任何有Docker环境的系统上运行应用&#xff0c;避免了在不通操作系…

Android intent 打开链接跳转到外部浏览

前言: 各位同学大家好, 最近接到一个比较诡异的需求 ,不是通常的webview 加URL显示网页 是需要跳转到外部浏览器 ,我这边处理好了就分享给大家 效果图 : 点几就跳转到外部浏览器 如图 具体代码实现: 点击打开链接并跳转外部浏览器方法 public void openBrowser(Con…

urfread刷SQL|left join|175. 组合两个表

175. 组合两个表 题目描述 左连接 因为不是所有人都有地址 如果只是用join&#xff0c;那么只会匹配到有地址的人。没有地址的人&#xff0c;就不会显示在结果中。 如果使用左连接&#xff0c;会把左表都显示在结果中&#xff0c;如果谁匹配不到右表&#xff0c;就为空值。 所…

Excel做简单的趋势预测

这种方法不能代替机器学习&#xff0c;时序分析等&#xff0c;只是为后面的时序预测提供一个经验认识。 step1 选中序号列&#xff08;或时间列&#xff09;与预测列如图1所示&#xff1a; 图1 step2 工具栏点击“数据”&#xff0c;然后再“数据”下点击“预测模型”&#x…

CARLA自动驾驶模拟器基础

CARLA 使用服务器-客户端架构运行&#xff0c;其中 CARLA 服务器运行模拟并由客户端向其发送指令。客户端代码使用 API 与服务器进行通信。要使用 Python API&#xff0c;您必须通过 PIP 安装该模块&#xff1a; pip3 install carla-simulator # Python 3World and client 客…

AG32 MCU是否支持DFU下载实现USB升级

1、AG32 MCU是否支持DFU下载实现USB升级呢&#xff1f; 先说答案是NO. STM32 可以通过内置DFU实现USB升级&#xff0c;AG32 MCU目前不支持。但用户可以自己写一个DFU&#xff0c; 作为二次boot. 2、AG32 MCU可支持的下载方式有哪些呢&#xff1f; 我们AG32裸机下载只支持uart和…

文华财经T8自动化交易程序策略模型指标公式源码

文华财经T8自动化交易程序策略模型指标公式源码&#xff1a; //定义变量 //资金管理与仓位控制 8CS:INITMONEY;//初始资金 8QY:MONEYTOT;//实际权益 8QY1:MIN(MA(8QY,5*R),MA(8QY,2*R)); FXBL:N1; DBKS:8QY1*N1;//计算单笔允许亏损额度 BZDKS:MAX(AA-BB,N*1T)*UNIT; SZDKS:MAX…

ChatGPT原理和训练【 ChatGPT是由OpenAI开发】

本人详解 作者:王文峰,参加过 CSDN 2020年度博客之星,《Java王大师王天师》 公众号:JAVA开发王大师,专注于天道酬勤的 Java 开发问题中国国学、传统文化和代码爱好者的程序人生,期待你的关注和支持!本人外号:神秘小峯 山峯 转载说明:务必注明来源(注明:作者:王文峰…

骑马与砍杀-战团mod制作-基础篇-武器模型入骑砍(二)

骑马与砍杀战团mod制作-基础-武器模型入骑砍笔记&#xff08;二&#xff09; 资料来源 学习的资料来源&#xff1a; b站【三啸解说】手把手教你做【骑砍】MOD&#xff0c;基础篇&#xff0c;链接为&#xff1a; https://www.bilibili.com/video/BV19x411Q7No?p4&vd_sour…

秋招突击——6/22——复习{区间DP——加分二叉树,背包问题——买书}——新作{移除元素、实现strStr()}

文章目录 引言复习区间DP——加分二叉树个人实现 背包问题——买书个人实现参考实现 新作移除元素个人实现参考思路 找出字符串中第一个匹配项的下标个人实现参考实现 总结 引言 今天做了一个噩梦&#xff0c;然后流了一身汗&#xff0c;然后没起来&#xff0c;九点多才起床背…

Python + Playwright(0):从零开始学习Playwright自动化框架

Python Playwright&#xff08;0&#xff09;&#xff1a;从零开始学习Playwright自动化框架 简介一、官方文档二、安装安装要求pip安装 三、基本使用方法录制脚本 四、代码示例结语 简介 Playwright 是一个强大的自动化库&#xff0c;由微软开发&#xff0c;主要用于web端的…

会声会影2024永久破解和谐版下载 包含激活码序列号

亲爱的创作伙伴们&#xff0c;今天我要分享一个让我的影视编辑生活大放异彩的神器——会声会影2024破解版本&#xff01;&#x1f389;&#x1f31f; &#x1f308;**功能全面升级**&#xff1a;作为一款专业的视频编辑软件&#xff0c;会声会影2024破解版本不仅继承了之前版本…

CRMEB-PHP多商户版安装系统配置清单

系统在安装完成之后&#xff0c;需要对系统进行一系列的配置&#xff0c;才能正常使用全部的功能&#xff0c;以下是官方整理的配置清单 平台后台 商户后台

Linux常用

很早以前的 ls: 查看文件夹内所有文件 rz: windows的文件传到linux服务器 sz filename: 将文件下载到windows本地 ctrlinsert:复制 shiftinsert:粘贴 ctrlD&#xff1a;退出spark-shell 运行脚本并输出日志 nohup sh filename.sh > log.log 2>&1 & 查看日…

计算机视觉解决什么问题?

本节课为「计算机视觉 CV 核心知识」第一节课正式课&#xff1b; 「AI秘籍」系列课程&#xff1a; 人工智能应用数学基础人工智能Python基础人工智能基础核心知识人工智能BI核心知识人工智能CV核心知识 Hi&#xff0c;大家好。我是茶桁。 老同学对我应该都很熟悉了&#xff…

std::bind与std::ref配合使用时要注意的几个问题

目录 1 假如输入函数的变量是左值非常量引用&#xff0c;则该变量在std::bind中只能用std::ref修饰&#xff0c;不能用cref&#xff0c;否则编译失败&#xff1a; 2 假如输入函数的变量是左值常量引用&#xff0c;则该变量在std::bind中既可以用std::ref修饰&#xff0c;也可…

如何在 Mac 上清空硬盘后恢复丢失的数据?

如果您不小心从 Mac 硬盘上删除了重要文件&#xff0c;您可能会感到非常沮丧。但您仍然可以找回丢失的信息。将 Mac 想象成一个大盒子&#xff0c;里面装着所有东西。丢弃某样东西就像撕掉盒子上的标签&#xff1a;房间现在可以放新东西了&#xff0c;但旧东西仍然在那里&#…

leetcode 二分查找·系统掌握 有效的完全平方数

题目&#xff1a; 题解&#xff1a; 就是一个非常普通的二分查找&#xff0c;但是需要注意的是查找的上下界&#xff0c;因为是完全平方&#xff0c;所以可以把上界设为这个数的一半&#xff0c;但是要特殊处理num等于1的时候。 bool isPerfectSquare(int num) {if(num1)retur…