大数据-玩转数据-Flink

news2024/11/25 7:37:41

一、说明

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。

二、思路

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。

三、数据准备

订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备

四、代码

package com.lyh.flink06;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class Project_Order {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<OrderEvent> orderEventString = env.readTextFile("input/OrderLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new OrderEvent(
                            Long.valueOf(data[0]),
                            data[1],
                            data[2],
                            Long.valueOf(data[3])
                    );
                }).filter(log -> "pay".equals(log.getEventType()));

        SingleOutputStreamOperator<TxEvent> txEventString = env.readTextFile("input/ReceiptLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new TxEvent(
                            data[0],
                            data[1],
                            Long.valueOf(data[2])
                    );
                });

        orderEventString.connect(txEventString)
                .keyBy(OrderEvent::getTxId,TxEvent::getTxId)
                .process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>() {
                    Map<String,OrderEvent> OrderEventmap = new HashMap<>();
                    Map<String,TxEvent> TxEventmap = new HashMap<>();
                    @Override
                    public void processElement1(OrderEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        TxEvent txEvent = TxEventmap.get(ctx.getCurrentKey());
                        if (txEvent != null) {
                            out.collect("订单" + value.getOrderId() + "对账成功");
                        }else {
                            OrderEventmap.put(ctx.getCurrentKey(),value);
                        }

                    }

                    @Override
                    public void processElement2(TxEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        OrderEvent orderEvent = OrderEventmap.get(ctx.getCurrentKey());
                        if (orderEvent != null) {
                           out.collect("订单" + orderEvent.getOrderId() + "对账成功");
                        }else {
                            TxEventmap.put(ctx.getCurrentKey(),value);
                        }
                    }
                }).print();
        env.execute();

    }
}

五、结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/903763.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据从入门到放弃——浅谈数据架构的前世今生

文章目录 1. 背景2. 数据的定义及分类2.1 数据的定义2.2 数据的分类2.3 数据和信息的区别 3. 数据的作用4. 数据的那些美好时代4.1 人脑时代4.2 文件时代4.3 数据库时代4.3.1 大服务器时代4.3.2 读写分离时代4.4 数据库的分布式时代4.5 云端时代 5. 数据的未来 1. 背景 随着云时…

excel 核心快捷键用法

1、wps怎样只复制公示计算出来的数据 1.1、按下快捷键“CtrlC”&#xff0c;复制该单元格。 1.2、按下快捷键“ShiftCtrlV”&#xff0c;即“粘贴为数值”&#xff0c;即可只复制数字而不复制该单元格的公式 1.3、wps怎样只复制公示计算出来的数据_百度知道https://zhidao.baid…

【福建事业单位-综合基础知识】05民法典

这里写自定义目录标题 一、民法概述概念原则总结 二、自然人概念总结 三、民事法律行为总结 民法考察2-4题&#xff08;重点总则篇&#xff09; 一、民法概述 概念原则 总结 二、自然人 概念 总结 三、民事法律行为 总结

【python】正则表达式

本文介绍正则表达式常用的用法。 有哪些正则字符 正则表达式中有各种各样的正则字符&#xff0c;用于匹配不同情况下的字符串。具体如下&#xff1a; 使用 re 模块进行字符串匹配 比如&#xff0c;我们要从 ‘Xiaoshuaib has 100 bananas’ 中匹配一个数字&#xff0c;可…

Zoho Books的安全性和数据保护:财务信息安全的保障措施揭秘

在信息化时代&#xff0c;如何保障企业信息安全是十分重要的问题&#xff0c;尤其是财务信息。财务管理工具的安全性是否有保障是许多用户担心的问题。 Zoho Books财务管理工具为客户提供了一系列的数据保护和安全措施&#xff0c;以确保客户财务信息的安全。 1. 采用高度加密…

漏洞指北-VluFocus靶场专栏-工具篇

漏洞指北-VluFocus靶场专栏-番外篇奇技淫巧 &#x1f338;1、burp suite 、中国蚁剑工具、Strut2扫描软件地址&#x1f338;&#x1f338;2、burp suite使用&#x1f338;step1 浏览器开启代理&#xff0c;**推荐使用&#xff1a;SwitchyOmega** step2 确认浏览器端口和burp su…

LeetCode 542. 01 Matrix【多源BFS】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

栈空间和栈帧

