Flink DataStream之使用filter实现分流

news2025/1/11 10:56:19
  • 新建类
package test01;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitStreamByFilter {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        //设置并行度
        executionEnvironment.setParallelism(1);
        //监听数据端口
        DataStreamSource<String> dataSource = executionEnvironment.socketTextStream("localhost", 9999);
        //使用filter对数据进行分流,这里测试如果是以H开头的分为一个流,以M开头的分为一个流。缺点是每条数据都会执行一下各个filter
        SingleOutputStreamOperator<String> hWord = dataSource.filter(value -> value.startsWith("H"));
        SingleOutputStreamOperator<String> mWord = dataSource.filter(value -> value.startsWith("M"));
        //打印分流,这里print可以使用ctrl+p看到print有个参数,这样就可以在打印时在开头位置加上一些信息。
        hWord.print("以H开头:");
        mWord.print("以M开头:");

        executionEnvironment.execute();


    }
}
  • 启动netcat和程序

 可以看到输入的"World"由于不满足两个filter中的任何一个,所以数据被舍弃。"Monday"和"Hello"分别打印在两个不同的流中。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/734232.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pygame Zero(pgzrun)游戏库介绍

Pygame Zero&#xff08;pgzrun&#xff09;游戏库介绍 pgzero是python的一个第三方库。pgzrun 是 python game zero run 的缩写, 它对 Pygame 进行了封装, 屏蔽了繁琐枯燥的框架代码, 让学习者可以更专注于游戏的实现逻辑, 并且更快看到成果。 官网https://pygame-zero.read…

单样本微调给ChatGLM2注入知识~

前方干货预警&#xff1a;这可能也是一篇会改变你对LLM微调范式&#xff0c;以及对LLM原理理解的文章。 同时这也是一篇非常有趣好玩&#xff0c;具有强大实操性的ChatGLM2微调喂饭级教程。 我们演示了使用AdaLoRA算法&#xff0c;使用1条样本对ChatGLM2-6b实施微调。几分钟就成…

【Redis】五大数据类型(操作命令)

&#x1f3af;Redis 命令 &#x1f6a9;Redis 键(key) 这些是 Redis 数据库中的命令&#xff0c;用于对数据类型进行操作和管理。以下是每个命令的含义和用法&#xff1a; DEL&#xff1a;删除一个或多个键。DUMP&#xff1a;将一个键的值转储到一个字符串中。EXPIRE&#x…

【数据结构二叉树OJ系列】4、翻转二叉树(又称求二叉树的镜像)

目录 法一、 法二、 题述&#xff1a; 翻转一颗二叉树。 输入&#xff1a; 输出&#xff1a; 题中已给&#xff1a; struct TreeNode {int val;struct TreeNode* left;struct TreeNode* right; }; TreeNode* invertTree(struct TreeNode* root) 法一、 思路&#xff1a;…

操作指南 | 如何使用Foundry在Moonbeam上进行部署

Foundry是一种以太坊开发环境&#xff0c;可帮助构建者管理依赖项、编译项目、测试或部署合约以及通过指令与区块链进行交互。Foundry已成为流行的开发智能合约开发环境&#xff0c;仅需要使用Solidity即可进行操作。Moonbeam在官方文档网站提供了有关将Foundry与Moonbeam网络结…

vector [] 赋值出现的报错问题

下面这段代码的作用是创建了一个整数类型的vector&#xff08;std::vector<int>&#xff09;并对其进行操作。以下是代码的详细说明&#xff1a; 使用reserve(10)方法为向量分配至少10个元素的存储空间。reserve() 预留了额外的存储空间&#xff0c;以避免后续添加元素时…

