Java版Flink使用指南——背压

news2024/9/20 6:39:39

大纲

  • 新建工程
  • 模拟函数
  • 自定义无界流
  • 背压测试
    • 引入数据
    • 低压侧
    • 高压侧
    • 测试结果
  • 优化
    • 降低算法复杂度
    • 提高并行度
  • 工程代码

背压(Backpressure)又称“反压”,是指在Flink的处理过程中,某个过程出于某种原因,消耗的上游数据过慢,导致上游数据积压。
本文我们将通过例子来探索背压产生的原因以及处理方法。

新建工程

我们新建一个名字叫Backpressure的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

模拟函数

我们借助《0基础学习区块链技术——入门》的思想,设计一个函数,用于计算出一个符合“前置N个0”的Hash。

    private static String generateHash(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hashBytes = md.digest(input.getBytes());
			StringBuilder sb = new StringBuilder();
			for (byte b : hashBytes) {
				sb.append(String.format("%02x", b));
			}
			while (sb.length() < 64) {
				sb.insert(0, "0");
			}
			return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-256 algorithm not found", e);
        }
    }

	private static String generateHash(String input, int zeroCount) {
		Long seed = 0L;
		String value = input + " seed:" + seed;
		String hash = generateHash(value);
		String cmpStart = "0".repeat(zeroCount);
		while (!hash.startsWith(cmpStart)) {
			seed++;
			value = input + " seed:" + seed;
			hash = generateHash(value);
		}
		return hash;
	}

后续我们通过调整zeroCount来调整计算过程的复杂性。

自定义无界流

通过这个无界数据流,我们可以持续给系统提供数据,进而方便我们测试。
这块代码见《Java版Flink使用指南——自定义无界流生成器》。

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(10); // Simulate delay
            ctx.collect(count++); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

背压测试

引入数据

引入的数据就是上面创建的无界数据流,它会每10毫秒产生一条自增的Long型数据。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Long> customStreamSource = env.addSource(new UnBoundedStreamGenerator()).setParallelism(1).name("Custom Stream Source");

低压侧

我们在低压侧使用复杂度为1的算法,即要求算出的Hash值最前面一位是0即可。

int parallelism_low = 2;
int complexity_low = 1;

customStreamSource.keyBy(fields -> fields % parallelism_low).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
	@Override
	public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
		String newLine = "";
		for (Long element : input) {
			String hash = generateHash(element.toString(), complexity_low);
			newLine = newLine + " " + hash;
		}
		out.collect(newLine);
	}
}).setParallelism(parallelism_low).name("complexity = " + complexity_low).print().name("print complexity = " + complexity_low);

高压侧

高压侧的复杂度是3,即要求算出的Hash值前3为是0。

int parallelism_high = 2;
int complexity_high = 3;

customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
	@Override
	public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
		String newLine = "";
		for (Long element : input) {
			String hash = generateHash(element.toString(), complexity_high);
			newLine = newLine + " " + hash;
		}
		out.collect(newLine);
	}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);

测试结果

刚开始时,complexity = 3,即高压侧的过程是红色。
在这里插入图片描述
其处理速度不到低压侧(complexity = 1)的一半。目前缓冲区还没满,所以Source侧还没有背压。
在这里插入图片描述

这种情况持续下去,会导致Source过程产生背压,即数据拥堵。
过了一段时间,Source侧显示出已经处于“LOW”状态的低压了。
在这里插入图片描述
但是很快,它就会变成“HIGH”状态。
在这里插入图片描述
此时,可以看到Flink还可以继续运行,但是JVM的内存在持续增长。
在这里插入图片描述
在这里插入图片描述

优化

降低算法复杂度

由于complexity = 3时,算法复杂度比较高,才导致了背压。所以最简单的办法就是修改算法,降低其复杂度。
我们将算法复杂度调到1。

int parallelism_high = 2;
int complexity_high = 1;

customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
	@Override
	public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
		String newLine = "";
		for (Long element : input) {
			String hash = generateHash(element.toString(), complexity_high);
			newLine = newLine + " " + hash;
		}
		out.collect(newLine);
	}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);

