canal中间件集成springboot实战落地

news2025/1/21 18:48:22

目录

一、数据库开启相关权限功能:

  二、canal 服务端配置启动:从官网下载程序和源码到本地环境

三、canal客户端配置启动:


canal中间件集成springboot实战落地开始分享,这是目前互联网很常见的中间件,监听数据库变化、全量数据缓存等功能,起到很方便的作用,原理和使用场景可以直接参考官网,介绍的很详细,中文文档:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

今天我们就直接开始分享实战使用,使用分为三大步骤,数据库开启big-log功能,canal服务端配置启动、canal客户端配置启动和数据测试:

一、数据库开启相关权限功能:

 Canal 模拟 MySQL 从节点获得数据库服务器的数据,很明显,对 MySQL服务器的配置完全可以参考MySQL的主从复制中主节点的配置。

1、先开启 Binlog 写入功能,开启bin-log权限:

SHOW VARIABLES like "%log_bin%";

数据库显示已开启状态:

2、 配置 binlog-format 为 ROW 模式:


SHOW VARIABLES like "%binlog_format%";

查询显示: 

3、 主数据库的唯一值server_id配置:


SHOW VARIABLES like "%server_id%";

 查询显示:

4、创建一个用户用来同步数据:

CREATE USER canal IDENTIFIED BY 'canal';     

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
--如果创建报错,可以先刷新一下权限
FLUSH PRIVILEGES;

--创建后查询用户相关信息
SELECT * from mysql.user;

SELECT user, select_priv,Repl_slave_priv,Repl_client_priv from mysql.user;

 创建后查询:

5、 查询此主库的状态:

show master status;

查询结果:

mysql-bin.000001 信息来源于mysql配置文件配置,my.int文件,下面会说到 

6、查询此从库状态:

show slave status;

查询为空,因为这个数据库是主库:

7、 查看当前所有binlog的日志存储

show binary logs;

结果:

 8、查看当前已经消费到了什么位置

show binlog events in 'mysql-bin.000001';

结果:

 9、以上信息均基于my.int配置如下:

 
[client]

port=3306
default-character-set=utf8
[mysql]

default-character-set=utf8

[mysqld]

# The TCP/IP Port the MySQL Server will listen on
port=3306

#开启查询缓存
explicit_defaults_for_timestamp=true

#Path to installation directory. All paths are usually resolved relative to this.
basedir=C:/Program Files/mysql-5.7.18-winx64/

#Path to the database root
#datadir="C:/Program Files/mysql-5.7.18-winx64/data/"

# The default character set that will be used when a new schema or table is
# created and no character set is defined
character-set-server=utf8

# The default storage engine that will be used when create new tables when
default-storage-engine=INNODB

# Set the SQL mode to strict
sql-mode="STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"

#用于第一次登录 可以免密
skip-grant-tables
 
max_connections=100

server_id=1   # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
log-bin=mysql-bin  # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式 MIXED
#binlog_format=MIXED   # 选择 ROW 模式 MIXED
 
#query_cache_size=0

# table_cache=256
 
#innodb_log_file_size=24M

相关的配置主要是这几行:

server_id=1   # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
log-bin=mysql-bin  # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式 MIXED
#binlog_format=MIXED   # 选择 ROW 模式 MIXED

  二、canal 服务端配置启动:从官网下载程序和源码到本地环境

1、下载入口:进入里面有各种版本

2、解压缩到固定目录:

直接解压即可

3、修改核心配置文件:

canal.deployer-1.1.6\canal.deployer-1.1.6\conf 目录下的 canal.properties 文件,核心的几个配置如下:

# tcp bind ip 服务端ip
canal.ip =127.0.0.1
#服务端默认端口号
canal.port =11111
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#监控的模块之一,可以是多个 canal.destinations = promotion,example
canal.destinations = promotion

4、修改需要监控的包配置:新建promotion目录,添加修改配置文件和example目录是并列关系

canal.deployer-1.1.6\canal.deployer-1.1.6\conf\promotion

# position info
canal.instance.master.address=127.0.0.1:3306
#主库binlog日志
canal.instance.master.journal.name=mysql-bin.000001
#监控的位置
canal.instance.master.position=1612
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=master-db.frend
# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*
#canal.instance.filter.black.regex=mysql\\.slave_.*

5、启动服务,win环境 点击 bin目录下的 startup.bat

正常启动是这种控制台日志

6、server 日志查看目录在:

 日志文件里面有详细信息

三、canal客户端配置启动:

1、jar包引入:

     <!-- 引入canal -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.6</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--我这边不手动引入会报找不到com.alibaba.otter.canal.protocol.Message-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.6</version>
        </dependency>

2、配置信息相关


