springboot整合Canal实时同步数据库表

news2025/1/10 12:19:09

一、Canal介绍

1、应用场景

在前面的统计分析功能中,我们采取了服务调用获取统计数据,这样耦合度高,效率相对较低,目前我采取另一种实现方式,通过实时同步数据库表的方式实现,例如我们要统计每天注册与登录人数,我们只需把会员表同步到统计库中,实现本地统计就可以了,这样效率更高,耦合度更低,Canal就是一个很好的数据库同步工具。canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。

2、Canal环境搭建

canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能

开启mysql服务: service mysql start

(1)检查binlog功能是否有开启

(2)如果显示状态为OFF表示该功能未开启,开启binlog功能

(3)在mysql里面添加以下的相关用户和权限

3、下载安装Canal服务

下载地址: https://github.com/alibaba/canal/releases

(1)下载之后,放到目录中,解压文件

cd /usr/local/canal

canal.deployer-1.1.4.tar.gz

tar zxvf canal.deployer-1.1.4.tar.gz

(2)修改配置文件

vi conf/example/instance.properties

注:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:

  1. 所有表:.   or  .\..*
  2. canal schema下所有表: canal\..*
  3. canal下的以canal打头的表:canal\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

(3)进入bin目录下启动
sh bin/startup.sh

二、创建canal_client模块

1、创建canal_client模块

2、引入相关依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
        </dependency>
    </dependencies>

3、创建application.yml配置文件

server:
  port: 8009
spring:
  application:
    name: caner_clientedu
  profiles:
    active: dev
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/education?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    username: root
    password: 419520

4、编写canal客户端类

package com.xingchen.canel.client;



import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;


@Component
public class CanalClient {

    //sql队列
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * canal入库方法
     */
    public void run() {

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.88.135",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    //当队列里面堆积的sql大于一定数值的时候就模拟执行
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 模拟执行队列里面的sql语句
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);

