SaaS 电商设计 (五) 私有化部署-实现 binlog 中间件适配

news2024/9/23 9:22:46

一、 背景

  具体的中间件私有化背景在上文 SaaS` 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysqlES 数据同步的过程中,常用的技术方案有这样几种.

二、 设计主体

2.1 N种方案

方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例)
在这里插入图片描述

如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入.

方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致
在这里插入图片描述

在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入.

方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制
在这里插入图片描述

  • 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点.

  • 业务开发的同时需要同步关心数据的同步
    在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的.

    • 散落在各个业务代码角落的维护成本
      方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升.
      基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离.

    方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.)

在这里插入图片描述

对于第三种方式来说的话,接下来引入了第二个讨论的点.

  • 私有化支持
    就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.
  • 同步目标逻辑的不同支持
    在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美.

2.2 方案4 coding落地

2.2.1 类图

在这里插入图片描述
核心步骤:

step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC .
step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现.
step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler.

2.2.2 核心实现

BinlogHandlerAdapter 完成 binlog client 接口定义.

package com.baixiu.middleware.binlog.adapter;

import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.mq.model.CommonMessage;

/**
 * binlog 适配器接口
 * 适配中间件list:canal,jingwei,drc等。
 * function1:完成不同中间件解析能力
 * function2:完成不同中间件handlerMsg能力
 * @author baixiu
 * @date 2023年12月11日
 */
public interface BinlogHandlerAdapter {


    /**
     * 反序列MQMsg To binlogMsg
     * @param mqMsg mqMsg
     * @return
     */
    BinlogData deserializationMQMsg(CommonMessage mqMsg);

    /**
     * 反序列MQMsg To binlogMsg
     * @param mqMsg mqMsg
     * @return
     */
    void handleBinLogData(BinlogData binLogData) throws Exception;


}

CanalBinlogHandlerAdapter 完成 canal 解析

package com.baixiu.middleware.binlog.adapter;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.baixiu.middleware.binlog.consts.CommonConsts;
import com.baixiu.middleware.binlog.core.AbstractBinlogHandler;
import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter;
import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum;
import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel;
import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import com.baixiu.middleware.mq.model.CommonMessage;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * canal binlog handler adapter
 * 当property配置的clientType=canal时进行注入bean
 * canal client 用以解析 mq -starter 发送过来的消费消息
 * @author baixiu
 * @date 创建时间 2023/12/11 8:39 PM
 */
@Slf4j
public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{

    @Autowired
    private BinlogTableHandlerRouter binlogTableHandlerRouter;

    @Override
    public BinlogData deserializationMQMsg(CommonMessage mqMsg) {
        FlatMessage flatMessage = JSON.parseObject(mqMsg.getText(),FlatMessage.class);

        BinlogData binLogData=new BinlogData ();
        if(flatMessage!=null){
            binLogData.setBinlogDataObject(flatMessage);
        }
        return binLogData;
    }

    @Override
    public void handleBinLogData(BinlogData binLogData) throws Exception {

        if(binLogData==null || binLogData.getBinlogDataObject()==null){
            return;
        }

        FlatMessage flatMessage= (FlatMessage) binLogData.getBinlogDataObject ();
        List<Map<String, String>> rowDatas = flatMessage.getData();
        List<Map<String, String>> oldDatas = flatMessage.getOld();
        String tableName = flatMessage.getTable();
        AbstractBinlogHandler handler = binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);


        for (int i = 0; i < rowDatas.size(); i++) {
            Map<String, String> rowData = rowDatas.get(i);
            Map<String, String> oldData = new HashMap<>(i,0.75f);
            if (oldDatas != null && oldDatas.size() == rowDatas.size()) {
                oldData = oldDatas.get(i);
            }
            Map<String, String> fieldsMaps = Maps.newHashMapWithExpectedSize(20);
            BinlogDataToDiffModel binlogDataToDiffModel = transRowDataToAllBinlogData(handler, rowData, oldData
                    , fieldsMaps, flatMessage.getType());

            switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {
                case INSERT:
                    log.info("Canal.handleMessage.binlogTransConfigToMap.INSERT.{}"
                            , JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());
                    break;
                case UPDATE:
                    log.info("Canal.handleMessage.binlogTransConfigToMap.UPDATE.{}"
                            ,JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());
                    break;
                case DELETE:
                    Map<String, String> delMap = getBeforeColumnsFromBinlogData(handler, oldData);
                    log.info("Canal.handleMessage.binlogTransConfigToMap.DELETE");
                    handler.delete(delMap);
                    break;
                default:
                    log.info("CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{}"
                            ,JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));
                    break;
            }
        }
    }


    public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, Map<String, String> afterColumns
            , Map<String, String> beforeColumns, Map<String, String> fieldsMap, String type) {

        try {

            String[] updateFields = binlogData.getUpdateFields();
            String[] keyFields = binlogData.getFields();

            List<BinlogTableRowDiffModel> changeList = new ArrayList<> ();

            for (String key : afterColumns.keySet()) {

                if (keyFields.length == 1 && ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {
                    fieldsMap.put(key, afterColumns.get(key));
                } else if (ArrayUtils.contains(keyFields, key)) {
                    fieldsMap.put(key, afterColumns.get(key));
                }

                if (beforeColumns != null && !beforeColumns.isEmpty() && beforeColumns.get(key) != null) {
                    BinlogTableRowDiffModel bean = new BinlogTableRowDiffModel();
                    bean.setField(key);
                    bean.setAfter(afterColumns.get(key));
                    bean.setBefore(beforeColumns.get(key));
                    if (updateFields.length == 1 && ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {
                        changeList.add(bean);
                    } else if (ArrayUtils.contains(updateFields, key)) {
                        changeList.add(bean);
                    }
                }
            }
            BinlogDataToDiffModel data = new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));
            log.info("transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{}"
                    ,JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));
            return data;
        } catch (Exception e) {
            log.error("handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}", JSON.toJSONString(binlogData), e);
        }

        return null;
    }


    /**
     * 删除操作
     * 不同的表需要从binlogData中获取的信息不同,这里抽取
     *
     * @return
     */
    private Map<String, String> getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, Map<String, String> beforeColumns) {

        Map<String, String> keys = new HashMap<>();
        if (beforeColumns != null && !beforeColumns.isEmpty()) {
            String[] keyFields = binlogData.getFields();
            for (String key : beforeColumns.keySet()) {
                // 找出关心的字段值
                if (ArrayUtils.contains(keyFields, key)) {
                    keys.put(key, beforeColumns.get(key));
                }
            }
        }
        return keys;
    }
}

AbstractBinlogHandler 抽象binloghandler 处理类.

package com.baixiu.middleware.binlog.core;

import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import java.util.List;
import java.util.Map;

/**
 * @author baixiu
 * @date 创建时间 2023/12/12 11:31 AM
 */
public interface AbstractBinlogHandler {

    /**
     * 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中
     * @return 监控字段
     */
    String[] getFields();

    /**
     * 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中
     * @return 更新字段
     */
    String[] getUpdateFields();

    /**
     * 新增时触发
     * @param fieldValues 唯一字段,用于确定一条数据
     * @param changeList 字段的值发生变化的
     * @throws Exception 业务exception
     */
    void insert(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;

    /**
     * 数据修改时触发
     * @param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值
     * @param changeList  字段的值发生变化的
     * @throws Exception 业务exception
     */
    void update(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;

    /**
     * 删除时触发
     * @param fieldValues 唯一字段,用于确定一条数据
     * @throws Exception 业务exception
     */
    void delete(Map<String, String> fieldValues) throws Exception;

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1313313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GD32F4标准外设库

学习目标 了解标准库来源熟悉模板搭建流程掌握在已有模板基础下进行开发学习内容 标准外设库获取 标准固件库获取我们可以从官网进行下载。 下载链接:兆易创新GigaDevice-资料下载兆易创新GD32 MCU 找到 GD32F4xx Firmware Library 这个压缩包 下载完成后,进行解压,解压…

Java医院信息化建设云HIS系统源码

云HIS提供标准化、信息化、可共享的医疗信息管理系统&#xff0c;实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程&#xff0c;提升患者满意度、基层首诊率&#xff0c;通过信息共享、辅助诊疗等手段&#xff0c;提高基层医生的服务能力构建和…

nginx 1.24.0 安装nginx最新稳定版

1.官网&#xff1a; nginx: download 2. 选择稳定版&#xff1a; 3. 可以下载&#xff0c;然后上传服务器&#xff0c;也可以wget获取&#xff1a; cd /home wget https://nginx.p2hp.com/download/nginx-1.24.0.tar.gz 4. 放入/home 下。并解压缩&#xff0c;重命名nginx;…

C之不小心就犯错误1

以为会打印&#xff1a; it is ok 然而并不是&#xff1a; 原因&#xff1a; 根据C语言隐式类型转换的原理&#xff0c;如果是int型与uint型进行比较&#xff08;其它类型同理&#xff09;&#xff0c;则会将int型数据转换为uint型&#xff0c;则-1变成了 2^32-1 429496729…

Window操作系统发展史

引言 当谈及计算机操作系统的丰富历史和多样性时&#xff0c;Windows操作系统无疑是其中的一颗璀璨明星。自1985年首次亮相以来&#xff0c;Windows经历了长足的发展&#xff0c;塑造了计算机使用体验的方方面面。从初始的简单图形用户界面到如今强大而多样的功能&…

服务器被攻击宕机的一些小建议

现在网络攻击屡有发生&#xff0c;任何网站服务器都面临这样的危险&#xff0c;服务器被攻击造成的崩溃宕机是损失是我们无法估量的。网络攻击我们无法预测&#xff0c;但做好防御措施是必须的&#xff0c;建议所有的网站都要做好防范措施&#xff0c;准备相应的防护预案&#…

“Java 已死、前端已凉”?技术变革与编程语言前景:Java和前端的探讨

前端已死话题概论 本文讨论了近期IT圈中流传的“Java 已死、前端已凉”言论。我们审视了这些言论的真实性&#xff0c;并深入探讨了技术行业的演变和新兴技术的出现对编程语言和前端开发的影响。通过分析历史发展、当前趋势和未来展望&#xff0c;我们提供了对这些话题更深层次…

改造哈希表,封装unordered_map和unordered_set

正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 unordered_map是存的是pair是K&#xff0c;V型的&#xff0c;而unordered_set是K型的&#xff…

恒创:更换服务器,网站备案会掉吗

随着互联网的普及&#xff0c;越来越多的企业和个人选择通过网站来展示自己的形象和业务。然而&#xff0c;在网站建设和运营过程中&#xff0c;更换服务器是一个常见的问题。那么&#xff0c;更换服务器后&#xff0c;网站备案是否会受到影响呢&#xff1f; 网站备案是为了保…

docker-compose 升级;yaml文件编写;gpu使用

1、docker-compose 升级&#xff08;现在已经2.*版本&#xff0c;升级使支持gpu&#xff09; 参考&#xff1a;https://blog.csdn.net/weixin_51311218/article/details/131376823 https://github.com/docker/compose/issues/8142 1&#xff09;下载&#xff1a;原来1.18&…

11、ble_mesh_provisioner 配网器

1、初始化流程&#xff0c;存储初始化&#xff0c;nvs擦除&#xff0c; 2、bluetooth_init();ble协议栈初始化 3、ble_mesh_get_dev_uuid(dev_uuid);//获取设备uuid加载到mac. 4、ble_mesh_init();//ble mesh协议栈初始化 4.1配置网络钥匙&#xff0c;索引&#xff0c;赋…

微信视频号视频加密逆向

tl;dr 拿WxIsaac64(Isaac64的变种?)生成2^17个字节。然后和视频的前2^17字节做异或。 前言 略去, 总之就是WeChatVideoDownloader不能用了。 原文链接: here 准备工作 在正式开始逆向之前&#xff0c;我们首先需要能够在微信视频号中打开开发者工具&#xff0c;由于微信…

vue3+echarts 立体柱状效果

vue3echarts 立体柱状效果 废话不多说&#xff0c;直接上代码 就两步&#xff0c;直接复制粘贴一手 <div id"main" class"chart" ref"chartDom"></div>import * as echarts from echarts; type EChartsOption echarts.EChartsOpti…

c YUV 转 JPEG(准备霍夫曼编码)

先取yuv 文件中一个168的块&#xff0c;跑通全流程 理解与思路&#xff1a; 1.块分割 YUV 文件分为&#xff1a;YUV444 YUV 422 YUV420。444:就是&#xff1a;12个char 有4个Y&#xff0c;4个U&#xff0c;4个 U&#xff0c;422&#xff1a;8个char 中有4个Y &#x…

网络渗透测试(1)

实验1&#xff1a;用搜索引擎Google或百度搜索麻省理工学院网站中文件名包含“network security”的pdf文档&#xff0c;截图搜索得到的页面。 实验2&#xff1a;brassserie&#xff08;题目中给到的餐厅名称&#xff09; 实验3 实验4&#xff1a;将Z29vZCBnb29kIHN0dWR5IQ解…

攻防世界题目练习——Web引导模式(五)(持续更新)

题目目录 1. FlatScience2. bug3. Confusion1 1. FlatScience 参考博客&#xff1a; 攻防世界web进阶区FlatScience详解 题目点进去如图&#xff0c;点击链接只能看到一些论文pdf 用dirsearch和御剑扫描出一些隐藏文件&#xff1a; robots.txt: admin.php: login.php: f…

《人工智能导论》知识思维导图梳理【第7章节】

文章目录 说明专家系统机器学习机器学习定义工作流程模型评估机器学习分类在这里插入图片描述 机器学习部分md内容机器学习1 机器学习定义机器学习是从数据中自动分析获得模型&#xff0c;并利用模型对未知数据进行预测机器学习&#xff08;machine learning&#xff09;使计算…

美容店预约小程序搭建指南

随着互联网的发展&#xff0c;越来越多的传统行业开始尝试将业务与互联网相结合&#xff0c;以提供更加便捷、高效的服务。美容行业也不例外。本文将通过使用第三方制作平台&#xff0c;如乔拓云网&#xff0c;指导您如何搭建一个美观实用的美容店预约小程序&#xff0c;帮助您…

计算机网络:物理层(编码与调制)

今天又学会了一个知识&#xff0c;加油&#xff01; 目录 一、基带信号与宽带信号 1、基带信号 2、宽带信号 3、选择 4、关系 二、数字数据编码为数字信号 1、非归零编码【NRZ】 2、曼彻斯特编码 3、差分曼彻斯特编码 4、归零编码【RZ】 5、反向不归零编码【NRZI】 …

云计算 云原生

一、引言 云计算需要终端把信息上传到服务器&#xff0c;服务器处理后再返回给终端。在之前人手一台手机的情况下&#xff0c;云计算还是能handle得过来的。但是随着物联网的发展&#xff0c;什么东西都要联网&#xff0c;那数据可就多了去了&#xff0c;服务器处理不过来&…