canal:
  server:
    ip: 127.0.0.1
    port: 11111
  promotion:
    destination: promotion
    #subscribe: .*\..*
    batchSize: 1000

3、客户端初始化类

package com.nandao.datasource.dynamic.mybatis.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetSocketAddress;

@Configuration
@EnableScheduling
@EnableAsync
public class CanalPromotionConfig {

    @Value("${canal.server.ip}")
    private String canalServerIp;

    @Value("${canal.server.port}")
    private int canalServerPort;

    @Value("${canal.server.username:blank}")
    private String userName;

    @Value("${canal.server.password:blank}")
    private String password;

    @Value("${canal.promotion.destination}")
    private String destination;

    @Bean("promotionConnector")
    public CanalConnector newSingleConnector(){
        //有默认账号,初始化的时候可以不加
        String userNameStr = "blank".equals(userName) ? "" : userName;
        String passwordStr = "blank".equals(password) ? "" : password;
        return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,
                canalServerPort), destination, userNameStr, passwordStr);
    }
}

4、监听的业务类

package com.nandao.datasource.dynamic.mybatis.service.impl;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.nandao.datasource.dynamic.mybatis.service.IProcessCanalData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Service
@Slf4j
public class PromotionData implements IProcessCanalData {

    private final static String SMS_HOME_ADVERTISE = "sms_home_advertise";
    private final static String SMS_HOME_BRAND = "sms_home_brand";
    private final static String SMS_HOME_NEW_PRODUCT = "sms_home_new_product";
    private final static String SMS_HOME_RECOMMEND_PRODUCT = "sms_home_recommend_product";
    /*存储从表名到Redis缓存的键*/
    private Map<String,String> tableMapKey = new HashMap<>();

    @Autowired
    @Qualifier("promotionConnector")
    private CanalConnector connector;

    @Value("${canal.promotion.subscribe:server}")
    private String subscribe;

    @Value("${canal.promotion.batchSize}")
    private int batchSize;

    @PostConstruct
    @Override
    public void connect() {
        connector.connect();
        if("server".equals(subscribe))
            connector.subscribe(null);//可以直接采用服务端配置的扫描表的范围
        else
            connector.subscribe(subscribe);//自定义表的范围,不依赖服务端
        connector.rollback();
    }

    @PreDestroy
    @Override
    public void disConnect() {
        connector.disconnect();
    }

    @Async
    @Scheduled(initialDelayString="${canal.promotion.initialDelay:5000}",fixedDelayString = "${canal.promotion.fixedDelay:5000}")
    @Override
    public void processData() {
        try {
            if(!connector.checkValid()){
                log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");
                this.connect();
            }else{
                Message message = connector.getWithoutAck(batchSize);//获取batchSize条数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    log.info("本次[{}]没有检测到促销数据更新。",batchId);
                }else{
                    log.info("本次[{}]促销数据本次共有[{}]次更新需要处理",batchId,size);
                    /*一个表在一次周期内可能会被修改多次,而对Redis缓存的处理只需要处理一次即可*/
                    Set<String> factKeys = new HashSet<>();
                    for(CanalEntry.Entry entry : message.getEntries()){
                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                            continue;
                        }
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        String tableName = entry.getHeader().getTableName();//获取表名
                        if(log.isDebugEnabled()){
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            log.debug("数据变更详情:来自binglog[{}.{}],数据源{}.{},变更类型{}",
                                    entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),
                                    entry.getHeader().getSchemaName(),tableName,eventType);
                        }
                        factKeys.add(tableMapKey.get(tableName));
                    }
                    for(String key : factKeys){
                        if(StringUtils.isNotEmpty(key))  {
                            log.info("删除需要");
                        }
                    }
                    connector.ack(batchId); // 提交确认
                    log.info("本次[{}]处理促销Canal同步数据完成",batchId);
                }
            }
        } catch (Exception e) {
            log.error("处理促销Canal同步数据失效,请检查:",e);
        }

    }
}