在这里插入图片描述
可以看到不会有背压产生。

提高并行度

如果算法就是很复杂,不能提升效率,那就要调动更多的CPU资源来计算。此时我们可以提升并行度来达成。
如下代码,我们将计算的并行度调整为9。

int parallelism_high = 9;
int complexity_high = 3;

customStreamSource.keyBy(fields -> fields % parallelism_high).countWindow(10).apply(new WindowFunction<Long, String, Long, GlobalWindow>(){
	@Override
	public void apply(Long key, GlobalWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
		String newLine = "";
		for (Long element : input) {
			String hash = generateHash(element.toString(), complexity_high);
			newLine = newLine + " " + hash;
		}
		out.collect(newLine);
	}
}).setParallelism(parallelism_high).name("complexity = " + complexity_high).print().name("print complexity = " + complexity_high);

则背压也不会产生
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1918925.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分析逆向案例十三——拍拍贷登录密码逆向和JS原型链

网址&#xff1a;拍拍贷 登陆页面&#xff0c;找登陆包&#xff0c;密码和用户名都进行了加密。 直接参数搜索 &#xff0c;一眼下面的第二条&#xff0c;直接点击进入 加密位置打上断点分析&#xff0c;一个加密嵌套另一个加密。 中间的e.md5好像是md5加密&#xff0c;打印一…

windows系统上python3安装open3d第三方库

打开命令提示符&#xff0c;输入&#xff0c; pip install open3d -i https://pypi.tuna.tsinghua.edu.cn/simple成功页面&#xff0c;

百度安全大模型智能体实践入选信通院“安全守卫者计划”优秀案例

7月3日&#xff0c;由全球数字经济大会组委会主办&#xff0c;中国信息通信研究院&#xff08;以下简称中国信通院&#xff09;与中国通信标准化协会联合承办的2024全球数字经济大会“云和软件安全论坛暨第二届SecGo云和软件安全大会”在北京召开。本届论坛聚焦云和软件安全最新…

【Stable Diffusion】(基础篇三)—— 图生图基础

图生图基础 本系列笔记主要参考B站nenly同学的视频教程&#xff0c;传送门&#xff1a;B站第一套系统的AI绘画课&#xff01;零基础学会Stable Diffusion&#xff0c;这绝对是你看过的最容易上手的AI绘画教程 | SD WebUI 保姆级攻略_哔哩哔哩_bilibili 本文主要讲解如何使用S…

线性系统理论及应用GUI设计及仿真

目录 1.控制系统的状态空间模型 1.1.状态空间模型 1.2 传递函数模型 1.3 传递函数转换为状态空间模型 1.4.状态空间模型转换为传递函数 1.5.状态空间模型转化为约当标准型 2.线性系统的时域分析 2.1.矩阵指数函数的计算 2.2.线型定常连续系统的状态空间模型求解 3.线…

java后端项目启动失败,解决端口被占用问题

报错信息&#xff1a; Web server failed to start . Port 8020 was already in use. 1、查看端口号 netstat -ano | findstr 端口号 2、终止进程 taskkill /F /PID 进程ID 举例&#xff1a;关闭8020端口

Echarts 实现空心圆的绘制

文章目录 需求分析在指定位置绘制需求 如图所示,绘制空心圆 分析 <!DOCTYPE html> <html><head><meta charset=

dive deeper into tensor:从底层开始学习tensor

inspired by karpathy/micrograd: A tiny scalar-valued autograd engine and a neural net library on top of it with PyTorch-like API (github.com)and Taking PyTorch for Granted | wh (nrehiew.github.io). 这属于karpathy的karpathy/nn-zero-to-hero: Neural Networks…

【数据结构】深入理解哈希及其底层数据结构

目录 一、unordered系列关联式容器 二、底层结构 2.1 哈希的概念 2.2 哈希冲突&#xff08;哈希碰撞&#xff09; 2.3 哈希函数 2.4 哈希冲突处理 2.4.1 闭散列&#xff08;开放定址法&#xff09; 2.4.1.1 代码实现&#xff1a; 2.4.2 开散列&#xff08;链地址法&…

利用视频识别做一个土粒实时监测系统

