canal 嵌入式部署 监听binlog

news2024/11/16 19:38:06

canal 嵌入式部署

  • 背景
    • 技术选型
    • canal
    • 原理
    • 用途
    • 嵌入式代码实现
      • 引入pom
      • 引入工具pom
      • main方法
      • 引入
      • 常量定义
      • install方法
      • buildCanal方法
      • pull方法
      • printSummary
      • printEntry2
    • 总结
    • 谢谢

背景

最近发现一个需求,需要监听mysql 数据库里的数据变动, 但由于架构方面的原因, 只能做成单体嵌入式的方向,嵌入进应用中,不用单独部署

技术选型

我对监控binlog 监控工具进行了了解,包括

  1. mysql-binlog-connector-java1
  2. canal2
  3. open-replicator3

canal

本篇博文主讲cannal 的嵌入模式

原理

在这里插入图片描述

用途

要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

嵌入式代码实现

引入pom

         <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.common</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.deployer</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.server</artifactId>
            <version>1.1.4</version>
        </dependency>

引入工具pom

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>

main方法

public static void main(String[] args) {
        EmbeddedCanalListener embeddedCanalListener = new EmbeddedCanalListener();
        //安装实体
        embeddedCanalListener.install();
        //拉取消息
        embeddedCanalListener.pull();
    }

引入

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

常量定义


    //日志
    public static final Logger logger = LoggerFactory.getLogger(EmbeddedCanalListener.class);
    //随意命名
    protected static final String DESTINATION = "example";
    //测试sql
    protected static final String DETECTING_SQL = "select 1 from dual;";
    //MSQL配置
    protected static final String MYSQL_ADDRESS = "xxxx";
    protected static final int MYSQL_PORT = xxx;
    protected static final String USERNAME = "xxx";
    protected static final String PASSWORD = "xxx";
    //使用ORACLE 未实现
    protected static final String ORACLE_ADDRESS = "xx.xx.xx.xx";
    protected static final int ORACLE_PORT = xxx;
    protected static final String ORACLE_USERNAME = "xxx";
    protected static final String ORACLE_PASSWORD = "xxx";
    /**
     * 表筛选 , 这里默认全部
     * 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
     * <p>
     * <p>
     * 常见例子:
     * <p>
     * 1.  所有表:.*   or  .*\\..* 2.  canal schema下所有表: canal\\..* 3.  canal下的以canal打头的表:canal\\.canal.* 4.  canal schema下的一张表:canal\\.test1
     * <p>
     * 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
     */
    protected static final String FILTER = ".*";
    //定义一个server
    private CanalServerWithEmbedded server;
    //定义一个client
    private ClientIdentity clientIdentity = new ClientIdentity(DESTINATION, (short) 1);
    
static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
            + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
            + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
            + SEP;

    }

install方法

        //获取一个instance
        server = CanalServerWithEmbedded.instance();
        //设置一个gennertor去生成
        server.setCanalInstanceGenerator(destination -> {
        	//构建cannal 把上面的参数设置进去
            Canal canal = buildCanal();
            //返回一个Manager
            return new CanalInstanceWithManager(canal, FILTER);
        });
        //启动
        server.start();
        //启动这个实例
        server.start(DESTINATION);

buildCanal方法

		Canal canal = new Canal();
		//ID无意义 随便设置
        canal.setId(12L);
        canal.setName(DESTINATION);
        canal.setDesc("my standalone server test ");

        CanalParameter parameter = new CanalParameter();
        //parameter.setDataDir("./conf");
        //索引的模式, 嵌入式选择内存
        parameter.setIndexMode(IndexMode.MEMORY);
        //存储buffsize 具体看canal 官方的介绍
        parameter.setMemoryStorageBufferSize(32 * 1024);
        //设置Mysql的配置 包括模式,地址,默认scheme,用户名,密码,slaveId(查看mysql的My.conf),链接编码格式,缓存设置(看官方介绍)
        parameter.setSourcingType(SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, MYSQL_PORT)));
        parameter.setDefaultDatabaseName("XXXX");
        parameter.setDbUsername(MYSQL_USERNAME);
        parameter.setDbPassword(MYSQL_PASSWORD);
        //可以指定binlog 和起始位置 或者其实timestamp
//        parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}",
//                "{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}"));

        parameter.setSlaveId(2L);

        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("GBK");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);
       //测试链接设置 ,这里是false 无意义
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);
        parameter.setGtidEnable(true);
        canal.setCanalParameter(parameter);
        return canal;