处理字段信息:增删改

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            if (eventType == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : columns) {
                                    if(column.getName().equals("id")) {
                                        deleteDoc(column.getValue());
                                        break;
                                    }
                                }
                            } else if (eventType == CanalEntry.EventType.INSERT) {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                ProductESVo productESVo = new ProductESVo();
                                JSONObject jsonData=new JSONObject();
                                //jsonData.put("", tableName);
                                String docId = makeVo(columns,productESVo);
                                String docIdNew = makeVoNew(columns,jsonData, tableName);
                                insertDoc(docId,productESVo);
                                insertDocNew(docId,jsonData);
                            } else {
                                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                                ProductESVo productESVo = new ProductESVo();
                                String docId = makeVo(columns,productESVo);
                                if(null != docId){
                                    if(null == productESVo){
                                        log.info("商品的删除状态字段update为已删除,从ES中移除");
                                        deleteDoc(docId);
                                    }else  updateDoc(docId,productESVo);
                                }
                            }
                        }

 遍历字段:

 private String makeVoNew(List<CanalEntry.Column> columns,JSONObject jsonData,String tableName ){
        String docId = null;

        for (CanalEntry.Column column : columns) {
            String colName = column.getName();
            String colValue = column.getValue();
            jsonData.put(colName,colValue);
            if(colName.equals(T_ID)) {
                docId = colValue;
            }
            if(parentTableList.contains(tableName)){
                jsonData.put("relation_flag",tableName);
            }
            if(subTableList.contains(tableName)){
                jsonData.put("relation_flag",tableName);
                jsonData.put("parent",docId);
            }
        }
        return docId;
    }

5、启动客户端服务:5秒监听一次

 6、修改数据库数据:立刻看到监听到数据

 看到监听的数据消费,到此,canal 实战流程分享完毕,大家一定要动手操作演练,才能熟练掌握,下篇我们分享canal实战过程中遇到的问题,敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/116137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Harbor镜像仓库的安装以及Docker从Harbor上传与下载镜像

Harbor镜像仓库的安装与使用 简介&#xff1a;Harbor是一个用于存储和分发Docker镜像的企业级Registry服务器&#xff0c;除了Harbor这个私有镜像仓库外&#xff0c;还有Docker官方提供的Registry。相对Registry&#xff0c;Harbor具有很多优势,本文主要介绍Harbor镜像仓库的安…

Serivice案例

Serivice启动方式案例 1.案例1&#xff1a;-start方式启动 1.1创建服务 //服务类 public class MyService extends Service {//创建服务调用一次Overridepublic void onCreate() {System.out.println("onCreate");Toast.makeText(this, "onCreate", Toast.…

MySQL (三)------DDL操作数据库、DDL操作表

DDL操作数据库 1.1创建数据库(掌握) 语法 create database 数据库名 [character set 字符集][collate 校对规则] 注: []意思是可选的意思 字符集(charset)&#xff1a;是一套符号和编码。 练习 创建一个day01的数据库&#xff08;默认字符集) create database day01;…

List使用的坑

