审核中台业务数据进审升级之路

news2024/11/27 9:54:31

e8bc06285401a635d92476eeebc4a1ce.gif

本文字数:3850

预计阅读时间:15 分钟

目录

  • 1.背景

    • 1.1. 相关名词介绍

    • 1.2. 审核中台介绍

    • 1.3. 业务痛点介绍

  • 2. 规范化改造

    • 2.1 规范通讯协议

    • 2.2 规范处理流程

  • 3. 自动化改造

    • 3.1 业务接入检测器

    • 3.2 数据自动化流转

    • 3.3 源码示例

  • 4. 总结

1.背景

1.1 相关名词介绍

1.1.1 业务端

业务端是指某个产品线下的具体的一个业务。比如视频产品线细分业务端:举报视频、56视频、搜狐视频、千帆视频;用户产品线细分业务端:用户签名、用户头像、用户头图、用户昵称。评论产品线细分业务端:弹幕抓取、弹幕直播、弹幕加一、弹幕普通、弹幕举报、弹幕热剧、视频评论、头条评。等等目前接入的业务端共计有58个。相同的产品线划分不同的业务端,是为了应用不同的审核策略,比如审核的时效不同,审核的优先级不同,审核标准的严格或宽松的程度不同。

1.1.2 审核中台

业务端生成的业务数据发送给审核中台。审核中台前置有机器审核,机器审核通过后再经过人工审核。审核结束后,审核中台将审核结果发送给业务端。业务端根据审核结果对业务数据做出相应的处理:数据露出或者数据删除。审核中台是为了把控用户生成数据的安全性与合法性。

1.1.3 领域模型

领域模型(Domain Model),是完成从需求分析到面向对象设计的一座桥梁,领域模型是指对需求所涉及的领域的建模,所以也叫业务对象模型,是描述业务用例实现的对象模型。它是对业务角色和业务实体之间应该如何联系和协作以执行业务的一种抽象。下文中提到的业务领域模型是对业务端数据进审的业务流程进行建模的模型。模型名称为:MBusiness,关键属性为inTopic(进审消息topic),inGroup(进审消息消费者group),outTopic(出审消息topic),outGroup(出审消息生产者group),type(业务类型)。(inTopic、outTopic:是每个业务端对应一组。)

1.2 审核中台介绍

众所周知,互联网平台中都存在审核流程,以实现净化网络环境的目的,而在搜狐视频平台中也不例外。搜狐视频审核中台就是这样一个承接了搜狐矩阵大部分的业务审核需求的平台。

审核中台数年来对接了58个业务端,由于各产品线下的新业务频出,每出现一个新的业务端,新的业务端生成的业务数据都需要发送到审核中台以某种审核标准进行审核操作。

d324792405cd9c7ed518d1b52c5d3e6f.png

1.3 业务痛点介绍

接入每一个新的业务端,虽然审核标准不同,但进审的流程都是相似的。共有两大痛点:

痛点1

业务端的数据发送给审核中台有的通过http协议,有的通过mq协议。通讯协议不统一,数据规范也不统一。

痛点2

每接入新的业务端都需要重复的编写相似的进审流程代码,占用了大量的开发成本,并且代码重复率太高,大大的降低了代码的质量,增加了系统的维护成本,对开发和系统运维不友好,不利于系统的健康发展。

2. 规范化改造

2.1 规范通讯协议

由于业务线众多,为实现业务解耦审核中台采用消息中间件的方式对接业务线,本文中使用的是RocketMQ,RocketMQ是一个高可用、高性能、高可靠的分布式消息队列,相对于kafka更适合处理业务系统之间的消息。

2.2 规范处理流程

1)在审核中台中预设了三种类型的标准数据进审流程,包括:视频、专辑、图文。(预设的标准流程如不满足新业务需求,可定制开发新的流程,该三种标注流程目前已经涵盖所有的产品线和业务端。)

2)在审核中台定义了业务领域模型,MBusiness,该领域模型关键属性为inTopic(进审消息topic),inGroup(进审消息消费者group),outTopic(出审消息topic),outGroup(出审消息生产者group),type(业务类型)。

3)定义了数据的流转规范:

a、inTopic为进审topic,在inTopic下业务端作为Producer生产消息,审核中台作为Consumer消费消息。

