Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)

news2024/10/24 10:56:14

以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件:

  1. Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。
  2. Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。
  3. 规则引擎:使用 Python 实现简单的规则评估逻辑,模拟 Drools 的功能。
  4. Redis 内存数据库:用于存储风险标签,快速获取账户的风险级别。
  5. 分布式数据库:使用 SQLite 模拟,从中获取风险标签数据(当 Redis 中没有时)。

我们将构建一个简单的风控系统,流程如下:

  • 从 Kafka 中消费实时交易数据。
  • 从 Redis 获取对应的风险标签,如果没有则从分布式数据库获取并更新到 Redis。
  • 使用规则引擎对交易数据和风险标签进行评估。
  • 将评估结果返回给支付业务系统或记录下来。
  • 实时交易模块:接收交易数据 ——> 获取风险标签(Redis) ——> 调用规则引擎 ——> 评估结果返回
          ↓                                           ↓                          ↑
    规则引擎模块:交易数据 + 风险标签 ---> 规则执行 ----> 输出评估结果(通过/拒绝)
    

     

项目结构和依赖

1. 项目结构

risk_control_demo/
├── app.py                      # 主应用程序
├── models.py                   # 数据模型定义
├── rules.py                    # 规则引擎逻辑
├── database.py                 # 数据库服务类
├── redis_service.py            # Redis 服务类
├── requirements.txt            # 项目依赖
└── producer.py                 # Kafka 生产者,发送测试数据

2. 项目依赖(requirements.txt)

faust==1.10.4
redis==4.5.5
aiokafka==0.7.2
sqlite3==0.0.1

安装依赖

pip install -r requirements.txt

详细代码

1. models.py(数据模型定义)
# models.py
from dataclasses import dataclass

@dataclass
class Transaction:
    transaction_id: str
    account_id: str
    amount: float
    timestamp: float

@dataclass
class RiskTag:
    account_id: str
    risk_level: int  # 1-低风险, 2-中风险, 3-高风险
2. database.py(数据库服务类)
# database.py
import sqlite3
from models import RiskTag

