基于Flink SQL实现7天用户行为风险识别,结合滚动窗口预聚合与CEP复杂事件处理技术,根据用户7天的动作,包括交易,支付,评价等行为,识别用户的风险等级

news2025/2/26 23:53:31

一、数据建模与预聚合

1. 数据源定义
CREATE TABLE user_actions (
  user_id STRING,
  event_time TIMESTAMP(3),
  action_type STRING, -- 交易/支付/评价
  amount DOUBLE,
  status STRING,      -- 交易状态(成功/失败)
  review_score INT,   -- 评价分数(1-5分)
  WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'format' = 'json'
);
2. 日维度滚动窗口预聚合
CREATE VIEW daily_metrics AS
SELECT 
  user_id,
  TUMBLE_START(event_time, INTERVAL '1' DAY) AS window_start,
  COUNT_IF(action_type = 'transaction' AND status = 'failed') AS daily_failed_trans,
  SUM_IF(amount, action_type = 'payment' AND amount > 10000) AS daily_high_payment,
  COUNT_IF(action_type = 'review' AND review_score <= 2) AS daily_negative_review
FROM user_actions
GROUP BY 
  user_id, 
  TUMBLE(event_time, INTERVAL '1' DAY); -- 按日滚动窗口聚合

关键优化

  • 使用COUNT_IF/SUM_IF过滤无效数据,减少后续处理量
  • 预聚合结果写入Redis/HBase,支持快速合并计算 

二、CEP规则定义(7天风险模式检测)

1. CEP模式语法
SELECT *
FROM daily_metrics
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY window_start
  MEASURES
    SUM(A.daily_failed_trans) AS total_failed,
    SUM(B.daily_high_payment) AS total_high_payment,
    LAST(C.daily_negative_review) AS last_negative_review,
    CASE 
      WHEN SUM(A.daily_failed_trans) >=1 AND 
           SUM(B.daily_high_payment) >=1 AND 
           LAST(C.daily_negative_review) >=1 THEN 'HIGH'
      ELSE 'LOW'
    END AS risk_level
  PATTERN (A+ B+ C) WITHIN INTERVAL '7' DAY  -- 7天内模式匹配
  DEFINE
    A AS daily_failed_trans >= 1,    -- 至少1次失败交易
    B AS daily_high_payment >= 1,    -- 至少1次大额支付(金额>1万)
    C AS daily_negative_review >= 1  -- 至少1次差评(评分≤2)
);

模式详解

  • A+:匹配连续多日(≥1天)的失败交易
  • B+:匹配连续多日(≥1天)的大额支付
  • C:匹配最后1次差评事件
  • WITHIN限制整体时间窗口为7天 
2. 动态规则管理
-- 外部规则表(MySQL)
CREATE TABLE risk_rules (
  rule_id STRING,
  condition STRING, -- 如 'total_failed>=1 AND total_high_payment>=1'
  risk_level STRING,
  PRIMARY KEY (rule_id)
) WITH ('connector'='jdbc', ... );

-- 动态关联规则
SELECT r.risk_level, c.* 
FROM cep_results c
JOIN risk_rules FOR SYSTEM_TIME AS OF c.window_start AS r
ON c.risk_condition = r.condition;

优势

  • 规则热更新:修改MySQL规则后,通过PatternProcessorDiscoverer动态加载 
  • 支持多级风险(如增加MEDIUM级别)

三、性能优化策略

1. 状态管理
  • 窗口状态TTL:设置14天过期(2倍窗口周期)
  • RocksDB状态后端:支持TB级状态存储 
  • 增量检查点:减少Checkpoint数据量 
2. 计算优化
  • Local-Global聚合:先本地预聚合再全局合并 
  • 水位线对齐:配置table.exec.source.idle-timeout防止窗口卡住 

四、风险处置联动

1. 告警输出
INSERT INTO risk_alert
SELECT 
  user_id, 
  risk_level,
  PROCTIME() AS alert_time 