C++之typeof和typeid用法(一百五十三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

动态规划--Fibonacci数列 III

描述 众所周知&#xff0c;Fibonacci数列是一个著名数列。它的定义是&#xff1a; 本题要求采用第三种方法&#xff1a;简单的动态规划。 用数组把求出来的 Fibonacci 数列保存下来&#xff0c;以免后面要的时候再算一次。 输入描述 每行一个整数 i &#xff0c;表示 Fibona…

【C++修炼之路】string 模拟实现

&#x1f451;作者主页&#xff1a;安 度 因 &#x1f3e0;学习社区&#xff1a;StackFrame &#x1f4d6;专栏链接&#xff1a;C修炼之路 文章目录 一、默认成员函数1、全缺省构造2、析构3、拷贝构造&#xff08;深拷贝&#xff09;4、赋值重载&#xff08;深拷贝&#xff09;…

Langchain 新手完全指南

Langchain 可能是目前在 AI 领域中最热门的事物之一&#xff0c;仅次于向量数据库。 它是一个框架&#xff0c;用于在大型语言模型上开发应用程序&#xff0c;例如 GPT、LLama、Hugging Face 模型等。 它最初是一个 Python 包&#xff0c;但现在也有一个 TypeScript 版本&…

Git gui教程---第五篇 Git gui的使用 查看提交历史

查看提交历史 1.点击菜单栏的“版本库”&#xff0c;选择“图示master分支的历史” 2.出现的界面就是显示当前分支的提交历史了

Java基础---Java中创建对象方式

目录 使用new关键字 使用反射机制 使用clone方法 使用反序列化 使用方法句柄 使用Unsafe分配内存 使用new关键字 这是最常见的也是最简单的创建对象的方式通过这种方式还可以调用任意的构造函数&#xff08;无参的和有参的&#xff09; 使用反射机制 运用反射手段&#…

单个电源模块带电感的直流压降仿真(一)

单个电源模块带电感的直流压降仿真(一) 下面实例分析单个电源模块带电感的直流压降仿真分析,以下图为例 具体操作如下 创建新的workspaceLoad a New/Different layout(把PCB文件加载进来)

【滑动窗口】209. 长度最小的子数组

209. 长度最小的子数组 解题思路 滑动窗口设置前后指针滑动窗口内的元素之和总是大于或者等于s滑动窗口的起始位置: 如果窗口的值大于等于s 窗口向前移动窗口结束位置:for循环的j class Solution {public int minSubArrayLen(int target, int[] nums) {int left 0;// 滑动窗口…

UDS统一诊断服务【七】DTC控制0X85服务

文章目录 前言一、DTC控制服务介绍二、数据格式2.1 请求报文2.2 子功能2.3响应格式 三、举例总结 前言 大家好&#xff0c;我是嵌入式老林&#xff0c;从事嵌入式软件开发多年&#xff0c;今天分享的内容是UDS诊断故障码控制0X85服务介绍&#xff0c;希望能对你有所帮助 一、D…

[LeetCode周赛复盘] 第 353 场周赛20230709

[LeetCode周赛复盘] 第 353 场周赛20230709 一、本周周赛总结6451. 找出最大的可达成数字1. 题目描述2. 思路分析3. 代码实现 6899. 达到末尾下标所需的最大跳跃次数1. 题目描述2. 思路分析3. 代码实现 6912. 构造最长非递减子数组1. 题目描述2. 思路分析3. 代码实现 6919. 使…

人工智能与Chat GPT

一本书全面掌握ChatGPT&#xff0c;既有向ChatGPT提问的技巧&#xff0c; 也有构建自己的ChatGPT模型的方法&#xff0c;涵盖开发背景、关联技术、使用方法、应用形式、实用案例等 人工智能是我们这个时代最热门的话题&#xff0c;人们既希望它能代替我们做一些工作&#xff0c…

Python使用SQLAlchemy

Python使用SQLAlchemy 1 安装SQLAlchemy 备注&#xff1a;本文适用于SQLAlchemy>2.0 # 安装SQLAlchemy pip install SQLAlchemy# 安装pymysql pip install pymysql参考文档&#xff08;SQLAlchemy>2.0&#xff09; https://docs.sqlalchemy.org/en/20/创建数据库 # …

什么是敏捷测试?

目录 前言&#xff1a; 敏捷测试的定义 敏捷测试的特点 为什么要敏捷测试 缩短价值交付周期 强调质量属于大家 化繁为简节省成本 敏捷测试VS. 传统测试 传统测试如何迁移到敏捷测试 1. 组织文化的转变 2. 组织架构的调整 3. 人员培训与指导 4. 轻流程 敏捷测试成…

电视访问Samba

文章目录 问题描述方案一&#xff1a;当贝播放器方案二&#xff1a;nPlayer方案三&#xff1a;Kodi 问题描述 本人使用小米 AX9000 路由器 移动硬盘组了个轻 NAS&#xff0c;想通过电视访问 Samba 看视频&#xff08;也可以电脑开 SMB&#xff09; 开启 Samba 功能 文件夹开…