Flink-使用合流操作进行实时对账需求的实现

news2024/10/5 12:49:17

学Flink第八章多流转换的时候,进行合流操作.connect()使用到了第九章状态编程的知识,感觉总体不是很清晰,因此学完状态编程后现在进行重温并细化一些细节

  1. 业务背景

在这里插入图片描述

  • 步骤一:

用户进行支付的时候,后台是需要调用第三方服务平台进行服务,即用户支付请求,页面将会跳转到第三方支付平台支付

  • 步骤二:

用户进行支付之后,第三方支付平台给到用户前端支出反馈,并且给我们平台发送用户已经付款的消息

  • 步骤三:

第三方支付平台需要将钱再转入到我们平台账户

  1. 出现的问题以及需求
  • 问题

如果进行到图中④,如果发生数据丢失,那么用户已经支付的消息无法传达给到后台,而后不能关闭订单

  • 需求

因此需要进行实时对账操作,即用户提交的支付请求(客户端),以及第三方支付平台给到的请求(三方端),两者可以当成两条流

  • 结果

如果进行两条流的操作后不匹配,那么将进行预警

  1. 一些细节考虑
  • 两个流都给他标上时间戳(使用watermark标志)

  • 使用状态编程保存状态以及设置定时器,来进行两条流的连接以及等待

    • 如果对方流中有我流的数据,那么直接输出成功;如果没有则更新我流状态,注册定时器等待另一个流

    • 然后用ontimer()触发定时器:判断条件如果两条流中还有状态没被清空,说明没匹配上

  1. 上代码
  • 代码
public class BillCheckExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //来自app的支付日志
        SingleOutputStreamOperator<Tuple3<String,String,Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L),
                Tuple3.of("order-3", "app", 3500L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
        <Tuple3<String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );
        //来自第三方平台的支付日志
        SingleOutputStreamOperator<Tuple4<String,String,String,Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
                <Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String,String,String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String,String,String,Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                })
        );

        //检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警
