电商项目之海量操作日志的实现

news2025/2/23 13:46:11

在这里插入图片描述

文章目录

  • 1 问题背景
  • 2 前言
  • 3 思考
  • 4 解决思路
  • 5 交互
  • 6 工作原理
  • 7 伪代码实现
    • 7.1 安装并配置Canal Server
    • 7.2 Canal客户端拉取MQ消息
    • 7.3 Canal数据的转换
    • 7.4 定制自己的业务逻辑

1 问题背景

有时候客户做了某些操作却不认账,咱们又拿不出证据;有时候客户将账号授权给了别人,客户想要知道期间别人做了什么操作;有些重要操作需要记录起来,等等。以上这些情况都需要我们实现一个操作日志的功能,使用技术的手段获取控制权,使自己处于一个可进可退的处境,证据完全掌握在自己手中。

2 前言

  1. 本文阐述的是企业级电商解决方案,非自学项目或者八股文或者小demo,解决方案中的任何细节都是站在企业级开发的角度去思考的。解决方案都会尽量从可读性、易用性、高扩展性、统一管理性去考虑。
  2. 操作日志讲述的是在B端的任何操作都需要持久化起来,非C端

3 思考

  1. 怎么实现?每个业务逻辑都写一个持久化操作吗?这样不可行,不够统一,容易忘记写,企业级开发是有很多开发者协同开发的。而且业务也复杂,在业务逻辑处写会降低可读性和扩展性。
  2. 有没有什么组件可以自主感知到每一次数据库操作,然后就可以做自己任意想要做的持久化了。

4 解决思路

使用阿里的开源组件Canal,Canal充当一个MySQL的Slave,订阅binlog。canal有很多种模式,笔者此处使用RocketMQ模式。详情可以了解Canal在Github上介绍。

5 交互

从Canal的github上拿的图,这个交互是最清晰明了的:
在这里插入图片描述

6 工作原理

先了解MySQL主备复制原理:

在这里插入图片描述

  • Master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events),可以通过show binlog events查看
  • Slave将Master的binary log拷贝到它的中继日志(relay log)
  • Slave重放relay log中事件,将数据变更反映它自己的数据

Canal工作原理

  • Canal模拟MySQL Slave协议,将自己伪装成Slave,向MySQL Master发送dump协议
  • MySQL Master收到dump请求,开始推送binary log给Slave(即Canal)
  • Canal解析binary log(原始是byte数据流)

7 伪代码实现

  1. 此处简单阐述如何用SpringBoot整合Canal,Canal模式使用RocketMQ。
  2. canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ

7.1 安装并配置Canal Server

详情见官方操作文档

instance配置表中,要配置你想做操作日志的数据库表以及过滤字段的名单,如下所示:

canal.instance.filter.regex=product\\..*,user\\..*,order\\..*
canal.instance.filter.black.field=product.t_product:product_short_desc/product_long_desc/special_return_desc,user.t_user:description,order.t_order:order_desc/order_remark
canal.mq.topic=canal_shoplus_log

canal.instance.filter.regex表示要对哪些库的哪些表做操作日志的实现;canal.instance.filter.black.field表示无需做操作日志实现的字段
canal.mq.topic表示Canal将binary log数据发到MQ的哪个topic

7.2 Canal客户端拉取MQ消息

笔者采用了Canal的MQ模式,因此Canal Server会将MySQL的binary log丢给MQ,我们需要做的是拉取MQ消息来消费,做一个持久化就能实现操作日志了。

canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能

  • kafka模式: com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample
  • rocketMQ模式: com.alibaba.otter.canal.example.rocketmq.CanalRocketMQClientExample

由于代码篇幅过长,详情可以看上面贴出的地址,下面给出关键的代码:

List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS);

解释:Message保存的数据变更是二进制格式,需要使用工具类转换成对象才能更方便写业务逻辑,接下来的小节会给出Canal官方的工具类方法。

7.3 Canal数据的转换