b、outTopic为出审topic,在outTopic下审核中台作为Producer生产消息,业务端作为Consumer消费消息。

4)当有新业务需要接入时,首先创建业务领域模型,MBusiness,设置好相应的属性,存入数据库中。

5)业务端首先作为消息生产者,向inTopic发送进审消息;然后业务端作为消息消费者,监听outTopic的出审消息。

6)审核中台设置了业务接入检测器,不断扫描是否有新的业务领域模型创建。

7)当扫描到MBusiness时,自动创建进审消息消费者Consumer,拉取inTopic下的消息,根据MBusiness的type按预设的标准数据进审流程(视频、专辑、图文)进行数据进审处理。

8)自动创建出审消息生产者Producer,等待审核结束后负责将审核结果发送到outTopic下。

9) 业务端通过监听outTopic的出审消息,获取审核结果。

3. 自动化改造

3.1 业务接入检测器

实现业务端的数据自动化进审,是依靠业务接入检测器装置完成的。业务接入检测器启动步骤如下:

1、 系统启动后,业务接入检测器立即启动。

2、 业务接入检测器持有2个map,consumerMap:key为inTopic,value为具体的消费者程序。producerMap:key为业务名,value为具体的生产者程序,初始状态,2个map全为空。

3、 业务接入检测器每3分钟读取数据库业务领域模型表所有数据MBusiness。

4、 读取MBusiness的inTopic,如果consumerMap中没有,说明是新业务,则以该inTopic和inGroup创建消费者,以outTopic和outGroup创建生产者,设置type对应的预设数据进审流程等待数据进审处理,并将inTopic与消费者存入consumerMap中,业务名与生产者存入producerMap中。

5、 等待审核结束后,通过业务名从producerMap中获取对应的生产者,发送审核结果。

6、 系统停止时,遍历consumerMap和producerMap获取所有的消费者和生产者,进行生产者和消费者的关停操作。

3.2 数据自动化流转

下面介绍一下进审数据如何自动化流转的:

1、 业务端作为生产者生产进审消息,发送到消息队列inTopic下。

2、 审核中台作为消费者,监听消息队列inTopic下的消息变化。

3、 每监听到inTopic下的新消息,则按预设的数据进审流程进行数据进审处理。

4、 等待审核结束后,通过业务名从producerMap中获取对应的生产者,并发送审核结果到outTopic下。

5、 业务端作为消费者监听outTopic下的消息变化,获取审核结果。

3.3 源码示例

3.3.1自动进审源码示例

下述源码是对业务端自动进审的描述,其中,VideoConsumerCallback、ContentConsumerCallback、PlayListConsumerCallback、BroadlistConsumerCallback是预设的视频、专辑、图文的标准处理流程,如源码所示,bean ConsumerManager创建时,通过init方法从数据库加载已接入审核中台的业务端(businessDao.listValid()),使用InGroup和InTopic自动构建消费者,根据业务的类型按预设的标准处理流程进行处理。并通过flush方法创建了一个定时任务,每隔5分钟检测是否有新的业务端接入,若有则自动构建消费者,根据业务的类型按预设的标准处理流程进行处理。实现了DisposableBean在程序销毁时,通过destroy方法,对消费者bean进行销毁。

