【Linux】canal1.1.7同步MySQL8.0.3和Redis

news2024/11/24 18:47:59

目录

  • 前言
  • 一、MySQL8配置
    • 1. 修改my.cnf
    • 2. 重启mysql
    • 3. 建用户、授权
  • 二、Canal服务端配置
    • 1. 下载
    • 2. 修改配置
    • 3. 启动服务与验证
  • 三、Canal客户端编写
    • 1. yml配置文件添加canal服务端配置信息和Redis信息
    • 2. 配置pom文件
    • 3. 代码
    • 4. MySQL建表storage.storage
    • 5. 启动客户端与验证
  • 参考

前言

自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password。按照官网的文档执行,在启动时会报错。

本文写了下大致的步骤。

一、MySQL8配置

1. 修改my.cnf

可以 find / -name my.cnf 命令找到这个配置文件

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2. 重启mysql

systemctl stop mysqld.service
systemctl start mysqld.service

3. 建用户、授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

-- 降低密码验证等级
set global validate_password.policy=LOW;
set global validate_password.length=5;
-- 自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password
drop user canal;
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

二、Canal服务端配置

1. 下载

canal下载地址
在这里插入图片描述
上传服务器

# 建立文件夹
mkdir canal
# 解压
tar -xvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C ./canal/

2. 修改配置

# 进入conf/example目录
cd conf/example
# 修改配置文件
vi instance.properties

在这里插入图片描述

  • slaveId:从节点id,不与1.1节的server_id一样即可
  • address:连接的mysql的ip:port
  • dbUsername/dbPassword:用户名/密码

主要配置这几个即可

3. 启动服务与验证

# 进入bin目录
cd bin
# 启动脚本
sh startup.sh
# 查看启动日志
tail -200f ../logs/example/example.log

在这里插入图片描述
无报错信息即可。

三、Canal客户端编写

Spring Boot集成canal.client获取变更信息,插入Redis

1. yml配置文件添加canal服务端配置信息和Redis信息

canal:
  host: Canal服务端所在服务器IP
  port: 11111
  username: canal
  password: canal
  destination: example
spring:
  redis:
    port: 6379
    host: 
    password: 123456
    database: 1
    lettuce:
       pool:
         min-idle: 0
         max-idle: 8
         max-active: 20

2. 配置pom文件

下面是所需的依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
 <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.83</version>
 </dependency>

3. 代码

  1. CanalClientProperties 为Canal连接属性
import lombok.Data;
@Data
public class CanalClientProperties {
    private String host;
    private int port;
    private String username;
    private String password;
    private String destination;
}

  1. CanalConfig 为配置类
import java.net.InetSocketAddress;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

@Configuration
public class CanalConfig {
    @Bean
    @ConfigurationProperties(prefix = "canal")
    CanalClientProperties canalClientProperties(){
        return new CanalClientProperties();
    }

    @Bean(destroyMethod = "disconnect")
    CanalConnector canalConnector(CanalClientProperties canalClientProperties) {
        // 创建链接
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalClientProperties.getHost(), canalClientProperties.getPort()),
                canalClientProperties.getDestination(), canalClientProperties.getUsername(), canalClientProperties.getPassword());
    }
}

  1. CanalClient为同步监听

TABLE_NAME 限制了读取的表


import java.util.List;

import javax.annotation.Resource;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class CanalClient implements ApplicationRunner {

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private static final String TABLE_NAME = "storage";
    private static final String PRIMARY_KEY = "c_id";
    private static final String SEPARATOR = ":";

    @Resource
    private CanalConnector canalConnector;

    @Override
    public void run(ApplicationArguments args) {
        this.initCanal();
    }

    public void initCanal() {
        int batchSize = 1000;
        log.info("启动 canal 数据同步...");
        canalConnector.connect();
        canalConnector.subscribe("storage.storage");
        canalConnector.rollback();
        while (true) {
            // 获取指定数量的数据
            Message message = canalConnector.getWithoutAck(batchSize);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    // 时间间隔1000毫秒
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                syncEntry(message.getEntries());
            }
            canalConnector.ack(batchId); // 提交确认
        }
    }

    private void syncEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);

            String tableName = entry.getHeader().getTableName();
            if (!TABLE_NAME.equalsIgnoreCase(tableName)) continue;
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    redisInsert(tableName, rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    printColumn(rowData.getAfterColumnsList());
                    redisUpdate(tableName, rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    redisDelete(tableName, rowData.getBeforeColumnsList());
                }
            }
        }
    }

    private void redisInsert(String tableName, List<CanalEntry.Column> columns) {
        JSONObject json = new JSONObject();
        for (CanalEntry.Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        for (CanalEntry.Column column : columns) {
            if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
                String key = tableName + SEPARATOR + column.getValue();
                redisTemplate.opsForValue().set(key, json.toJSONString());
                log.info("redis数据同步新增,key:" + key);
                break;
            }
        }
    }

    private void redisUpdate(String tableName, List<CanalEntry.Column> columns) {
        JSONObject json = new JSONObject();
        for (CanalEntry.Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        for (CanalEntry.Column column : columns) {
            if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
                String key = tableName + SEPARATOR + column.getValue();
                redisTemplate.opsForValue().set(key, json.toJSONString());
                log.info("redis数据同步更新,key:" + key);
                break;
            }
        }
    }

    private void redisDelete(String tableName, List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
                String key = tableName + SEPARATOR + column.getValue();
                redisTemplate.delete(key);
                log.info("redis数据同步删除,key:" + key);
                break;
            }
        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            log.info(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}