由于Canal传输的数据都是二进制的,我们在代码里面一般都是面向对象来写业务逻辑,因此需要将二进制数据转成一个对象结构,Canal有提供工具类,该工具方法是com.alibaba.otter.canal.connector.core.producer.MQMessageUtils#messageConverter,需要下载Canal源码才能找到。下面贴出该工具方法:

/**
     * 将Message转换为FlatMessage
     *
     * @return FlatMessage列表
     * @author agapple 2018年12月11日 下午1:28:32
     */
    public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
        List<FlatMessage> flatMessages = new ArrayList<>();
        for (EntryRowData entryRowData : datas) {
            CanalEntry.Entry entry = entryRowData.entry;
            CanalEntry.RowChange rowChange = entryRowData.rowChange;
            // 如果有分区路由,则忽略begin/end事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // build flatMessage
            CanalEntry.EventType eventType = rowChange.getEventType();
            FlatMessage flatMessage = new FlatMessage(id);
            flatMessages.add(flatMessage);
            flatMessage.setDatabase(entry.getHeader().getSchemaName());
            flatMessage.setTable(entry.getHeader().getTableName());
            flatMessage.setIsDdl(rowChange.getIsDdl());
            flatMessage.setType(eventType.toString());
            flatMessage.setEs(entry.getHeader().getExecuteTime());
            flatMessage.setTs(System.currentTimeMillis());
            flatMessage.setSql(rowChange.getSql());
            flatMessage.setGtid(entry.getHeader().getGtid());

            if (!rowChange.getIsDdl()) {
                Map<String, Integer> sqlType = new LinkedHashMap<>();
                Map<String, String> mysqlType = new LinkedHashMap<>();
                List<Map<String, String>> data = new ArrayList<>();
                List<Map<String, String>> old = new ArrayList<>();

                Set<String> updateSet = new HashSet<>();
                boolean hasInitPkNames = false;
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                        && eventType != CanalEntry.EventType.DELETE) {
                        continue;
                    }

                    Map<String, String> row = new LinkedHashMap<>();
                    List<CanalEntry.Column> columns;

                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }

                    for (CanalEntry.Column column : columns) {
                        if (!hasInitPkNames && column.getIsKey()) {
                            flatMessage.addPkName(column.getName());
                        }
                        sqlType.put(column.getName(), column.getSqlType());
                        mysqlType.put(column.getName(), column.getMysqlType());
                        if (column.getIsNull()) {
                            row.put(column.getName(), null);
                        } else {
                            row.put(column.getName(), column.getValue());
                        }
                        // 获取update为true的字段
                        if (column.getUpdated()) {
                            updateSet.add(column.getName());
                        }
                    }

                    hasInitPkNames = true;
                    if (!row.isEmpty()) {
                        data.add(row);
                    }

                    if (eventType == CanalEntry.EventType.UPDATE) {
                        Map<String, String> rowOld = new LinkedHashMap<>();
                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                            if (updateSet.contains(column.getName())) {
                                if (column.getIsNull()) {
                                    rowOld.put(column.getName(), null);
                                } else {
                                    rowOld.put(column.getName(), column.getValue());
                                }
                            }
                        }
                        // update操作将记录修改前的值
                        old.add(rowOld);
                    }
                }
                if (!sqlType.isEmpty()) {
                    flatMessage.setSqlType(sqlType);
                }
                if (!mysqlType.isEmpty()) {
                    flatMessage.setMysqlType(mysqlType);
                }
                if (!data.isEmpty()) {
                    flatMessage.setData(data);
                }
                if (!old.isEmpty()) {
                    flatMessage.setOld(old);
                }
            }
        }
        return flatMessages;
    }

7.4 定制自己的业务逻辑

根据前面用工具类方法拿到的List<FlatMessage>就可以写自己的业务逻辑了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/542199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

入参校验1

文章目录 一、简介1、快速失败(Fail Fast) 二、单字段类入参校验三、JSON实体类校验1、注解解析2、案例1、简单校验2、分组校验3、嵌套校验4、集合校验5、自定义校验 四、相关1、源码文件2、参考地址 一、简介 1、快速失败(Fail Fast) Spring Validation 默认会校验完所有字段…

GPT-4的免费使用方法分享(续)