pull方法

		//定义拉取的大小
       int batchSize = 5 * 1024;
        while (running) {
            try {
            	//订阅当前设定的client
                server.subscribe(clientIdentity);
                //循环拉取
                while (running) {
                    Message message = server.getWithoutAck(clientIdentity, batchSize);
                    List<CanalEntry.Entry> entries;
                    //message如果是raw形式的需要去rawEntries去解析
                    if (message.isRaw()) {
                        List<ByteString> rawEntries = message.getRawEntries();
                        entries = new ArrayList<>(rawEntries.size());
                        for (ByteString byteString : rawEntries) {
                            CanalEntry.Entry entry;
                            try {
                                entry = CanalEntry.Entry.parseFrom(byteString);
                            } catch (InvalidProtocolBufferException e) {
                                throw new RuntimeException(e);
                            }
                            entries.add(entry);
                        }
                    } else {
                    //如果不是就直接拉取
                        entries = message.getEntries();
                    }

                    long batchId = message.getId();
                    int size = entries.size();
					//如果是batchId是负一或者无内容进行睡眠
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        //打印汇总信息
                        printSummary(message, batchId, size);
                        //打印实体信息
                        printEntry2(entries);
                        server.ack(clientIdentity, batchId); // 提交确认
                    }
                }
            } finally {
                 //取消订阅
                //server.unsubscribe(clientIdentity);
            }
        }

printSummary

//打印解读时间
//关注log的起始位置终止位置和时间延迟的可以关注这个类 如何取
 protected void printSummary(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format,
            new Object[]{batchId, size, memsize, format.format(new Date()), startPosition, endPosition});
    }

    protected String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
            + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
            position += " gtid(" + entry.getHeader().getGtid() + ")";
        }
        return position;
    }

printEntry2

//打印实体

   private void printEntry2(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) 		      {
            //如果需要监控事务的可以在这里进行实现
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                    e);
            }
			//关注具体内容可以在这里实现
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

//打印具体内容
protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            //如果column 是更新了的字段才打印
            if (column.getUpdated()) {
                StringBuilder builder = new StringBuilder();
                try {
                    if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                        || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                        // get value bytes
                        builder.append(
                            column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
                    } else {
                        builder.append(column.getName() + " : " + column.getValue());
                    }
                } catch (UnsupportedEncodingException e) {
                }
                builder.append("    type=" + column.getMysqlType());
                if (column.getUpdated()) {
                    builder.append("    update=" + column.getUpdated());
                }
                builder.append(SEP);
                logger.info(builder.toString());
            }
        }

    }

总结

到这里 代码基本完成了, 然后根据自己的业务实现就好了
具体可以参考 canal java 客户端 的官方实现
还有他们的 AdminGuide 里面有详细的案例
推荐一份源码解析

谢谢


  1. mysql-binlog-connector-java ↩︎

  2. canal ↩︎

  3. open-replicator ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/854586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

7.4 并行连接网路GoogLeNet

由来&#xff1a;吸收了NiN网络的串联网络思想&#xff0c;并在此基础上做了改进 解决的问题&#xff1a;什么样大小的卷积核最合适的问题。使用不同大小的卷积核组合是有利的。 GoogLeNet架构 GoogLeNet的Inception块的架构 上图中的复杂小块的具体内容如下&#xff1a; i…

导出LLaMA ChatGlm2等LLM模型为onnx

通过onnx模型可以在支持onnx推理的推理引擎上进行推理&#xff0c;从而可以将LLM部署在更加广泛的平台上面。此外还可以具有避免pytorch依赖&#xff0c;获得更好的性能等优势。 这篇博客&#xff08;大模型LLaMa及周边项目&#xff08;二&#xff09; - 知乎&#xff09;进行…

解决监督学习,深度学习报错:AttributeError: ‘xxx‘ object has no attribute ‘module‘!!!!

哈喽小伙伴们大家好呀&#xff0c;很长时间没有更新啦&#xff0c;最近在研究一个问题&#xff0c;就是AttributeError: xxx object has no attribute module 今天终于是解决了&#xff0c;所以来记录分享一下&#xff1a; 我这里出现的问题是&#xff1a; 因为我的数据比较大…

QColorDialog

QColorDialog 颜色类 QColor颜色对话框API简单的使用 QColorDialog类是QDialog的子类, 通过这个类我们可以得到一个选择颜色的对话框窗口 颜色类 QColor 关于颜色的属性信息, 在QT框架中被封装到了一个叫QColor的类中。 各种颜色都是基于红, 绿, 蓝这三种颜色调配而成的, 并…

大模型开发工程师的成长路径(此篇文章持续更新)

导言&#xff1a;现在大模型如日中天&#xff0c;引起广大技术圈的强烈关注&#xff0c;现在投身于大模型开发&#xff0c;就是第一批的大模型开发工程师&#xff0c;必然能享受到行业内的先行者优势和红利。 我就是个俗人&#xff0c;工资待遇这么高&#xff0c;肯定要转行啊…

研发工程师玩转Kubernetes——PVC使用Label和storage选择PV

在《研发工程师玩转Kubernetes——local型PV和PVC绑定过程中的状态变化》和《研发工程师玩转Kubernetes——使用local型PV在不同Pod上共享数据》中&#xff0c;我们介绍了指定VPC的spec.volumeName为PV名称来绑定它们的方法。本文将介绍PVC在创建时&#xff0c;系统自动选择绑定…

SpringBoot 简单入门部署