如图所示&#xff0c;栈空间是每个线程私有的&#xff0c;其中每个方法有一个栈帧&#xff0c;里面保存了局部变量 返回地址等信息。 如果是多线程&#xff0c;每个线程都会有一个栈空间。 多线程切换的时候需要保存局部变量、当前的地址等信息。 线程上下文切换的时机&…

mybatis入门Idea搭建

一、概念 1、什么是mybatis&#xff1f; MyBatis是一个开源的Java持久层框架&#xff0c;它提供了一种简化数据库访问的方式。它的主要作用是将Java对象与数据库表之间进行映射&#xff0c;使开发者可以通过面向对象的方式操作数据库&#xff0c;而不需要编写大量的SQL语句。M…

线性代数的学习和整理5: 矩阵的加减乘除及其几何意义(未完成,建设ing)

目录 1 矩阵加法 1.1 矩阵加法的定义 1.2 加法的属性 1.2.1 只有同类型&#xff0c;相同n*m的矩阵才可以相加 1.2.1 矩阵加法的可交换律&#xff1a; 1.2.2 矩阵加法的可结合律&#xff1a; 1.3矩阵加法的几何意义 2 矩阵的减法 2.1 矩阵减法定义和原理基本同 矩阵的…

前端学习记录~2023.8.3~JavaScript重难点实例精讲~第5章 DOM与事件

第 5 章 DOM与事件 前言5.1 DOM选择器5.1.1 传统原生JavaScript选择器&#xff08;1&#xff09;通过id定位&#xff08;2&#xff09;通过class定位&#xff08;3&#xff09;通过name属性定位&#xff08;4&#xff09;通过标签名定位 5.1.2 新型的querySelector选择器和quer…

虚拟内存机制1

虚拟内存机制 计算机的存储系统 为什么要有虚拟内存&#xff1f; 在早期的计算机中&#xff0c;是没有虚拟内存的概念的。我们要运行一个程序&#xff0c;会把程序全部装入内存&#xff0c;然后运行。当运行多个程序时&#xff0c;经常会出现以下问题&#xff1a; 进程地址空…

2022年国考行政执法卷-判断推理

去掉重复题 例题 例题 例题 例题 例题 例题 例题 例题 例题 例题 类比推理 例题 例题 例题 例题 例题 例题

【汇编语言】CS、IP寄存器

文章目录 修改CS、IP的指令转移指令jmp问题分析 修改CS、IP的指令 理论&#xff1a;CPU执行何处的指令&#xff0c;取决于CS:IP应用&#xff1a;程序员可以通过改变CS、IP中的内容&#xff0c;进行控制CPU即将要执行的目标指令&#xff1b;问题&#xff1a;如何改变CS、IP中的…

go: go.mod file not found in current directory or any parent directory.

go version go 1.20.7 go 1.17 以后都是用 go install 命令 D:\Go\bin\go.exe get -u github.com/nsf/gocode D:\Go\bin\go.exe get -u golang.org/x/tools/cmd/guru D:\Go\bin\go.exe get -u github.com/rogpeppe/godef>> Running: D:\Go\bin\go.exe get -u github.com…

Kubernetes_Scheduler_资源调度

文章目录 一、前言二、k8s 资源模型2.1 Node 资源抽象2.1.1 Capacity2.1.2 Allocatable2.1.3 Allocated 2.2 Node 资源切分&#xff08;预留&#xff09;2.2.1 SystemReserved2.2.2 KubeReserved2.2.3 EvictionThreshold&#xff08;驱逐门限&#xff09;2.2.4 Allocatable 2.3…

二叉树搜索

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——二叉搜索树☂️<3>开发环境 &#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;在之前的我们已经学过了普通二叉树&#xff0c;了解了基本的二叉树…

Spring(四):Spring Boot 的创建和使用

关于Spring之前说到&#xff0c;Spring只是思想&#xff08;核心是IOC、DI和AOP&#xff09;&#xff0c;而具体的如何实现呢&#xff1f;那就是由Spring Boot 来实现&#xff0c;Spring Boot究竟是个啥呢&#xff1f; 什么是Spring Boot&#xff0c;为什么要学Spring Boot Sp…

Multi-UAV Disaster Environment Coverage Planning with Limited-Endurance

Multi-UAV Disaster Environment Coverage Planning with Limited-Endurance 有限续航时间下的多无人机灾害环境覆盖规划 定义问题将初始地图转换为热图产生优化路径 基于 已知的灾区热图&#xff0c;设计一个多无人机全覆盖搜索的路径规划方法。可以在无人机有限能量约束下探索…