GPT-4的免费使用方法分享_我爱OJ的博客-CSDN博客 在这篇博客里&#xff0c;我介绍了一些ChatGPT的一些使用方法&#xff0c;但可能有一定的缺陷&#xff0c;有的需要魔法&#xff0c;所以&#xff0c;今天我就来亲测一下&#xff0c;关于ChatGPT的一些免费使用技巧 目录 镜像…

代码随想录算法训练营第九天|KMP算法

记录一下KMP算法&#xff0c;本文摘录自《代码随想录》和部分b站视频帮你把KMP算法学个通透&#xff01;&#xff08;理论篇&#xff09;_哔哩哔哩_bilibili最浅显易懂的 KMP 算法讲解_哔哩哔哩_bilibiliKMP字符串匹配算法2_哔哩哔哩_bilibili KMP算法 主要应用&#xff1a;字…

牛客小白月赛65

题目链接 牛客小白月赛65 A-牛牛去购物&#xff08;枚举&#xff09;B-牛牛写情书&#xff08;字符串&#xff09;C-牛牛排队伍&#xff08;模拟&#xff09;D-牛牛取石子&#xff08;博弈&#xff09;E-牛牛的构造&#xff08;构造&#xff0c;思维&#xff09; A-牛牛去购物…

怎么免费使用 ChatGpt,实用!

最近发现了一个可以免费、轻松使用 ChatGpt 的方法&#xff0c;随即做个记录&#xff0c;留着备忘&#xff0c;以后想用也能随时找到方法。 但是不保证该方法永远有效&#xff0c;仅当下有限&#xff0c;做个记录罢了。 因为我使用的是 windows 自带的浏览器 Microsoft Edge &a…

Android--刷机与adb

目录 一、Android设备启动流程 二、刷机模式介绍 三、Windows命令行 四、adb介绍与配置 五、常用的adb命令 一、Android设备启动流程 Android就是Linux内核(Kernel)Java虚拟机(JVM) Android设备启动就分为两个阶段&#xff1a; Linux启动 1.启动电源以及系统启动&#…

详解c++STL—容器list

目录 1、list基本概念 1.1、概念描述 1.2、结点的组成 1.3、list的优点 1.4、list的缺点 1.5、总结 2、list构造函数 2.1、功能描述 2.2、函数原型 2.3、示例 3、list赋值和交换 3.1、功能描述 3.2、函数原型 3.3、示例 4、list大小操作 4.1、功能描述 4.2、函…

ChatGPT的前世今生——混沌初开

目录 ChatGPT的前世今生——混沌初开ChatCPT简介ChatCPT是什么&#xff1f;ChatCPT的火爆程度ChatCPT火爆的原因1、功能强大&#xff0c;应用范围广泛2、训练数据量大&#xff0c;模型效果好3、优秀的商业模式 OpenAI公司公司创始团队 总结公众号文章链接参考链接&#xff1a; …

03C++类与对象之运算符重载

文章目录 C类与对象之运算符重载与const成员运算符重载赋值运算符重载运算符重载 日期类的实现与运算符重载赋值运算符重载比较类运算符的重载二元运算符-的重载前置和后置重载 总体实现代码const成员const的好处1.防止程序员犯错2.提高代码的复用性 const 成员与函数重载规则 …

Qt文件系统源码分析—第三篇QDir

深度 本文主要分析Windows平台&#xff0c;Mac、Linux暂不涉及 本文只分析到Win32 API/Windows Com组件/STL库函数层次&#xff0c;再下层代码不做探究 本文QT版本5.15.2 类关系图 QTemporaryFile继承QFile QFile、QSaveFile继承QFileDevice QFileDevice继承QIODevice Q…

由浅入深Netty基础知识IO相关

目录 1 stream vs channel2 IO 模型3 零拷贝3.1 传统 IO 问题3.2 NIO 优化 4 AIO4.1 文件 AIO4.2 守护线程4.3 网络 AIO 1 stream vs channel stream 不会自动缓冲数据&#xff0c;channel 会利用系统提供的发送缓冲区、接收缓冲区&#xff08;更为底层&#xff09;stream 仅支…