1. 环境要求 Java 8 (配置环境变量)Maven 3.3idea 2019.1.2 及以上 1.1 maven 配置 <mirrors><mirror><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/n…

中国IT统一运维ITSM软件市场,云智慧再次稳居榜首!

日前&#xff0c;国际数据公司IDC发布《2022年中国IT统一运维软件市场总结》报告&#xff0c;报告显示&#xff0c;云智慧2022年全年继续保持ITSM市场第一。 图1&#xff1a;2022年中国IT统一运维ITSM软件市场&#xff0c;云智慧再次夺冠 根据IDC数据显示&#xff0c;2022年中…

重发布选路最佳实验

题目 IP地址配置 R1&#xff1a; R2&#xff1a; R3&#xff1a; R4&#xff1a; 双点重发布 R2&#xff1a; rip 1 version 2 network 12.0.0.0 network 2.0.0.0 import-route ospf 1 ospf 1 import-route rip 1 route-policy R2 area 0.0.0.0 network 23.1.1.0 0.0…

实现链式队列

dl.h dl.c main.c 结果

14-矩阵相乘及其运算法则

矩阵与向量的乘法 在这一篇文章中我们就将基于上一篇重新审视矩阵的这个视点来理解矩阵的乘法&#xff0c;那么在这一篇&#xff0c;我们主要来看一下矩阵和向量的乘法。这里这个线性方程组是上一小节给大家举的模拟的一个非常简单的小型经济系统的例子&#xff0c;我们可以把…

DAY04_SpringMVC—SpringMVC简介PostMan和ApiFox工具使用SpringMVC请求与响应REST风格

目录 一 SpringMVC简介1 SpringMVC概述问题导入1.1 SpringMVC概述 2 入门案例问题导入2.0 回顾Servlet技术开发web程序流程2.1 使用SpringMVC技术开发web程序流程2.2 代码实现【第一步】创建web工程&#xff08;Maven结构&#xff09;【第二步】设置tomcat服务器&#xff0c;加…

【C++从0到王者】第十九站:手把手教你写一个反向迭代器

文章目录 一、反向迭代器与正向迭代器的区别二、适配器模式来实现反向迭代器三、手撕反向迭代器 一、反向迭代器与正向迭代器的区别 反向迭代器与正向迭代器的解引用都是一样的&#xff0c;都是可以直接找到该位置里面存储的值。不同的是他们之间的运算规则不同&#xff0c;反…

剑指 Offer 61. 扑克牌中的顺子

题目描述 从若干副扑克牌中随机抽 5 张牌&#xff0c;判断是不是一个顺子&#xff0c;即这5张牌是不是连续的。2&#xff5e;10为数字本身&#xff0c;A为1&#xff0c;J为11&#xff0c;Q为12&#xff0c;K为13&#xff0c;而大、小王为 0 &#xff0c;可以看成任意数字。A 不…

从零开始:构建您自己的直播带货软件开发计划

1. 确定目标和需求 在开始开发之前&#xff0c;您需要明确您的目标和需求。考虑以下问题&#xff1a; 您的直播带货软件是面向哪个市场和用户群体&#xff1f; 您的软件需要支持哪些主要功能&#xff0c;如实时视频直播、商品展示、购买支付、实时互动等&#xff1f; 您是否需…

无人驾驶实战-第十二课(强化学习自动驾驶系统)(完)

在七月算法上报了《无人驾驶实战》课程&#xff0c;老师讲的真好。好记性不如烂笔头&#xff0c;记录一下学习内容。 课程入口&#xff0c;感兴趣的也可以跟着学一下。 ————————————————————————————————————————— 强化学习&#xff…

无刷电机控制

无刷电机控制 特点: 线圈不动&#xff0c;磁极转动电子换向方式消除了有刷电机的缺点单位质量/功率转矩大驱动较复杂

(十六)大数据实战——安装使用mysql版的hive服务

前言 hive默认使用的是内嵌据库derby&#xff0c;Derby 是一个嵌入式数据库&#xff0c;可以轻松地以库的形式集成到应用程序中。它不需要独立的服务器进程&#xff0c;所有的数据存储在应用程序所在的文件系统中。为了支持hive服务更方便的使用&#xff0c;我们使用mysql数据…

湘大 XTU OJ 1223 Repeat One 题解:最后面进行取模转变成一边取模一边计算

一、链接 Repeat One 二、题目 题目描述 求由最小的一个N&#xff0c;N个数码1组成的数能被M整除&#xff1f; 比如M3时&#xff0c;111能被3整除。M2时&#xff0c;则不存在这样的N。 输入 第一行是一个整数K(K≤1,000),表示样例的个数。 以后每行一个整数M(1≤M≤1,000,000…

BigDecimal使用总结

BigDecimal Java在java.math包中提供的API类BigDecimal&#xff0c;用来对超过16位有效位的数进行精确的运算。双精度浮点型变量double可以处理16位有效数。 在实际应用中&#xff0c;需要对更大或者更小的数进行运算和处理。float和double只能用来做科学计算或者是工程计算&a…