大数据-玩转数据-Flink营销对账

news2024/12/25 1:45:36

一、说明

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。

二、思路

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。

三、数据准备

订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备

四、代码

package com.lyh.flink06;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class Project_Order {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<OrderEvent> orderEventString = env.readTextFile("input/OrderLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new OrderEvent(
                            Long.valueOf(data[0]),
                            data[1],
                            data[2],
                            Long.valueOf(data[3])
                    );
                }).filter(log -> "pay".equals(log.getEventType()));

        SingleOutputStreamOperator<TxEvent> txEventString = env.readTextFile("input/ReceiptLog.csv")
                .map(line -> {
                    String[] data = line.split(",");
                    return new TxEvent(
                            data[0],
                            data[1],
                            Long.valueOf(data[2])
                    );
                });

        orderEventString.connect(txEventString)
                .keyBy(OrderEvent::getTxId,TxEvent::getTxId)
                .process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>() {
                    Map<String,OrderEvent> OrderEventmap = new HashMap<>();
                    Map<String,TxEvent> TxEventmap = new HashMap<>();
                    @Override
                    public void processElement1(OrderEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        TxEvent txEvent = TxEventmap.get(ctx.getCurrentKey());
                        if (txEvent != null) {
                            out.collect("订单" + value.getOrderId() + "对账成功");
                        }else {
                            OrderEventmap.put(ctx.getCurrentKey(),value);
                        }

                    }

                    @Override
                    public void processElement2(TxEvent value,
                                                Context ctx,
                                                Collector<String> out) throws Exception {
                        OrderEvent orderEvent = OrderEventmap.get(ctx.getCurrentKey());
                        if (orderEvent != null) {
                           out.collect("订单" + orderEvent.getOrderId() + "对账成功");
                        }else {
                            TxEventmap.put(ctx.getCurrentKey(),value);
                        }
                    }
                }).print();
        env.execute();

    }
}

五、结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/905338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Codeforces Round 890 (Div. 2) E2. PermuTree (hard version) (主席树/树状数组/差分+前缀和)

题目 有一个初始为空的数组&#xff0c;你需要处理q(q<1e6)次操作&#xff0c;操作分四种&#xff1a; ① x&#xff0c;数组后面加一个新的数x ② - k&#xff0c;删掉数组最后面的k个值 ③ !&#xff0c;回滚最后一次变更&#xff08;只有①操作和②操作视为变更&…

Leetcode刷题之快乐数

题⽬描述&#xff1a; 算法原理: 为了⽅便叙述&#xff0c;将「对于⼀个正整数&#xff0c;每⼀次将该数替换为它每个位置上的数字的平⽅和」这⼀个 操作记为 x 操作&#xff1b; 我们做这道题可以参考环形链表:142. 环形链表 II - 力扣&#xff08;LeetCode&#xff09;…

在线HmacSHA224加密工具--在线获取哈希值又称摘要

具体请前往&#xff1a;在线计算HmacSha224工具

Internet Download Manager2023下载器最新中文版本功能

对于idm相信大家都不陌生&#xff0c;全称是Internet Download Manager。idm是一款非常经典、功能强大的Windows文件多线程下载加速软件&#xff0c;在电脑用户中口碑极好&#xff0c;被称为必装的HTTP下载神器。 1、idm既是下载器&#xff0c;也是加速器&#xff0c;可以提升…

next.js 创建 react ant design ts 项目

环境说明&#xff1a;next.js 官方文档要求node版本在16.8以上。笔者使用的 node版本是16.20.1&#xff0c;不要使用16.13.0&#xff0c;笔者在使用 node16.13.0环境时创建的 react 项目点击事件无效 next.js官网截图 next.js 官网&#xff1a;https://nextjs.org/ react 官网…

个人信息保护影响评估(PIA)怎么做?解发条件、实施步骤、操作指南

个人信息保护一直是人们关注的热点话题&#xff0c;互联网、人工智能、大数据等新兴技术的快速发展极大地增强了入侵个人信息的能力&#xff0c;对个人信息的随意收集、违法获取、过度使用、非法买卖、泄露等问题引起了全球各国的普遍关注。同时随着用户的个人信息保护意识的逐…

Flask模型部署教程?

如何使用Flask框架来部署机器学习模型&#xff1f;Flask是一个轻量级的Python Web框架&#xff0c;它非常适合用于将机器学习模型部署成实际应用。 什么是Flask&#xff1f; Flask是一个Python Web应用框架&#xff0c;它允许轻松地构建Web应用程序。它被广泛用于构建各种Web…

基于Java+SpringBoot+Vue的校企合作项目管理系统【源码+论文+演示视频+包运行成功】