//        //这种也可以
//        appStream.keyBy(data->data.f0)
//                .connect(thirdpartStream.keyBy(data -> data.f0));
//
        appStream.connect(thirdpartStream)
                        .keyBy(data->data.f0,data-> data.f0)
                        .process(new OrderMatchResult())
                        .print();


        env.execute();
    }
    //自定义实现CoFunction
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String,String,Long>,
                                                Tuple4<String,String,String,Long>,String>{

        //定义状态变量,用来保存已经到达的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        //运行上下文环境中获取状态
        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
            );
        thirdPartyEventState = getRuntimeContext().getState(
                new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
        );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            //来的是app event,看另一条流中事件是否来过
            if(thirdPartyEventState.value()!=null){
                out.collect("对账成功:"+value+" "+thirdPartyEventState.value());
                //清空状态
                thirdPartyEventState.clear();
            }else{
                //如果没来就等待,并且更新状态
                appEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f2+5000L);
            }

        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            //来的是app event,看另一条流中事件是否来过
            if(appEventState.value()!=null){
                out.collect("对账成功:"+appEventState.value()+" "+value);
                //清空状态
                appEventState.clear();
            }else{
                //如果没来就等待,并且更新状态
                thirdPartyEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f3);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来
            //并且不会存在两个都不为空,因为其中一个不为空后会被清除
            //没有没清空表示失败
            if(appEventState.value()!=null){
                out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
            }
            if(thirdPartyEventState.value()!=null){
                out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
            }
            //清空所有数据
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }

}
  • 结果
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账成功:(order-3,app,3500) (order-3,third-party,success,4000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/115329.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode 834. Sum of Distances in Tree(树中的距离和)

无向连接的树&#xff08;不一定是二叉树&#xff09;&#xff0c;求每个节点到其他节点的距离和。 返回一个数组&#xff0c;数组的第i个元素就是第i个节点到其他所有节点的距离之和。 思路&#xff1a; 涉及无向图的构造和遍历&#xff0c;树的前序后序遍历&#xff0c;问题…

论文复现-1:Perturbation CheckLists for Evaluating NLG Evaluation Metrics

以data2text任务为例&#xff0c;探讨generation metric矩阵对于一些句子扰动是否敏感&#xff0c;在多个维度上的敏感性如何&#xff1f; 1数据集 data2text数据集是由3025条samples构成&#xff0c;关键词由“ID”和“reference”构成。 每个子任务由对应的criteria&#…

python基础语法19-calendar模块

一、简介 有了time及datetime模块&#xff0c;再结合日历&#xff08;Calendar&#xff09;模块就可以更好的覆盖到时间处理的各个方面的应用。日历模块主要是用于处理日历及星期相关操作。 calendar模块的内置函数如下: 序号 函数及描述 1 calendar.calendar(yea…

Keras深度学习实战(42)——强化学习基础

Keras深度学习实战&#xff08;42&#xff09;——强化学习基础0. 前言1. 强化学习基础1.1 基本概念1.2 马尔科夫决策过程1.3 目标函数2. 在具有非负奖励的模拟游戏中获取最佳动作2.1 问题设定2.2 模型分析2.3 模型构建与训练3. 在模拟游戏中获取最佳动作3.1 问题定义3.2 模型分…

数据库原理及MySQL应用 | 数据表操作

数据表操作是数据库操作中最基本和最重要的操作。 图5-1是图书销售数据库booksale中存放的图书表books。 ■ 图5-1图书表books 01. 表的结构 表的结构也称为“型”(Type)&#xff0c;用于描述存储于表中的数据的逻辑结构和属性。定义表就是指定义表的结构&#xff0c;使用数据…

Vue CLI系列之生成打包报告

文章の目录一、通过命令行参数的形式生成报告二、通过可视化的UI面板直接查看报告写在最后打包时&#xff0c;为了直观地发现项目中存在的问题&#xff0c;可以在打包时生成报告。生成报告的方式有两种&#xff1a; 一、通过命令行参数的形式生成报告 "scripts": {…

关于对计算机发展史、冯诺依曼体系、CPU基本工作流程以及关于编程语言的简单认识

关于计算机发展史&#xff1a; 关于计算机发展史&#xff0c;大体经历了从一般计算工具到机械计算机到目前的电子计算机的发展历程。 公元前2500年&#xff0c;算盘已经出现&#xff1b; 1694 年&#xff0c;德国博物学家 戈特弗里德莱布尼兹建造了“步进计算器”。 关于步进…

Djiango实现用户管理增删改成功能实战

1.0定义 前后端不分离模式 前后端分离是指前端页面看到的效果都是由后端控制&#xff0c;即后端渲染HTML页面&#xff0c;前端与后端的耦合度比较高 前后端分离模式 后端仅返回前端所需要的数据&#xff0c;不在渲染HTML页面&#xff0c;不在控制前端的效果&#xff0c;至…

八、kubernetes1.25应用升级、回滚

1、概述 用户希望应用程序始终可用&#xff0c;而开发人员则需要每天多次部署它们的新版本。在 Kubernetes 中&#xff0c;这些是通过滚动更新&#xff08;Rolling Updates&#xff09;完成的。 滚动更新 允许通过使用新的实例逐步更新 Pod 实例&#xff0c;零停机进行 Deploym…

Sentinel流控

Sentinel 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。 Sentinel 以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 1.sentinel特性 Sentinel 具有以下特征: 丰富的应用场景&#xff1a; Sentinel 承接了…

最优化方法——最小二乘法与梯度下降法

目录 系列文章目录 一、问题 二、实验思路综述 1.实验工具及算法 2.实验数据 3.实验目标 4.实验步骤 三、最小二乘问题引入 1.最小二乘问题样例 2.最小二乘问题解决方案及数学模型化 3.相关线性代数知识导入 3.1 梯度 3.2 矩阵的逆 3.3 QR分解 四、最小二乘法 …

用 ChatGPT 运行 Python

最近&#xff0c;我一直在阅读一些关于ChatGPT的有趣文章。在一篇文章中&#xff0c;有人发明了一种新的语言&#xff0c;并让ChatGPT运行它。在另一篇文章中&#xff0c;有人在ChatGPT中运行一个虚拟机。后者启发我提出了下面这个问题。你能在ChatGPT中运行一个交互式Python会…

【docker常用命令】

一、帮助启动类命令 &#xff08;1&#xff09;启动docker systemctl start docker&#xff08;2&#xff09;停止docker systemctl stop docker&#xff08;3&#xff09;重启docker systemctl restart docker&#xff08;4&#xff09;查看docker状态 systemctl status…

【数据预处理】基于Pandas的数据预处理技术【california_housing加州房价数据集】_后9个任务

文章目录一.需求分析二.需求解决2.1 对第一个特征&#xff08;收入中位数&#xff09;排序后画散点图2.2 对第一个特征&#xff08;收入中位数&#xff09;画分位数图并分析2.3 【选做】对所有特征画分位数图并进行分析2.4 使用线性回归方法拟合第一个特征&#xff08;收入中位…

基于c# asp.net电子病历管理系统的设计与实现

摘 要 网络的广泛应用给生活带来了十分的便利。所以把电子病历管理与现在网络相结合&#xff0c;利用net语言建设电子病历管理系统&#xff0c;实现电子病历管理的信息化。则对于进一步提高医院的发展&#xff0c;丰富电子病历管理经验能起到不少的促进作用。 电子病历管理系统…

AbstractQueueSynchronizer

AbstractQueueSynchronizer AbstractQueueSynchronizer 是基于 FIFO线程等待队列 的一个同步器开发框架。 这篇文章首先介绍同步器概念&#xff0c;然后介绍AQS的结构原理 什么是Synchronizer&#xff08;同步器&#xff09; 并发环境下&#xff0c;Synchronizer用于实现线…

Windows和Mac系统实现本地部署WebPageTest工具

在项目开发或者测试的过程中&#xff0c;由于没有上线&#xff0c;我们在公网上无法访问我们的网站&#xff0c;但同时我们又需要查看浏览器性能&#xff0c;这样我们就需要在本地部署WebPageTest工具以协助进行性能测试 具体实现步骤&#xff1a; Windows系统&#xff1a; …

FFT求多项式乘积

之前在b站上看到了一些介绍FFT的视频 《快速傅里叶变换(FFT)——有史以来最巧妙的算法&#xff1f;》 《这个算法改变了世界》 于是打算写一篇记录一下qwq&#xff08;本博客中的截图基本上来源于第一个视频&#xff09; Fast Fourier Transform 是一种能在O(nlogn)O(nlogn)…

企业营销数字化转型:如何转型、如何选品、如何用好?

省时查报告-专业、及时、全面的行研报告库省时查方案-专业、及时、全面的营销策划方案库【免费下载】2022年11月份热门报告盘点2023年&#xff0c;如何科学制定年度规划&#xff1f;《底层逻辑》高清配图清华大学256页PPT元宇宙研究报告.pdf&#xff08;附下载链接&#xff09;…

【LeetCode】1759. 统计同构子字符串的数目

统计同构子字符串的数目 题目描述 给你一个字符串 s &#xff0c;返回 s 中 同构子字符串 的数目。由于答案可能很大&#xff0c;只需返回对 109 7 取余 后的结果。 同构字符串 的定义为&#xff1a;如果一个字符串中的所有字符都相同&#xff0c;那么该字符串就是同构字符串…