unity学习遇到的问题:解决VS不能加载Unity脚本,MonoBehaviour是灰色的

电脑出了点问题&#xff0c;然后就重装了&#xff0c;重装之后&#xff0c;从gitee上下载了原来的半截代码&#xff0c;结果发现里面的脚本运行出问题了&#xff0c;仔细一看&#xff0c;MonoBehaviour是灰色的&#xff0c;也就是说&#xff0c;加载不了unity的api了&#xff0…

目标检测复盘 --3. RCNN

RCNN的CNN部分使用AlexNet作为backbone来提取特征&#xff0c;Fast RCNN使用了VGG16来作为backboneRCNN将2000个框送入网络提取特征&#xff0c;Fast RCNN是将图像送入CNN来提取特征得到一个特征图将SS(Selective Search)算法获取的提议框映射到上面的特征图上&#xff0c;获取…

怎么通过ssh连上ipv6的服务器?阿里云怎么配置ipv6?wsl2怎么支持ipv6?

最近在研究ipv6&#xff0c;光调通环境居然让我折腾了好多回&#xff0c;现在终于通了 在这里提一句&#xff0c;IPV6和IPV4是两种东西&#xff0c;不要想着ipv6兼容ipv4&#xff0c;你就当它是全新的东西 1.前置条件 1.1我的电脑能访问ipv6 测试通过就代表你电脑可以访问ip…

Redis 哨兵模式的实现详解

文章目录 高可用&#xff08;HA&#xff09;哨兵模式概述哨兵的搭建伪集群 哨兵1. 复制sentinel.conf文件2. 修改sentinel.conf文件3. 新建sentinel26380.conf4. 启动并关联Redis集群5. 启动Sentinel集群6. 查看 Sentinel 信息7. 查看 Sentinel 配置文件 哨兵优化配置 高可用&…

【腾讯云Finops Crane集训营】降本增效神器Crane实战记录

本章目录 前言一、Crane是什么&#xff1f;Crane的主要功能&#xff1f;FinOps 是什么Prometheus是什么Grafana是什么 二、不得不面对的问题&#xff1a;云上资源效能挑战&#xff01;三、云原生场景下的成本优化挑战&#xff1f;四、K8s原生能力的不足五、Crane智能调度助力成…

Linux命令之vim/vi

目录 vim/vi简介 vi/vim 的使用 操作实例 总结 vim/vi简介 所有的 Unix Like 系统都会内建 vi 文书编辑器&#xff0c;其他的文书编辑器则不一定会存在。但是目前我们使用比较多的是 vim 编辑器。Vim 是从 vi 发展出来的一个文本编辑器。代码补全、编译及错误跳转等方便编程…

i.MX6ULL - 远程视频监控方案实现(nginx-rtmp流媒体服务器、ffmpeg推流)

i.MX6ULL - 远程视频监控配置&#xff08;nginx-rtmp流媒体服务器、ffmpeg推流&#xff09; 目录 i.MX6ULL - 远程视频监控配置&#xff08;nginx-rtmp流媒体服务器、ffmpeg推流&#xff09;1、前言2、buildroot文件系统构建2.1 勾选alsa-utils&#xff08;选做&#xff1a;如果…

桥接模式与NAT模式的区别以及设置静态IP

概述 日常我们都会使用到虚拟机&#xff0c;本文章以VMware虚拟机为例&#xff0c;主要介绍下虚拟机设置桥接模式与NAT模式的区别&#xff0c;并通过示意图进行讲解。并且会介绍如何去设置静态IP。 模式介绍 NAT模式NAT模式下 &#xff0c;创建出来的虚拟机只能访问当前主机…

基于ensp的跨地区的校园网组网方案

本博客是基于模拟器ensp的校园网组网方案&#xff0c;有总校区和分校区&#xff0c;主要用了vlan划分、dhcp、nat、ospf、acl、bgp等技术。首先说一下本博客的局限性&#xff1a; 总校区和分校区之间只是使用的传统的bgp建立连接&#xff0c;这样可以在运营商上看到内网的明细&…