博主介绍&#xff1a;✌擅长Java、微信小程序、Python、Android等&#xff0c;专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3fb; 不然下次找不到哟 Java项目精品实战案…

代码随想录(八):贪心算法

文章目录 455.分发饼干376. 摆动序列53. 最大子数组和122. 买卖股票的最佳时机 II55. 跳跃游戏1005. K 次取反后最大化的数组和134. 加油站860. 柠檬水找零135. 分发糖果406. 根据身高重建队列 455.分发饼干 题目链接 C代码&#xff1a; class Solution { public:int findCo…

在Windows下安装PhantomJS和CasperJS及入门介绍(上)

近在使用Python爬取网页内容时&#xff0c;总是遇到JS临时加载、动态获取网页信息的困难。例如爬取CSDN下载资源评论、搜狐图片中的“原图”等&#xff0c;此时尝试学习Phantomjs和CasperJS来解决这个问题。这第一篇文章当然就是安装过程及入门介绍。 一. 安装Phantomjs 下载地…

SWUST派森练习题:P118. 数组接雨

描述 给定一个整形数组​​arr​​**&#xff0c;已知其中所有的值都是非负的&#xff0c;将这个数组看作一个柱子高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。​​(​​数组以外的区域高度视为​0)** 数据范围&#xff1a;数组长度​​** 0≤n≤…

wustojc2010两小时学完C语言

#include <stdio.h> int main() {int a,b,c;scanf("%d%d%d",&a,&b,&c);printf("%d",a-b*c);return 0;}

AUTOSAR规范与ECU软件开发(实践篇)4.5在Simulink中导入软件组件描述文件——“自上而下”的工作流程

“自上而下”的工作流程有别于“自下而上”的工作流程,其需要先在AAT(AUTOSAR Authoring Tool)工具(如ISOLAR-A)中完成软件组 件框架设计,并将软件组件arxml描述文件导入Matlab/Simulink完成内部 算法的实现,然后再通过Matlab/Simulink生成符合AUTOSAR规范的代 码及arxm…

js 获取页面的滚动高度

想要获取页面的滚动位置可以通过给window绑定滚动事件来实现。 window.addEventListener(scroll,()>{const n document.documentElement.scrollTopconsole.log(n);}) 通过该方法可以获取页面的当前位置&#xff0c;或者实现其他的效果&#xff0c;例如电梯导航

v-model原理

v-model本质上是一个语法糖&#xff0c;应用在输入框上&#xff0c;就是value属性 和input事件的合写 作用:提供数据的双向绑定—实现子组件 和父组件数据的双向绑定 数据变 视图跟着变 :value视图变 数据跟着变 input <input type"text" v-model"msg"…

漏洞指北-VulFocus靶场专栏-中级01

漏洞指北-VulFocus靶场专栏-中级01 中级001 &#x1f338;dcrcms 文件上传 &#xff08;CNVD-2020-27175)&#x1f338;step1&#xff1a;输入账号 密码burp suite 拦截 修改类型为 jpeg 中级002 &#x1f338;thinkphp3.2.x 代码执行&#x1f338;step1&#xff1a;burpsuite …

[保研/考研机试] KY11 二叉树遍历 清华大学复试上机题 C++实现

题目链接&#xff1a; 二叉树遍历_牛客题霸_牛客网编一个程序&#xff0c;读入用户输入的一串先序遍历字符串&#xff0c;根据此字符串建立一个二叉树&#xff08;以指针方式存储&#xff09;。题目来自【牛客题霸】https://www.nowcoder.com/share/jump/43719512169254700747…

【Android】Mobile-Security-Framework-MobSF Manifest 静态扫描规则

前言 移动安全框架&#xff08;MobSF&#xff09;是一个自动化的一体化移动应用程序&#xff08;Android/iOS/Windows&#xff09;测试、恶意软件分析和安全评估框架&#xff0c;能够执行静态和动态分析。MobSF支持移动应用程序二进制文件&#xff08;APK、XAPK、IPA和APPX&am…

JavaScript:DOM (5) 节点的CRUD - 修改、删除

修改(替换)节点 替换子项 replaceChild()可以将指定元素的某个子节点换成新的节点&#xff0c;语法为指定元素.replaceChild(新节点, 旧节点)。 范例&#xff1a; 原始结构&#xff1a; <ul><li>第一项</li><li>第二项</li><li>第三项&l…

Python编程从入门到实践_8-8 用户的专辑_答案

Python编程从入门到实践_8-8 用户的专辑_答案 我也看了一些其他人的答案&#xff0c;很多的答案存在问题&#xff0c;每次调用函数 make_album() 后生成一个专辑字典会覆盖上次调用函数 make_album() 生成的字典&#xff0c;不符合题意。 我采取的解决方案是添加一个空列表 …