@Slf4j
@Component
public class ConsumerManager implements ApplicationRunner, DisposableBean {
    private Map<String, Consumer> consumerMap = Collections.synchronizedMap(new HashMap<>());
    @Resource
    private VideoConsumerCallback videoConsumerCallback;
    @Resource
    private ContentConsumerCallback contentConsumerCallback;
    @Resource
    private PlayListConsumerCallback playListConsumerCallback;
    @Resource
    private BroadlistConsumerCallback broadlistConsumerCallback;
    @Resource
    private MBusinessDao businessDao;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        init();
        flush();
    }

    private void flush() {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                init();
            }

        }, 1000 * 60 * 5L, 1000 * 60 * 5L);
    }

    private void init() {
        List<MBusiness> businessList = businessDao.listValid();
        if (CollectionUtils.isEmpty(businessList)) {
            return;
        }

        for (MBusiness business : businessList) {
            if (consumerMap.containsKey(business.getInTopic())) {
                continue;
            }

            Consumer consumer = initConsumer(business);
            if (consumer == null) {
                continue;
            }

            consumer.start();
            consumerMap.put(business.getInTopic(), consumer);
            log.info("topic:{} group:{} start consumer.", business.getInTopic(), business.getInGroup());
        }

    }

    @Override
    public void destroy() throws Exception {
        if (consumerMap.isEmpty()) {
            return;
        }
        for (Consumer consumer : consumerMap.values()) {
            consumer.shutdown();
        }
    }

    private Consumer initConsumer(MBusiness business) {
        if (StringUtils.isBlank(business.getInGroup()) || StringUtils.isBlank(business.getInTopic())) {
            return null;
        }

        Consumer consumer = new Consumer(business.getInGroup(), business.getInTopic());
        if (EnterType.VIDEO.getType().equals(business.getEnterType())) {
            consumer.setConsumerCallback(videoConsumerCallback);
        }
        if (EnterType.CONTENT.getType().equals(business.getEnterType())) {
            consumer.setConsumerCallback(contentConsumerCallback);
        }
        if (EnterType.PLAYLIST.getType().equals(business.getEnterType())) {
            consumer.setConsumerCallback(playListConsumerCallback);
        }
        if (EnterType.BROADLIST.getType().equals(business.getEnterType())) {
            consumer.setConsumerCallback(broadlistConsumerCallback);
        }

        consumer.setPullThresholdForQueue(1);
        return consumer;
    }

    public Set<String> listTopic() {
        return consumerMap.keySet();
    }

    public Consumer getConsumer(String topic) {
        return consumerMap.get(topic);
    }
}

3.3.2自动出审源码示例

下述源码是对业务端自动出审的描述,其中,bean ProducerManager创建时,通过init方法从数据库加载已接入审核中台的业务端(businessDao.listValid()),使用OutGroup和OutTopic自动构建生产者,并通过flush方法创建了一个定时任务,每隔3分钟检测是否有新的业务端接入,若有则自动构建生产者,用于审核结束时通过消息将审核结果下发到业务端。

@Slf4j
@Component
public class ProducerManager implements ApplicationRunner, DisposableBean {
    private Map<String, Producer> producerMap = Collections.synchronizedMap(new HashMap<>());
    private Map<String, Producer> producerTypeMap = Collections.synchronizedMap(new HashMap<>());
    private Map<String, String> webhookMap = Collections.synchronizedMap(new HashMap<>());

    @Resource
    private MBusinessDao businessDao;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        init();
        flush();
    }

    private void flush() {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                init();
            }

        }, 1000 * 10 * 3L, 1000 * 60 * 3L);
    }

    private void init() {
        List<MBusiness> businessList = businessDao.listValid();
        if (CollectionUtils.isEmpty(businessList)) {
            return;
        }

        for (MBusiness business : businessList) {
            if (StringUtils.isNotBlank(business.getOutWebhook()) && !webhookMap.containsKey(business.getType())) {
                webhookMap.put(business.getType(), business.getOutWebhook());
            }

            if (producerMap.containsKey(business.getOutTopic())) {
                if (!producerTypeMap.containsKey(business.getType())) {
                    producerTypeMap.put(business.getType(), producerMap.get(business.getOutTopic()));
                    log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
                }

                continue;
            }

            Producer producer = initProducer(business);
            if (producer == null) {
                continue;
            }

            producer.start();
            producerMap.put(business.getOutTopic(), producer);
            producerTypeMap.put(business.getType(), producer);
            log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
        }
    }

    @Override
    public void destroy() throws Exception {
        if (producerMap.isEmpty()) {
            return;
        }
        for (Producer producer : producerMap.values()) {
            producer.shutdown();
        }
    }

    public Producer initProducer(MBusiness business) {
        if (StringUtils.isBlank(business.getOutGroup()) || StringUtils.isBlank(business.getOutTopic())) {
            return null;
        }

        return new Producer(business.getOutGroup(), business.getOutTopic());
    }

    public String getWebhook(String type) {
        if (StringUtils.isBlank(type)) {
            return null;
        }
        return webhookMap.get(type);
    }

    public Producer getProducer(String type) {
        if (StringUtils.isBlank(type)) {
            return null;
        }
        return producerTypeMap.get(type);
    }

}

4 .总结

