flink学习之广播流与合流操作demo

news2025/1/15 12:59:58

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Random;

public class BroadCastStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());
        DataStream<User> userDataStream = env.addSource(new CustomerSource());


        userDataStream.print("user");
        patternDataStream.print("pattern");
        //test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();
        //test2
        // 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();
        //test3
        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(Pattern.class));
        //通过描述器 更新
        BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);
        userDataStream
                .keyBy(user -> user.userId)
                .connect(broadcast)
                .process(new CustomerBroadCastProcess())
                .print();
        env.execute();
    }

    private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {

        @Override
        public void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            Integer userVip = user.getVip();
            //获取广播流的数据 不是通过map保存
            Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
            if (pattern!=null){
                Integer patternVip = pattern.vip;
                String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
                if (userVip>= patternVip){
                    result=result+"符合要求";
                }else {
                    result=result+"不符合要求";
                }
                collector.collect(result);
            }else {
                System.out.println("pattern is null ");
            }

        }

        @Override
        public void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,
                User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            BroadcastState<Void, Pattern> bcState = context.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
            // 将广播状态更新为当前的pattern
            bcState.put(null, pattern);
        }


    }
    public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {
        ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的state
        HashMap<String,Integer> vipMap;
        @Override
        public void open(Configuration parameters) throws Exception {
            vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));
            vipMap=new HashMap<String,Integer>();
            super.open(parameters);
        }


        @Override
        public void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            Integer userVip = user.getVip();
            Integer patternVip = vipMap.getOrDefault("vip", 0);
            String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
            if (userVip>=patternVip){
                result=result+"符合要求";
            }else {
                result=result+"不符合要求";
            }
            collector.collect(result);
        }

        @Override
        public void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {
            vipMap.put("vip",pattern.vip);
        }
    }

    public static class User {
        public Integer userId;
        public Integer vip;

        public User() {
        }

        public User(Integer userId, Integer vip) {
            this.userId = userId;
            this.vip = vip;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        public Integer getVip() {
            return vip;
        }

        public void setVip(Integer vip) {
            this.vip = vip;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId=" + userId +
                    ", vip='" + vip + '\'' +
                    '}';
        }
    }

    // 定义行为模式POJO类,包含先后发生的两个行为
    public static class Pattern {
        public Integer vip;


        public Pattern() {
        }

        public Pattern(Integer vip) {
            this.vip = vip;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "vip='" + vip + '\'' +
                    '}';
        }
    }

    private static class CustomerSource implements SourceFunction<User> {
        boolean run = true;

        @Override
        public void run(SourceContext<User> sourceContext) throws Exception {
            while (true) {
                Integer userId = new Random().nextInt(1000);
                Integer vip = new Random().nextInt(10);
                sourceContext.collect(new User(userId, vip));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }

    private static class ChangeSource implements SourceFunction<Pattern> {
        boolean run = true;

        @Override
        public void run(SourceContext<Pattern> sourceContext) throws Exception {
            int i = 1;
            while (true) {
                sourceContext.collect(new Pattern(i++));
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }

}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/980899.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android开发-Mac Android开发环境搭建(Android Studio Mac环境详细安装教程,适合新手)...

1.进入Android Studio官网 https://developer.android.google.cn/studio 2.点击下载Android Studio. 3.将说明拉到最下方,选择Mac With Apple Chip.苹果芯片选择Apple Chip,如果是Intel芯片可以选择Intel. 4.下载完成后,双击进入Android Studio. 点击打开. 5.如果是第一次安装,…

【枚举子序列+组合数学+推式子】Cf Edu11 E

https://codeforces.com/contest/660/problem/E 题意&#xff1a; 思路&#xff1a; 重点在于枚举子序列&#xff0c;一般是先枚举子序列长度&#xff0c;然后枚举别的 算是经典套路 Code&#xff1a; #include <bits/stdc.h>#define int long longusing i64 long lo…

使用 WebGL 为 HTML5 游戏创建逼真的地形

推荐&#xff1a;使用 NSDT场景编辑器快速搭建3D应用场景 建 模 和 3D 地形 大多数 3D 对象是 使用建模工具创建&#xff0c;这是有充分理由的。创建复杂对象 &#xff08;如飞机甚至建筑物&#xff09;很难在代码中完成。建模工具 几乎总是有意义的&#xff0c;但也有例外&am…

如何正确的写出第一个java程序:hello java

1 前言 最近公司由于项目需要&#xff0c;开始撸java代码了。学习一门新的编程语言&#xff0c;刚开始总是要踩很多坑&#xff0c;所以记录一下学习过程&#xff0c;也希望对java初学者有所帮助。 2 hello java 2.1 程序源码 程序内容十分简单&#xff0c;这里就不再过多赘…

ICCV 2023 | MoCoDAD:一种基于人体骨架的运动条件扩散模型,实现高效视频异常检测

论文链接&#xff1a; https://arxiv.org/abs/2307.07205 视频异常检测&#xff08;Video Anomaly Detection&#xff0c;VAD&#xff09;扩展自经典的异常检测任务&#xff0c;由于异常情况样本非常少见&#xff0c;因此经典的异常检测通常被定义为一类分类问题&#xff08;On…

记录--CSS 滚动驱动动画 scroll()

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 CSS 滚动驱动动画 scroll() animation-timeline 通过 scroll() 指定可滚动元素与滚动轴来为容器动画提供一个匿名的 scroll progress timeline. 通过元素在顶部和底部(或左边和右边)的滚动推进 scroll…

界面控件DevExpress WPF(v23.2)下半年发展路线图

本文主要概述了DevExpress官方在下半年&#xff08;v23.2&#xff09;中一些与DevExpress WPF相关的开发计划。 通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 DevExpress …

如何让 Llama2、通义千问开源大语言模型快速跑在函数计算上?

:::info 本文是“在Serverless平台上构建AIGC应用”系列文章的第一篇文章。 ::: 前言 随着ChatGPT 以及 Stable Diffusion,Midjourney 这些新生代 AIGC 应用的兴起&#xff0c;围绕AIGC应用的相关开发变得越来越广泛&#xff0c;有呈井喷之势&#xff0c;从长远看这波应用的爆…

算法基础-数学知识-高斯消元、求组合数

高斯消元、求组合数 高斯消元883. 高斯消元解线性方程组 组合数AcWing 885. 求组合数 IAcWing 886. 求组合数 IIAcWing 887. 求组合数 IIIAcWing 888. 求组合数 IV 高斯消元 找到当前列绝对值最大的数 所在的行将改行的该列的系数变成1&#xff0c;其他列也要跟着变将这行和最…

使用GPT-4生成训练数据微调GPT-3.5 RAG管道

OpenAI在2023年8月22日宣布&#xff0c;现在可以对GPT-3.5 Turbo进行微调了。也就是说&#xff0c;我们可以自定义自己的模型了。然后LlamaIndex就发布了0.8.7版本&#xff0c;集成了微调OpenAI gpt-3.5 turbo的功能 也就是说&#xff0c;我们现在可以使用GPT-4生成训练数据&a…

taro vue3 ts nut-ui 项目

# 使用 npm 安装 CLI $ npm install -g tarojs/cli 查看 Taro 全部版本信息​ 可以使用 npm info 查看 Taro 版本信息&#xff0c;在这里你可以看到当前最新版本 npm info tarojs/cli 项目初始化​ 使用命令创建模板项目&#xff1a; taro init 项目名 taro init myApp …

《TCP/IP网络编程》阅读笔记--基于UDP的服务器端/客户端

目录 1--TCP和UDP的主要区别 2--基于 UDP 的数据 I/O 函数 3--基于 UDP 的回声服务器端/客户端 4--UDP客户端Socket的地址分配 5--UDP存在数据边界 6--UDP已连接与未连接的设置 1--TCP和UDP的主要区别 ① TCP 提供的是可靠数据传输服务&#xff0c;而 UDP 提供的是不可靠…

使用Java分析器优化代码性能,解决OOM问题

有的时候博客内容会有变动&#xff0c;首发博客是最新的&#xff0c;其他博客地址可能会未同步,认准https://blog.zysicyj.top 首发博客地址 背景 最近我一直在做性能优化&#xff0c;对一个单机应用做性能优化。主要是涉及到解析和导入导出相关的业务。 大致说一下这个单机应用…

算法 数据结构 递归插入排序 java插入排序 递归求解插入排序算法 如何用递归写插入排序 插入排序动图 插入排序优化 数据结构(十)

1. 插入排序&#xff08;insertion-sort&#xff09;&#xff1a; 是一种简单直观的排序算法。它的工作原理是通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入 算法稳定性: 对于两个相同的数&#xff0c;经过…

Matlab进阶绘图第28期—带回归趋势线的密度散点图

在之前的文章中&#xff0c;分享了Matlab密度散点图的绘制方法&#xff1a; 进一步&#xff0c;假如我们需要计算、添加散点的拟合线&#xff0c;该怎么操作呢&#xff1f; 本期就来分享一下带回归趋势线的密度散点图的绘制方法&#xff0c;先来看一下成品效果&#xff1a; 特…

iOS开发Swift-10-位置授权, cocoapods,API,天气获取,城市获取-和风天气App首页代码

1.获取用户当前所在的位置 在infi中点击加号,选择权限:当用户使用app的时候获取位置权限. 填写使用位置权限的目的. 2.获取用户的经纬度. ViewController: import UIKit import CoreLocationclass ViewController: UIViewController, CLLocationManagerDelegate { //遵循CLL…

C#进阶 多个泛型约束

using System; using System.Collections; using System.Collections.Generic; using System.Linq; using UnityEngine;public class A02_Generic : MonoBehaviour {[ContextMenu("测试Start")]// Start is called before the first frame updatevoid Start(){Person…

Java SPI的原理和实践

Java SPI的概念和术语 SPI&#xff1a;全称是Service Provider Interface它是从Java 6开始引入的&#xff0c;是一种基于ClassLoader来发现并加载服务的机制。一个标准的SPI&#xff0c;由3个组件构成&#xff0c;分别是&#xff1a; Service - 服务接口&#xff1a;是一个公…

简单理解微服务限流、降级、熔断

微服务限流、降级、熔断分别都是什么意思&#xff0c;我们平时工作中为什么要关注这些东西呢&#xff1f; 公司不断的发展壮大&#xff0c;一开始处于蛮荒时代&#xff0c;咱们从单体应用过渡到微服务的时候&#xff0c;可能还是那一套单体的思想&#xff0c;再加上用户量可能…

VIT理论代码详解

将图像输入到transformer的思想 把每个像素点按照顺序拿出来&#xff0c;作为token&#xff0c;这样做的话输入参数规模是&#xff1a;假如是1通道的灰度图&#xff1a; 224x224x150176&#xff0c;bert才512&#xff0c;是bert的100倍。 改进方法&#xff1a; VIT模型架构图…