要利用视频识别技术构建一个土粒实时监测系统&#xff0c;我们可以参考以下方案&#xff0c;该方案结合了计算机视觉、深度学习以及相关技术的要点。 一、系统概述 土粒实时监测系统基于先进的视频识别技术&#xff0c;旨在实现对土壤颗粒的实时、准确监测。该系统可以应用于…

Android启动优化之精确测量启动各个阶段的耗时

1. 直观地观察应用启动时长 我们可以通过观察logcat日志查看Android应用启动耗时&#xff0c;过滤关键字"Displayed"&#xff1a; ActivityTaskManager: Displayed com.peter.viewgrouptutorial/.activity.DashboardActivity: 797ms 启动时长(在这个例子中797ms)表示…

水库大坝安全监测险情主要内容

水库常见险情主要包括洪水漫顶、脱坡滑坡、坝体裂缝、 散浸、渗漏、漏洞、陷坑、管涌等&#xff0c;此外风浪冲击、水流冲刷等也会加剧险情的扩大。大坝险情万一抢护不及时&#xff0c;易导致发 生溃坝事故&#xff0c;造成极为严重的灾难性后果。要做到及时有效地 抢护大坝险情…

智慧金融-数据可视化

智慧金融-数据可视化 导入所需的库 import numpy as np import numpy_financial as npf import matplotlib.pyplot as plt from pylab import mpl mpl.rcParams[font.sans-serif][FangSong] mpl.rcParams[axes.unicode_minus]False单图曲线图 r 0.05 # 贷款的年利率 n 30…

28.IP核理论知识(Xilinx)

&#xff08;1&#xff09;ip核是什么&#xff1f; IP&#xff08;Intellectual Property&#xff09;即知识产权&#xff0c;在半导体产业中&#xff0c;将IP核定义为“用于ASIC或FPGA中的预先设计好的电路功能模块”&#xff0c;简而言之&#xff0c;这里的IP即电路功能模块。…

使用 `useAppConfig` :轻松管理应用配置

title: 使用 useAppConfig &#xff1a;轻松管理应用配置 date: 2024/7/11 updated: 2024/7/11 author: cmdragon excerpt: 摘要&#xff1a;本文介绍了Nuxt开发中useAppConfig的使用&#xff0c;它便于访问和管理应用配置&#xff0c;支持动态加载资源、环境配置切换、权限…

MacOS 开发 — Packages 程序 macOS新版本 演示选项卡无法显示

MacOS 开发 — Packages 程序 macOS新版本 演示选项卡无法显示 问题描述 &#xff1a; 之前写过 Packages 的使用以及如何打包macOS程序。最近更新了新的macOS系统&#xff0c;发现Packages的演示选项卡无法显示&#xff0c;我尝试从新安转了Packages 也是没作用&#xff0c;…

医院人员管理系统03_上午:JDBC连接数据库,完成简单的增删改查

文章目录 jdbc连接数据库什么是jdbc完成jdbc的步骤导入jar包写三个类DBConn.java加载驱动类&#xff1a;找到对应的然后写上获取连接关闭连接代码解释 最后写一个main方法调用测试一下运行结果 Students.javaStudentDao.java 运行结果![](https://i-blog.csdnimg.cn/direct/ba2…

bC一体化:推拉联动双向引擎促增长

在数字化浪潮的推动下&#xff0c;快消品行业正面临着前所未有的变革。在过去的几十年里&#xff0c;快速消费品的渠道分销模式发展经历了几个重要的阶段。传统渠道分销模式&#xff0c;从多级分销商的“批发制胜”到“深度分销”的人海战术&#xff0c;在过去的几十年内虽各有…

工业智能网关的功能特点及应用

工业智能网关的功能特点及应用 工业智能网关作为工业互联网架构中的关键组件&#xff0c;扮演着数据桥梁与边缘计算核心的角色&#xff0c;它不仅促进了物理设备与云端平台之间的高效通信&#xff0c;还通过集成的高级功能推动了工业4.0时代的智能化转型。以下是对其功能特点及…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第一篇 嵌入式Linux入门篇-第二十五章 Source Insight 的安装和使用

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…