4. MySQL建表storage.storage

建模式与建表

create database storage;
CREATE TABLE storage.storage (
  `c_id` bigint NOT NULL AUTO_INCREMENT,
  `c_commodity_name` varchar(32) NOT NULL,
  `n_count` bigint NOT NULL,
  PRIMARY KEY (`c_id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

5. 启动客户端与验证

启动spring boot项目,修改表数据。查看日志

在这里插入图片描述

参考

  • Canal 1.1.5 启动报错:caching_sha2_password Auth failed
  • GITHUB官网

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/392541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中微8S6990使用过程的一些记录--GPIO初始化、定时器、PWM、ADC、休眠等外设的配置和使用

前言 最近把一款产品的代码从新唐MS51移植到了中微8S6990平台上&#xff0c;记录下移植过程遇到的各种情况。 目录前言定时器初始化、中断服务函数GPIO配置ADC模数转换初始化PWM初始化Main函数休眠的一些注意事项最后定时器初始化、中断服务函数 void TMR0_Config(void) {/*(…

keepalived+nginx 双机热备搭建

keepalivednginx 双机热备搭建一、准备工作1.1 准备两台centos7.91.2 nginx 与 keepalived软件 双机安装1.3 ip分配1.4 修改主机名1.5 关闭selinux&#xff08;双机执行&#xff09;1.6 修改hosts&#xff08;双机执行&#xff09;二、安装keepalived2.1 执行一下命令安装keepa…

MidiaPipe +stgcn(时空图卷积网络)实现人体姿态判断(单目标)

文章目录前言Midiapipe关键点检测stgcn 姿态评估效果前言 冒个泡&#xff0c;年少无知吹完的牛皮是要还的呀。 那么这里的话要做的一个东西就是一个人体的姿态判断&#xff0c;比如一个人是坐着还是站着还是摔倒了&#xff0c;如果摔倒了我们要做什么操作&#xff0c;之类的。…

【模型复现】-alexnet,nn.Sequential顺序结构构建网络

深度卷积神经网络&#xff08;AlexNet&#xff09; 在LeNet提出后的将近20年里&#xff0c;神经网络一度被其他机器学习方法超越&#xff0c;如支持向量机。虽然LeNet可以在早期的小数据集上取得好的成绩&#xff0c;但是在更大的真实数据集上的表现并不尽如人意。一方面&#…

第五章 事务管理

1.事务概念 *什么是事务&#xff1a;事务是数据库操作最基本单元&#xff0c;逻辑上是一组操作&#xff0c;要么都成功&#xff0c;要么都失败 *事务的特性&#xff08;ACID&#xff09;&#xff1a;原子性、隔离性、一致性、持久性 2.搭建事务操作环境 *模拟场景&#xff…

uart串口接收模块

uart串口接收模块 1、UART&#xff08;异步串行接口&#xff09; 串行通信&#xff1a;指利用一条数据线将资料一位位的顺序传输。   异步通信&#xff1a;以一个字符为传输单位&#xff0c;通信中两个字符间的时间间隔是不固定的&#xff0c;然而在同一个字符的两个相邻位代…

【微信小程序】-- 页面事件 - 下拉刷新(二十五)

&#x1f48c; 所属专栏&#xff1a;【微信小程序开发教程】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &…

高盐废水除钙镁的技术解析

高盐废水指含有机物和至少总溶解固体(totaldissolvedsolids&#xff0c;tds)的质量分数大于3.5&#xff05;的废水&#xff0c;具有水量大&#xff0c;无机盐离子k、na、ca2、mg2、cl-、so42-等含量高&#xff0c;水质水量变化大&#xff0c;成分复杂&#xff0c;难生化降解等特…

2023年中职网络安全竞赛——CMS网站渗透解析

需求环境可私信博主 解析如下: CMS网站渗透 任务环境说明: 服务器场景:Server2206(关闭链接) 服务器场景操作系统:未知 1.使用渗透机对服务器信息收集,并将服务器中网站服务端口号作为flag提交; Flag:8089

华为套件生态

华为套件生态前言蓝牙设备华为耳机华为鼠标智慧互联超级终端多屏协同远程访问文件共享华为电脑管家我的设备控制中心前言 华为的手机、平板、电脑、耳机、手环、手表等设备可以组成华为生态。以下分享一些生态体验。 蓝牙设备 华为耳机 快速连接 在手机/电脑附近打开华为耳…

里奇RIDGID管线定位仪/探测仪维修SR-20 SR-24 SR-60

美国里奇SeekTech SR-20管线定位仪对于初次使用定位仪的用户或经验丰富的用户&#xff0c;都同样可以轻易上手使用SR-20。SR-20提供许多设置和参数&#xff0c;使得大多数复杂的定位工作变得很容易。此外&#xff0c;当你在不复杂的环境下完成些基本的定位工作时&#xff0c;这…

软件测试7

一 CS和BS软件架构 CS&#xff1a;客户端-服务器端&#xff0c;BS&#xff1a;浏览器端-服务器端 区别总结&#xff1a; 1.效率&#xff1a;c/s效率高&#xff0c;某些内容已经安装在系统中了&#xff0c;b/s每次都要加载最新的数据 2.升级&#xff1a;b/s无缝升级&#xff0c…

【Maven】(五)Maven模块的继承与聚合 多模块项目组织构建

文章目录1.前言2.模块的继承2.1.可继承的标签2.2.超级POM2.3.手动引入自定义父POM3.模块的聚合3.1.聚合的注意事项3.2.反应堆(reactor)4.依赖管理及属性配置4.1.依赖管理4.2.属性配置5.总结1.前言 本系列文章记录了 Maven 从0开始到实战的过程&#xff0c;Maven 系列历史文章清…

三天吃透SpringMVC面试八股文

本文已经收录到Github仓库&#xff0c;该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点&#xff0c;欢迎star~ Github地址&#xff1a;https://github.com/…

【操作系统原理实验】命令解释器模拟实现

选择一种高级语言如C/C等&#xff0c;编写一类似于DOS、UNIX中的命令行解释程序。 1)设计系统命名行提示符&#xff1b; 2)自定义命令集&#xff08;8-10个&#xff09;&#xff1b; 3)用户输入help命令以查找命令的帮助&#xff1b; 4)列出命令的功能&#xff0c;区分内部命令…

fuse文件系统调试环境

libfuse源码&#xff1a;GitHub - libfuse/libfuse: The reference implementation of the Linux FUSE (Filesystem in Userspace) interface 一、ubuntu20.04挂载fuse文件系统 1&#xff0c;安装编译工具 apt install ninja-build apt install meson apt install build-ess…

4. C#语法基础

一、cs文件结构 上面程序的各个部分说明如下&#xff1a; 程序的第一行using System; 其中【using】关键字用于在程序中包含 System 命名空间。一个程序一般有多个 using 语句。程序的第七行是 namespace 声明。一个 namespace 是一系列的类&#xff0c;MyFirstWinFormApp 命名…

SQL语句大全(MySQL入门到精通——基础篇)(基础篇——进阶篇——运维篇)

文章目录前言MySQL——基础篇一、SQL分类二、图形化界面工具三、DDL&#xff08;Data Definition Language|数据定义语言&#xff09;1.SQL-DDL-数据库操作2.SQL-DDL-表操作&查询3.SQL-DDL-数据类型3.SQL-DDL-表操作-修改&删除四、DML&#xff08;Data Manipulation La…

经典模型LeNet跑Fashion-MNIST 代码解析

测试6.6. 卷积神经网络&#xff08;LeNet&#xff09; — 动手学深度学习 2.0.0 documentation import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(#输入通道1表示黑白 输出通道6表示6组取不同特征的卷积核 因为卷积核是5*5,原始图片单通道黑…

面向对象设计模式:行为型模式之模板方法模式

一、模板方法引入&#xff1a;泡茶与冲咖啡 泡茶 烧水泡茶倒入杯子加入柠檬 冲咖啡 烧水冲咖啡倒入杯子加入牛奶和糖 二、模板方法&#xff0c;TemplateMethod 2.1 Intent 意图 Define the skeleton of an algorithm in an operation, deferring some steps to lets subclas…