FROM cep_results 
WHERE risk_level = 'HIGH';
2. 实时阻断
// 自定义UDF调用风控API
@FunctionHint(output = @DataTypeHint("BOOLEAN"))
public class BlockUserFunction extends ScalarFunction {
  public boolean eval(String userId) {
    return RiskService.block(userId); // 调用外部风控系统
  }
}

五、案例验证

测试数据示例

user_id日期失败交易大额支付差评
U0012025-02-16100
U0012025-02-18010
U0012025-02-20001

输出结果

user_id: U001, risk_level: HIGH 
window_start: 2025-02-16, window_end: 2025-02-23

总结

该方案通过FlinkSQL实现特征矩阵实时计算CEP动态规则引擎结合,解决了传统风控模型规则更新滞后的问题。关键技术点包括:

  1. 时态表关联(Temporal Table Join)实现实时-维度数据融合
  2. MATCH_RECOGNIZE语法定义复杂事件模式 
  3. 动态规则加载避免作业重启[[2][5]]

落地时可参考电商/金融行业案例,通过AB测试验证规则有效性(如误报率降低30%+)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2306604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【无标题】网络安全公钥密码体制

第一节 网络安全 概述 一、基本概念 网络安全通信所需要的基本属性“ 机密性&#xff1b;消息完整性&#xff1b;可访问性与可用性&#xff1b;身份认证。 二、网络安全威胁 窃听&#xff1b;插入&#xff1b;假冒&#xff1b;劫持&#xff1b;拒绝服务Dos和分布式拒绝服务…

【含开题报告+文档+PPT+源码】基于SpringBoot的进销存管理系统的设计与实现

开题报告 本文提出并研发了一款基于Spring Boot框架构建的进销存管理系统&#xff0c;该系统集成了全方位的企业运营管理功能&#xff0c;涵盖了用户登录验证、系统公告管理、员工信息与权限管理、物料全流程&#xff08;采购入库、销售出库、退货处理&#xff09;控制、部门组…

Linux-SaltStack配置

文章目录 SaltStack配置 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Linux专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年02月24日20点51分 SaltStack配置 SaltStack 中既支持SSH协议也支持我们的一个客户端 #获取公钥&#xff08;…

事务的4个特性和4个隔离级别

事务的4个特性和4个隔离级别 1. 什么是事务2. 事务的ACID特性2.1 原子性2.2 一致性2.3 持久性2.4 隔离性 3. 事务的创建4. 事务并发时出现的问题4.1 DIRTY READ 脏读4.2 NON - REPEATABLR READ 不可重复读4.3 PHANTOM READ 幻读 5. 事务的隔离级别5.1 READ UNCOMMITTED 读未提交…

对计算机中缓存的理解和使用Redis作为缓存

使用Redis作为缓存缓存例子缓存的引入 Redis缓存的实现 使用Redis作为缓存 缓存 ​什么是缓存&#xff0c;第一次接触这个东西是在考研学习408的时候&#xff0c;计算机组成原理里面学习到Cache缓存&#xff0c;用于降低由于内存和CPU的速度的差异带来的延迟。它是在CPU和内存…

SOME/IP-SD -- 协议英文原文讲解5

前言 SOME/IP协议越来越多的用于汽车电子行业中&#xff0c;关于协议详细完全的中文资料却没有&#xff0c;所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块&#xff1a; 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 5.1.2.5 S…

lowagie(itext)老版本手绘PDF,包含页码、水印、图片、复选框、复杂行列合并等。

入口类&#xff1a;exportPdf ​ package xcsy.qms.webapi.service;import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.common.utils.StringUtils; import com.ibm.icu.text.RuleBasedNumberFormat; import com.lowa…

达梦有没有类似oerr的功能

在oracle 23ai的sqlplus中&#xff0c;直接看异常信息说明&#xff1a; 达梦没有此功能&#xff0c;但是可以造一个 cd /home/dmdba cat >err.sql<<eof set echo off set ver off set timing off set lineshow off set feedback off select * from V\$ERR_INFO wher…