Arrays.asList的三个坑 1、不能转换基本数组类型(传数组进去&#xff0c;size1) 2、不支持增删操作(因为内部是一个final的数组) 3、对原始数组的修改会影响到我们获得的那个List 源码&#xff1a; 抽象List接口不支持新增 解决方案&#xff1a; 1、new ArrayList 2、java8…

4.移动端布局-flex布局**

1、传统布局和flex布局 传统布局&#xff1a;PC端 兼容性好布局繁琐局限性&#xff0c;不能在移动端很好的布局 flex布局&#xff1a;PC端、移动端操作方便&#xff0c;布局简单&#xff0c;移动端应用广泛PC端浏览器支持情况较差IE11或更低版本&#xff0c;不支持或仅部分支…

关于加密货币危机公关的智能钱包系列:该做和不该做哪些事情

我们的新一期 Twitter Spaces 为危机公关带来了加密镜头。与我们的主持人 Megan DeMatteo 一起出席本期节目的还有 Market Across 战略与消费者成功副总裁 Kim Bazak 和 Ambire CMO Vanina Ivanova。 Ambire Twitter Spaces 第 14 集以更广泛的视角来看待 FTX/Alameda 的故事。…

virtio虚拟化框架

virtio虚拟化 系统虚拟化技术是云计算最重要的核心技术之一。云计算平台的资源池化&#xff0c;资源统一管理以及后续的动态分配都是基于系统虚拟化技术才得以实现的。在计算机系统中&#xff0c;主要有计算资源&#xff0c;存储资源和网络资源。所以&#xff0c;系统虚拟化技术…

通讯录(3)

接着上一篇。 上一篇的指定删除还有一定的问题&#xff0c;我让用户输入要删除的联系人的名字&#xff0c;然后查询这个名字是否存在&#xff0c;再去删除。但是这里忽略了一个问题&#xff0c;如果两人名字一样呢&#xff1f;其它也有这样的问题&#xff0c;年龄&#xff0…

【vant组件安装】按需引入 完整引入 定制主题

vant官网&#xff1a;https://vant-contrib.gitee.io/vant/v2/#/zh-CN/定制主题: https://vant-contrib.gitee.io/vant/v2/#/zh-CN/theme 1. vant组件安装—按需引入 1.安装vant组件库 npm i vantlatest-v22.安装按需引入组件 npm i babel-plugin-import -D3.在babel.config.j…

cadence SPB17.4 - 从正常PCB文件反推原理图

文章目录cadence SPB17.4 - 从正常PCB文件反推原理图概述笔记用SPB17.4 allegro 出报表剩余的事情最重要的一件事情 - 核对整理出的原理图是否和PCB原图网络一致最后的事情备注ENDcadence SPB17.4 - 从正常PCB文件反推原理图 概述 和同学讨论问题, 他那有一个可以正常生产的立…

2-2-3-9-1-2、jdk1.7ConcurrentHashMap详解

数据结构 对比hashmap,hashmap数组对象类型是Entry对象类型&#xff0c;而ConcurrentHashMap数组对象类型是Segment[]数组&#xff0c;segment[]数组的对象类型为HashEntry类型(一个Segment里面包含一个HashEntry数组&#xff0c;每个HashEntry是一个链表结构&#xff0c;当对…

【youcans 的 OpenCV 学习课】1.2 编译生成带有 OpenCV_contrib 的 OpenCV 库

专栏地址&#xff1a;『youcans 的图像处理学习课』 文章目录&#xff1a;『youcans 的图像处理学习课 - 总目录』 【youcans 的 OpenCV 学习课】1.2 编译生成 OpenCV_contrib 的 OpenCV 库 文章目录【youcans 的 OpenCV 学习课】1.2 编译生成 OpenCV_contrib 的 OpenCV 库1. 工…

机器学习笔记之Sigmoid信念网络(一)对数似然梯度

机器学习笔记之Sigmoid信念网络——对数似然梯度引言回顾&#xff1a;贝叶斯网络的因子分解Sigmoid信念网络对数似然梯度推导过程梯度求解过程中的问题引言 从本节开始&#xff0c;将介绍Sigmoid\text{Sigmoid}Sigmoid信念网络。 回顾&#xff1a;贝叶斯网络的因子分解 Sigmo…

.NET(C#、VB)APP开发——Smobiler平台控件介绍:Bluetooth组件

本文简述如何在Smobiler中使用Bluetooth。 Step 1. 新建一个SmobilerForm窗体&#xff0c;并在窗体中加入Button和Bluetooth&#xff0c;布局如下 Button的点击事件代码&#xff1a; /// <summary>/// 关闭蓝牙/// </summary>/// <param name"sender"…

飞项三招教你用协同工具杜绝远程办公“摸鱼”

共同社19日报道称&#xff0c;总务省在新冠紧急事态宣言全面解除后不久的2021年10月对日本约9万户家庭开展了社会生活基本调查&#xff0c;利用获得的数据&#xff0c;对上班族中在调查当天有过远程办公的人和完全没有远程办公的人的工作日时间分配进行了比较。 结果显示&…

【vue面试题】

vue谈谈你怼MVVM开发模式的理解vue优点渐进式框架的理解vue常用的指令v-if和v-showv-if和v-for的优先级如何让CSS只在当前组件中起作用?<keep-alive></keep-alive> 的作用是什么?如何获取dom?vue-loader是什么&#xff1f;使用它的用途有哪些&#xff1f;assets…

哺乳时宝宝一边吃奶,另一边却自动流出来,这是怎么回事?

别人眼中的母乳喂养只是简单地把宝宝抱在怀里&#xff0c;让宝宝吃饱&#xff0c;超级简单。事实上&#xff0c;有很多母乳喂养。“麻烦事”比如母乳不足、堵奶、乳腺炎等&#xff0c;甚至更多“简单”漏奶会让宝宝头疼。有些妈妈很幸运&#xff0c;不知道什么是漏奶&#xff0…

小程序之会议管理

会议管理 注意事项 一些需要注意的细节&#xff1a; 因为 WXML 节点标签名只能是小写字母、中划线和下划线的组合&#xff0c;所以自定义组件的标签名也只能包含这些字符。自定义组件也是可以引用自定义组件的&#xff0c;引用方法类似于页面引用自定义组件的方式&#xff0…

React DAY07

复习&#xff1a; 1.RN中的样式和布局 RN样式完全脱离浏览器&#xff0c;自成体系的一套样式&#xff0c;使用对象创建样式 行内样式&#xff1a; <Text style{{color: red}}>内部样式&#xff1a; let ss StyleSheet.create({danger: {color: red}}) <Text styl…

从业多年的Android开发,竟拿不到 Application Context?

Android 开发者们对于 Application 并不陌生。有的时候为避免内存泄漏&#xff0c;常常不直接使用 Context 而是通过其提供的 getApplicationContext() 确保拿到的是 Application 级别的 Context。而本次像通常一样&#xff0c;拿到的 Application 却是 null&#xff0c;到底是…