            this.execute(sql.toString());
        }
    }

    /**
     * 数据处理
     *
     * @param entrys
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 保存更新语句
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存删除语句
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        //暂时只支持单一主键
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存插入语句
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 入库
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: "+ row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}

5、创建启动类

package com.xingchen.canel;



import com.xingchen.canel.client.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class CanalApplication implements CommandLineRunner{
    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/80151.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

语雀的技术栈与富文本编辑讨论分享

目录前言什么是语雀富文本编辑器的发展历程语雀结构简析语雀核心语雀渲染器语雀前端技术业务层编辑器语雀编辑器演化过程语雀研发流程关于语雀的讨论为何文档编写不是一种标准化的中台能力内容类产品典型类别业务所需编辑器开发成本如何&#xff1f;文本编辑器代码编辑器公式编…

[附源码]Python计算机毕业设计SSM基于的网上拍卖系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

13.javase_动漫美女拼图实战

资料地址&#xff1a;https://cowtransfer.com/s/4573fe572f9c4a 项目效果&#xff1a; 练习编程逻辑思维&#xff0c;提高和锻炼自己能力。 一. 绘制游戏界面 1.1窗体绘制 第一个方法&#xff1a;initFrame()&#xff0c;用于窗体的基本设置 public void initFrame() { thi…

每天一个面试题:悲观锁、乐观锁,对比Hashtable和concurrentHashMap

每天一个面试题&#xff1a;悲观锁、乐观锁Hashtable和concurrentHashMap总结开始全新的学习&#xff0c;沉淀才会有产出&#xff0c;一步一脚印&#xff01; 面试题系列搞起来&#xff0c;这个专栏并非单纯的八股文&#xff0c;我会在技术的基础上&#xff0c;Debug解析&#…

戴维南定理(Thevenin‘s theorem)

戴维南定理&#xff1a;含独立电源的线性电阻单口网络N&#xff0c;就端口特性而言&#xff0c;可以等效为一个电压源和电阻串联的单口网络。电压源的电压等于单口网络在负载开路时的电压uoc&#xff1b;电阻R0是单口网络内全部独立电源为零值时所得单口网络N0的等效电阻。 例子…

JavaScript基本语法

1.JavaScript词法结构 所谓词法结构是指一套基础性规则&#xff0c;用来描述如何使用这门语言来编写程序&#xff0c;包括如下几项&#xff1a; 字符集unicode 区分大小写 &#xff08;true和TRUE&#xff09; 忽略空白字符&#xff08;空格、制表符和换行符&#xff09…

UG鼠标和键盘操作

UG鼠标和键盘操作角色操作鼠标操作选择放大/缩小旋转平移确认适合窗口正视于X/Y/Z视图显示样式九宫图设置键盘操作ESCF8角色操作 角色包含了用户的自定义配置&#xff0c;包括内容、演示、快捷键、工具栏等&#xff0c;通过&#xff1a;菜单-首选项-用户界面-角色 进行导入导出…

【C语言程序设计】实验 8

目录 1. 整数排序 2. 二维数组按每行数据之和升序 3. 字符串排序 4. 二维数组各行元素之和 5. 二位整数数组交换两行 6. 插入排序 7. 10进制转2进制&#xff0c;8进制和16进制数&#xff08;数组&#xff09; 1. 整数排序 【问题描述】从键盘输入n(0<n<21)个整…

微服务同时接入多个Kafka

准备工作 自己搭建一个Kafka 从官方下载Kafka&#xff0c;选择对应Spring Boot 的版本&#xff0c;好在Kafka支持的版本范围比较广&#xff0c;当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本&#xff0c;覆盖了Spring Boot 2.0x-Spring Boot 3.0.x。Apache Kafka 解压安装 …

CMake中target_compile_definitions的使用

CMake中的target_compile_definitions命令用于向target添加编译定义&#xff0c;其格式如下&#xff1a; target_compile_definitions(<target><INTERFACE|PUBLIC|PRIVATE> [items1...][<INTERFACE|PUBLIC|PRIVATE> [items2...] ...]) 指定在编译给定的<…

网络原理初识

网络原理初识 文章目录网络原理初识网络发展历程独立模式网络互联IP地址端口号网络协议OSI七层TCP/ IP协议封装与分用封装一.应用层二.传输层三.网络层四.数据链路层五.物理层分用六.物理层七.数据链路层八.网络层九.传输层十.应用层网络发展历程 独立模式 一开始电脑之间是相…

【InnoDB ClusterSet】快速部署

快速部署 InnoDB ClusterSet 文章目录快速部署 InnoDB ClusterSet前言前期准备架构设计部署过程1. 使用配置账号通过 MySQL Shell 连接到 InnoDB Cluster 任一成员2. 为主 InnoDB Cluster 实例设置变量3. 创建以当前集群作为主集群的 ClusterSet4. 为每个独立服务器实例添加配置…

382. 链表随机节点-哈希表法

382. 链表随机节点-哈希表法 给你一个单链表&#xff0c;随机选择链表的一个节点&#xff0c;并返回相应的节点值。每个节点 被选中的概率一样 。 实现 Solution 类&#xff1a; Solution(ListNode head) 使用整数数组初始化对象。 int getRandom() 从链表中随机选择一个节点…

Vue Element动态生成的表单如何用 el-form 校验

<el-form :model"dynamicValidateForm" ref"dynamicValidateForm" label-width"100px" class"demo-dynamic"><el-form-item prop"email" label"邮箱" :rules"[{ required: true, message: 请输入…

研究良久,终于发现了他代码写的快且bug少的原因

前言 读者诸君&#xff0c;今日我们适当放松一下&#xff0c;不钻研枯燥的知识和源码&#xff0c;分享一套高效的摸鱼绝活。 我有一位程序员朋友&#xff0c;当时在一个团队中开发Android应用&#xff0c;历经多次考核后发现&#xff1a; 在组内以及与iOS团队的对比中: 他的任…

java项目请求url存在特殊字符 400错误

java项目请求url特殊字符 400错误 1 现象 请求路径带特殊字符&#xff0c;就会400错误&#xff0c;这就泄露了服务器版本和报错信息&#xff0c;无疑是敏感信息泄露&#xff0c;实属安全漏洞。 补充项目环境&#xff1a;springmvc、tomcat 8.5.59 2 原因 经排查和报错信息…

STM32--ADC模数转换器

学习江科大自化协stm32教程记录的笔记 ADC模数转换器 ADC&#xff08;Analog-Digital Converter&#xff09;模拟-数字转换器 ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量&#xff0c;建立模拟电路到数字电路的桥梁 DAC是数字-模拟转换器&#xff0c;但是P…

AI 助你轻松剪视频 # AutoCut

如果你还在犯愁每次剪视频都要反复听才能下手&#xff0c;不妨试试AutoCut , AI 大神李沐开源的一个剪辑神器&#xff0c;使用 Python 开发&#xff0c;它可以通过字幕来剪切视频。AutoCut 对你的视频自动生成字幕。然后你选择需要保留的句子&#xff0c;AutoCut 将对你视频中对…

C语言:变量的深入理解

文章目录一.什么是变量C语言中为什么要有类型&#xff1f;C语言中的类型为什么有这么多种呢&#xff1f;定义变量的本质为什么需要定义变量定义变量的本质定义变量时的规则二.深刻理解signed/unsigned定义的变量1.运算时的符号位2.数据的存储情况3.unsigned定义时的小细节三.大…

Android 13 VTS HIDL interface 解析

Android 13 VTS Introduction Android 13已经发布&#xff0c;VTS testcase发生很多变化&#xff0c;在此博客中对其每个测试项目进行流程介绍。 这里先对VTS 做一个介绍&#xff1a; VTS是vendor test suite简称&#xff0c;意为供应商测试套件。目的是确保Vendor层实现的兼容…