class DatabaseService:
    def __init__(self):
        # 连接 SQLite 数据库,内存模式
        self.conn = sqlite3.connect(':memory:')
        self.initialize_database()

    def initialize_database(self):
        cursor = self.conn.cursor()
        # 创建风险标签表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS risk_tags (
                account_id TEXT PRIMARY KEY,
                risk_level INTEGER
            )
        ''')
        # 插入示例数据
        cursor.execute('''
            INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)
        ''')
        self.conn.commit()

    def get_risk_tag(self, account_id):
        cursor = self.conn.cursor()
        cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))
        result = cursor.fetchone()
        if result:
            return RiskTag(account_id, result[0])
        else:
            return None

    def close(self):
        self.conn.close()

3. redis_service.py(Redis 服务类)

# redis_service.py
import redis
from models import RiskTag

class RedisService:
    def __init__(self, host='localhost', port=6379):
        self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)

    def get_risk_tag(self, account_id):
        risk_level = self.redis_client.get(f'risk:{account_id}')
        if risk_level:
            return RiskTag(account_id, int(risk_level))
        return None

    def set_risk_tag(self, risk_tag):
        self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)

    def close(self):
        self.redis_client.close()

 4. rules.py(规则引擎逻辑)

# rules.py
from models import Transaction, RiskTag

class RiskEvaluator:
    def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:
        """
        返回 True 表示交易存在风险,需要阻止。
        返回 False 表示交易安全,可以通过。
        """
        # 高风险交易规则
        if transaction.amount > 10000 and risk_tag.risk_level == 3:
            print(f"检测到高风险交易:{transaction}")
            return True  # 阻止交易

        # 中风险交易规则
        if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:
            print(f"检测到中风险交易:{transaction}")
            return True  # 阻止交易

        # 低风险交易规则
        print(f"交易通过:{transaction}")
        return False  # 允许交易

5. app.py(主应用程序)

# app.py
import faust
import asyncio
import json
from models import Transaction, RiskTag
from database.py import DatabaseService
from redis_service import RedisService
from rules import RiskEvaluator

# 定义 Faust 应用
app = faust.App(
    'risk_control_app',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)

# 定义 Kafka 主题
transaction_topic = app.topic('transaction_topic')

# 初始化服务
redis_service = RedisService()
database_service = DatabaseService()
risk_evaluator = RiskEvaluator()

@app.agent(transaction_topic)
async def process_transaction(stream):
    async for event in stream:
        try:
            # 解析交易数据
            data = json.loads(event)
            transaction = Transaction(
                transaction_id=data['transaction_id'],
                account_id=data['account_id'],
                amount=data['amount'],
                timestamp=data['timestamp']
            )

            # 从 Redis 获取风险标签
            risk_tag = redis_service.get_risk_tag(transaction.account_id)
            if not risk_tag:
                # 如果 Redis 中没有,从数据库获取并更新到 Redis
                risk_tag = database_service.get_risk_tag(transaction.account_id)
                if risk_tag:
                    redis_service.set_risk_tag(risk_tag)
                else:
                    # 如果数据库中也没有,设定默认风险标签
                    risk_tag = RiskTag(transaction.account_id, 1)

            # 使用规则引擎进行风险评估
            is_risky = risk_evaluator.evaluate(transaction, risk_tag)

            # 根据评估结果进行处理
            if is_risky:
                print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")
                # TODO: 将结果返回给支付业务系统,阻止交易
            else:
                print(f"交易 {transaction.transaction_id} 安全,允许通过")
                # TODO: 将结果返回给支付业务系统,允许交易
        except Exception as e:
            print(f"处理交易时发生错误:{e}")

if __name__ == '__main__':
    app.main()

注释:

  • 使用 Faust 定义 Kafka Streams 应用程序,处理 transaction_topic 中的消息。
  • process_transaction 函数中,逐条处理交易数据。
  • 从 Redis 获取风险标签,如果没有则从数据库获取并更新到 Redis。
  • 使用自定义的 RiskEvaluator 进行风险评估,根据评估结果执行相应的操作

6. producer.py(Kafka 生产者,发送测试数据)

# producer.py
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 创建示例交易数据
transaction_data = {
    'transaction_id': 'tx1001',
    'account_id': 'account123',
    'amount': 12000.0,
    'timestamp': time.time()
}

# 发送交易数据到 Kafka
producer.send('transaction_topic', transaction_data)
producer.flush()
print(f"已发送交易数据:{transaction_data}")
producer.close()

运行示例

1. 启动必要的服务

注意事项


总结

上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据具体的业务需求和技术环境,对程序进行扩展和优化。

扩展建议:

  • Redis:确保 Redis 服务在本地的 6379 端口运行

  • redis-server
    

    Kafka:确保 Kafka 服务在本地的 9092 端口运行,并创建主题 transaction_topic

  • # 启动 Zookeeper
    zookeeper-server-start.sh config/zookeeper.properties
    # 启动 Kafka
    kafka-server-start.sh config/server.properties
    # 创建主题
    kafka-topics.sh --create --topic transaction_topic --bootstrap-server localhost:9092
    

    2. 运行应用程序

  • 启动风控系统(app.py):

  • python app.py worker -l info
    

    运行 Kafka 生产者,发送交易数据(producer.py):

  • python producer.py
    

    3. 预期输出

    风控系统将处理交易数据,使用规则引擎进行评估,并根据规则打印评估结果。例如:

  • 检测到高风险交易:Transaction(transaction_id='tx1001', account_id='account123', amount=12000.0, timestamp=...)
    交易 tx1001 存在风险,执行阻止操作
    

    说明

  • Faust:Python 的流式处理库,类似于 Kafka Streams,用于处理 Kafka 中的消息流。
  • 规则引擎:使用 Python 自定义规则评估逻辑,模拟 Drools 的功能。
  • Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
  • 分布式数据库(SQLite 模拟):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
  • 风险标签:简单地使用风险级别(1-低风险,2-中风险,3-高风险)来表示。
  • 异常处理:在实际应用中,需要更完善的异常处理机制,防止因异常导致程序崩溃。
  • 引入异步 Redis 客户端:使用 aioredis 提升 Redis 操作的性能。
  • 使用真正的分布式数据库:替换 SQLite,使用例如 PostgreSQL、MySQL 等数据库,并配置集群模式。
  • 完善规则引擎:使用现有的 Python 规则引擎库(如 durable_rulesexperta)实现更复杂的规则逻辑。
  • 添加日志和监控:集成日志系统和监控工具,便于维护和故障排查。
    • 性能优化:对于高并发场景,需要考虑异步 I/O、连接池等技术优化性能。
    • 配置管理:将硬编码的配置(如主机地址、端口、主题名)提取到配置文件或环境变量中,便于管理和修改。
    • 安全性:在生产环境中,注意保护敏感信息,确保数据传输和存储的安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2222334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

arp代答观察

文章目录 代答和代理简述实验前提先不开启proxy代答的配置开启代答总结 代答和代理简述 ARP&#xff08;地址解析协议&#xff09;是在局域网中用于将IP地址映射到MAC地址的协议。在理解 ARP 代答和 ARP 代理之前&#xff0c;让我们先澄清一下 ARP 的基本工作原理。 ARP 代答&…

标题PLSQL 里面怎么在文件窗口下 ,创建文件夹,并做好常用sql语句的分类

标题PLSQL 里面怎么在文件窗口下 &#xff0c;创建文件夹&#xff0c;并做好常用sql语句的分类&#xff1f; 效果如图&#xff1a; 标题打开plsql,找到文件窗口 找到&#xff0c;窗口下的这个类似文件夹带扳手的这个图标&#xff0c;打开&#xff0c; 打开后&#xff0c;定位…

十一、pico+Unity交互开发教程——手指触控交互(Poke Interaction)

一、XR Poke Interactor 交互包括发起交互的对象&#xff08;Interactor&#xff09;和可被交互的对象&#xff08;Interactable&#xff09;。XR Interaction Toolkit提供了XR Poke Interactor脚本用于实现Poke功能。在LeftHand Controller和RightHand Controller物体下创建名…

LeetCode做题笔记第202题:快乐数

题目描述 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 1.对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。 2.然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无限循环 但始终变不到 1。 3.如果这个过程 结…

「C/C++」C++ STL容器库 之 std::set 唯一键的集合容器

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

小鹏汽车股价分析:看涨信号已出现,技术指标显示还有40%的上涨空间

猛兽财经核心观点&#xff1a; &#xff08;1&#xff09;小鹏汽车的股价过去几天有所回落。 &#xff08;2&#xff09;随着需求的上升&#xff0c;该公司的业务发展的还算不错。 &#xff08;3&#xff09;猛兽财经对小鹏汽车股价的技术分析&#xff1a;多头已经将目标指向15…

【通俗理解】Neurosymbolic AI——融合神经网络与符号推理的智慧之力

【通俗理解】Neurosymbolic AI——融合神经网络与符号推理的智慧之力 关键词提炼 #Neurosymbolic AI #神经网络 #符号推理 #感知能力 #逻辑能力 #认知水平 #智慧与力量 第一节&#xff1a;Neurosymbolic AI的类比与核心概念 Neurosymbolic AI就像是给神经网络这位“大力士”…

排序算法 —— 计数排序

目录 1.计数排序的思想 2.计数排序的实现 3.计数排序的分析 时间复杂度 空间复杂度 稳定性 优点 缺点 1.计数排序的思想 顾名思义&#xff0c;计数排序就是通过计数的方式来排序&#xff0c;其基本思想为&#xff1a; 开辟一个计数数组&#xff0c;统计每个数出现的次…

计算机毕业设计Hadoop+大模型在线教育大数据分析可视化 学情分析 课程推荐系统 机器学习 深度学习 人工智能 大数据毕业设计

一、研究背景和意义 “互联网”和大数据带来了网络教育的蓬勃发展&#xff0c;学习分析技术和自适应学习也在近年内得到了重大突破。在线教育是互联网技术与传统教育的结合&#xff0c;是当前中国教育信息化发展最快的领域&#xff0c;而当下最迫切的是有效整合教育资源和互联…

AJAX——使用 fetch 发送 AJAX 请求

1、fetch&#xff08;&#xff09;函数属于全局函数&#xff0c;可以全局调用&#xff0c;返回的结果是一个 promise 对象。 2、语法&#xff1a; 3、参数 本文分享到此结束&#xff0c;欢迎大家评论区相互讨论学习&#xff0c;下一篇继续分享AJAX中同源策略的学习。

ChatGPT实现旅游推荐微信小程序

随着旅游行业的快速发展&#xff0c;个性化推荐已成为提升用户体验的重要手段。通过AI技术&#xff0c;提供一个智能旅游推荐小程序&#xff0c;使用户能够轻松获取定制化的旅行建议。 项目概述 项目目标 开发一个AI旅游推荐小程序&#xff0c;基于用户输入的旅行偏好&#…

Axure中继器单选、多选和重置

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 课程主题&#xff1a;Axure中继器单选、多选和重置 主要内容&#xff1a;根据查询条件&#xff0c;通过单选、多选和重置&#xff0c;从中继器中得到数据 应用场景&…

C++ 二叉树进阶:二叉搜索树

目录 二叉搜索树的概念 二叉搜索树的实现 基本结构 插入 1&#xff0c;当树是空树的时候 2&#xff0c;当树不为空的时候 3&#xff0c;纠正后的代码 查找 删除 1&#xff0c;左为空或右为空 2&#xff0c;左右都不为空 3&#xff0c;删除的完整代码&#xff1a; 二…

hadoop-Zookeeper安装

hadoop-Zookeeper安装 Ububtu18.04安装Zookeeper3.7.1 环境与版本 这里采用的ubuntu18.04环境的基本配置为&#xff1a; hostname 为master 用户名为hadoop 静态IP为 192.168.100.3 网关为 192.168.100.2 防火墙已经关闭 /etc/hosts已经配置全版本下载地址&#xff1a; htt…

Director3D: Real-world Camera Trajectory and 3DScene Generation from Text 论文解读

目录 一、概述 二、相关工作 1、文本到3D生成 2、3DGS 三、Director3D 1、Cinematographer 2、Decorator 3、Detailer 4、Loss 一、概述 该论文提出利用真实世界数据集&#xff0c;设计一个从文本生成真实世界3D场景和自适应相机轨迹的强大的开放世界文本到3D生成框架…

Git使用GUI界面实现任意历史版本对比

首先进入版本历史查看界面 标记某次提交 选择某次提交并和标记的提交对比 可以查看比较结果了&#xff0c;具体到每一个文件每一行代码

鸿蒙HarmonyOS NEXT 5.0开发(2)—— ArkUI布局组件

文章目录 布局Column&#xff1a;从上往下的布局Row&#xff1a;从左往右的布局Stack&#xff1a;堆叠布局Flex&#xff1a;自动换行或列 组件Swiper各种选择组件 华为官方教程B站视频教程 布局 主轴和交叉轴的概念&#xff1a; 对于Column布局而言&#xff0c;主轴是垂直方…

cnn做整图匹配

好像还没有人把cnn在工业机器视觉中使用&#xff0c;我们打破界限&#xff0c;试一试&#xff01; 工业上有很多需求&#xff0c;判断加工产品有还是没有&#xff0c;从前基本上都是使用找斑的方法来判断。 我们可以用cnn代替试试&#xff01; 我们前头cnn最好成绩是&#x…

STM32(二十一):看门狗

WDG&#xff08;Watchdog&#xff09;看门狗&#xff0c;手动重装寄存器的操作就是喂狗。 看门狗可以监控程序的运行状态&#xff0c;当程序因为设计漏洞、硬件故障、电磁干扰等原因&#xff0c;出现卡死或跑飞现象时&#xff0c;看门狗能及时复位程序&#xff0c;避免程序陷入…

免费送源码:Node.JS+Express+MySQL Express 流浪动物救助系统 计算机毕业设计原创定制

摘 要 随着互联网大趋势的到来&#xff0c;社会的方方面面&#xff0c;各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去&#xff0c;而其中最好的方式就是建立网络管理系统&#xff0c;并对其进行信息管理。由于现在网络的发达&#xff0c;流浪动物救助系…