实战-网安

面试感受:网安公司前端实习 今天我有幸面试了一家网络安全公司的前端开发实习岗位,整个过程让我受益匪浅,也让我对未来的职业发展有了更清晰的认识。 首先,面试官非常专业且友好,整个面试氛围轻松但不失严谨。面试一开始,面试官简单介绍了公司背景和团队文化,让我对公…

MybatisPlus-扩展功能-枚举处理器

在Mybatis里有一个叫TypeHandler的类型处理器&#xff0c;我们常见的PO当中的这些成员变量的数据类型&#xff0c;它都有对应的处理器&#xff0c;因此它就能自动实现这些Java数据类型与数据库类型的相互转换。 它里面还有一个叫EnumOrdinalTypeHandler的枚举处理器&#xff0…

力扣2454. 下一个更大元素 IV

力扣2454. 下一个更大元素 IV 题目 题目解析及思路 题目要求对于每个数&#xff0c;找到右边比它大的第二个数&#xff0c;并记录在ans数组中 如果是右边第一个大的&#xff0c;就用一个递减栈即可&#xff0c;栈顶元素如果<当前元素则弹出 第二个大数就要利用弹出的栈顶…

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…

Ollama部署与常用命令

Ollama是一款开源工具&#xff0c;其目标是简化大语言模型在本地环境的部署和使用。它支持多种流行的开源大语言模型&#xff0c;如 Llama 2、Qwen2.5等。 通过Ollama&#xff0c;用户无需具备深厚的技术背景&#xff0c;就能在普通的消费级硬件上快速搭建一个强大的语言处理环…

Visual Studio Code 远程开发方法

方法1 共享屏幕远程控制&#xff0c;如 to desk, 向日葵 &#xff0c;像素太差&#xff0c;放弃 方法2 内网穿透 ssh 第二个方法又很麻烦&#xff0c;尤其是对于 windows 电脑&#xff0c;要使用 ssh 还需要额外安装杂七杂八的东西&#xff1b;并且内网穿透服务提供商提供的…

C语言预编译

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文一、预处理的作用与流程&#xf…

汽车智能制造企业数字化转型SAP解决方案总结

一、项目实施概述 项目阶段划分&#xff1a; 蓝图设计阶段主数据管理方案各模块蓝图设计方案下一阶段工作计划 关键里程碑&#xff1a; 2022年6月6日&#xff1a;项目启动会2022年12月1日&#xff1a;系统上线 二、总体目标 通过SAP实施&#xff0c;构建研产供销协同、业财一…

flowable-ui 的会签功能实现

场景&#xff1a;在进行智慧保时通开发时&#xff0c;有个协作合同入围功能&#xff0c;这个功能的流程图里有个评审小组&#xff0c;这个评审小组就需要进行会签操作&#xff0c;会签完成后&#xff0c;需要依据是否有不通过的情况选择下一步走的流程 思考步骤&#xff1a; 首…

大连指令数据集的创建--数据收集与预处理_02

1.去哪儿爬虫 编程语言&#xff1a;Python爬虫框架&#xff1a;Selenium&#xff08;用于浏览器自动化&#xff09;解析库&#xff1a;BeautifulSoup&#xff08;用于解析HTML&#xff09; 2.爬虫策略 目标网站&#xff1a;去哪儿&#xff08;https://travel.qunar.com/trav…

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动

SPI总线驱动整体上与I2C总线驱动类型&#xff0c;差别主要在设备树和数据传输上&#xff0c;由于SPI是由4根线实现主从机的通信&#xff0c;在设备树上配置时需要对SPI进行设置。 原理图可知&#xff0c;数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…

java医院多维度综合绩效考核源码,医院绩效管理系统,支持一键核算和批量操作,设有审核机制,允许数据修正

医院绩效考核管理系统&#xff0c;java医院绩效核算系统源码&#xff0c;采用多维度综合绩效考核的形式&#xff0c;针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求&#xff0c;对考核方案中各维度进行灵活配置&#xff0c;对各维…