通过规范通讯协议,规范进审数据格式、规范数据进审流程,将各个产品线下的多种多样的业务端抽象为视频、专辑、图文三种类型,分别预设了标准化的数据进审流程;并设计了业务接入检测器装置,自动化的检测新业务接入并自动处理业务数据的进审,避免了新的业务端接入时的重复开发工作,极大的提高了开发效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/180690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机图形学基础教程(Visual C++版)习题解答与编程实践(第2版)孔令德1-到第3章的直线扫描转换

1-到第3章的直线扫描转换&#xff08;没更新完&#xff09;习题1知识积累习题2知识点映射模式使用GDI对象习题3知识积累直线的中点Bresenham算法习题1 1.计算机图形学的定义是什么?说明计算机图形学、图像处理和模式识别之间的关系。 答&#xff1a; CG是计算机图形学的缩写。…

实验一、旅馆客户服务呼叫显示系统

实验一 旅馆客户服务呼叫显示系统 实验目的 综合应用数字电子技术知识&#xff0c;按照要求设计并完成一个小规模的数字电路系统。进行硬件线路的设计、仿真、焊接、调试与实现。使系统实现一种用于旅馆客户服务呼叫显示系统的实用电路。在呼叫过程中&#xff0c;当8位旅客有…

Spark Core ---- RDD持久化

RDD的数据是过程数据 RDD之间进行相互迭代计算&#xff08;Transformation的转换&#xff09;&#xff0c;当执行开启后&#xff0c;新RDD的生成&#xff0c;代表老RDD的消失 RDD的数据是过程数据&#xff0c;只在处理的过程中存在&#xff0c;一旦处理完成&#xff0c;就不见…

【数据结构和算法】实现带头双向循环链表(最复杂的链表)

前文&#xff0c;我们实现了认识了链表这一结构&#xff0c;并实现了无头单向非循环链表&#xff0c;接下来我们实现另一种常用的链表结构&#xff0c;带头双向循环链表。如有仍不了解单向链表的&#xff0c;请看这一篇文章(7条消息) 【数据结构和算法】认识线性表中的链表&…

Spring Boot之SpringSecurity学习

文章目录一 SpringSecurity简介二 实战演示0. 环境 介绍1. 新建一个初始的springboot项目2. 导入thymeleaf依赖3. 导入静态资源4. 编写controller跳转5. 认证和授权6. 权限控制和注销7. 记住登录8. 定制登录页面三 完整代码3.1 pom配置文件3.2 RouterController.java3.3 Securi…

那些面试官口中常常提到b树(MySQL索引底层数据结构)

各种常见树1.树的基本概念2.二叉树3.b树4.b树5.b树与b树的对比5.MySQL索引底层数据结构1.树的基本概念 树的特点&#xff1a;有一个树根&#xff0c;树根上又有很多枝干&#xff0c;枝干上又有很多树枝&#xff0c;树枝上又有很多叶子 树最为一种数据结构也有相似特点 树是一个…

【计算机网络(考研版)】第二站:物理层(一)

前言 如下图所示&#xff0c;这是我们之前所说的数据流动示意图 我们将按照从下向上的结构进行学习。这一讲学习第一层物理层。物理层关注在一条通信信道上传输原始比特&#xff0c;即无论面对什么样的传输介质(有线或者无线)都可以传输比特流&#xff0c;物理层的作用正是要尽…

Python3 函数

函数是组织好的&#xff0c;可重复使用的&#xff0c;用来实现单一&#xff0c;或相关联功能的代码段。 函数能提高应用的模块性&#xff0c;和代码的重复利用率。你已经知道Python提供了许多内建函数&#xff0c;比如print()。但你也可以自己创建函数&#xff0c;这被叫做用户…

Node require 正解

require 实现原理 流程概述 步骤1&#xff1a;尝试执行代码require("./1"). 开始调用方法require.步骤2&#xff1a;此时会得到filename&#xff0c;根据filename 会判断缓存中是否已经加载模块&#xff0c;如果加载完毕直接返回&#xff0c;反之继续执行步骤3&…

JavaScript 的数据类型

JavaScript 的数据类型 基本数据类型&#xff08;值类型&#xff09; Number&#xff08;包含小数、整数、负数、科学计数法&#xff09; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"…

【Linux】六、Linux 基础IO(四)|动态库和静态库

目录 十一、动态库和静态库 11.1 动态库和静态库定义 11.2 动静态库的基本原理 11.3 静态库的打包与使用 11.3.1 静态库的打包 11.3.2 静态库的使用 11.4 动态库的打包与使用 11.4.1 动态库的打包 11.4.2 动态库的使用 11.5 动态库的加载 十一、动态库和静态库 11.1…

CB2-2CARD的openSUSE安装NAS环境配置

CB2-2CARD的openSUSE安装&NAS环境配置1. 简介2. 规格3. 系统安装3.1 Linux/Unix稳定镜像3.2 基础功能更新&安装3.2.1 更新源3.2.2 升级系统3.2.3 基础功能安装3.3 OpenSUSE系统情况3.3.1 源操作命令3.3.2 源镜像4. 需求 & 配置4.1 MiniDLNAStep 1&#xff1a;安装M…

Day870.全局锁和表锁 -MySQL实战

全局锁和表锁 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于全局锁和表锁的内容。 数据库锁设计的初衷是处理并发问题。 作为多用户共享的资源&#xff0c;当出现并发访问的时候&#xff0c;数据库需要合理地控制资源的访问规则。锁就是用来实现这些访问规则的重…

数据结构 | C++ | 并查集原理讲解与模拟实现 | 并查集的相关习题

文章目录前言并查集原理并查集的模拟实现leetcode练习省份数量等式方程的可满足性前言 并查集通常会作为高阶数据结构的一个子结构使用&#xff0c;虽然原理不是很难&#xff0c;但其思想值得我们好好学习 并查集原理 并查集是一种树形结构&#xff0c;其保存了多个集合&…

【Maven】多环境配置与应用

目录 1. 多环境配置作用 问题导入 2. 多环境配置步骤 2.1 定义多环境 2.2 使用多环境&#xff08;构建过程&#xff09; 3. 跳过测试&#xff08;了解&#xff09; 问题导入 3.1 应用场景 3.2 跳过测试命令 3.3 细粒度控制跳过测试 1. 多环境配置作用 问题导入 多…

LeetCode 2331. 计算布尔二叉树的值

给你一棵 完整二叉树 的根&#xff0c;这棵树有以下特征&#xff1a; 叶子节点 要么值为 0 要么值为 1 &#xff0c;其中 0 表示 False &#xff0c;1 表示 True 。 非叶子节点 要么值为 2 要么值为 3 &#xff0c;其中 2 表示逻辑或 OR &#xff0c;3 表示逻辑与 AND 。 计算…

【推荐系统】User-Item CF:NGCF

&#x1f4a1; 本次解读的文章是 2019 年发表于 SIGIR 的一篇基于图卷积神经网络的用户物品协同过滤推荐算法论文&#xff0c; 论文将用户-物品交互信息建模为二分图&#xff0c;提出了一个基于二分图的推荐框架 Neural Graph Collaborative Filtering&#xff08;NGCF&#xf…

基于nodejs+vue的社区问答网站与设计

目 录 摘要 I Abstract II 1 绪论 1 1.1 选题背景 1 1.2 选题意义 1 1.3 研究内容 2 2 相关技术介绍 3 3 系统分析 5 3.1可行性分析 5 3.2 需求分析 5 3.2.1非功能性需求 5 3.2.2功能需求 6 3.3 系统用例 6 3.3.1 会员功能需求 6 …

【C++修炼之路】13. priority_queue及仿函数

每一个不曾起舞的日子都是对生命的辜负 stack&&queue一 . priority_queue介绍二. priority_queue的使用三. 仿函数3.1 仿函数的介绍3.2 仿函数的好处四.priority_queue模拟实现五.仿函数之日期比较一 . priority_queue介绍 priority_queue文档介绍 优先队列是一种容器…

机器学习实战(第二版)读书笔记(2)—— LSTMGRU

刚接触深度学习半年的时间&#xff0c;这期间有专门去学习LSTM &#xff0c;这几天读机器学习实战这本书的时候又遇到了&#xff0c;感觉写的挺好的&#xff0c;所以准备结合本书写一下总结方便日后回顾。如有错误&#xff0c;欢迎批评指正。 一、LSTM 优势&#xff1a;可在一…