Redis学习——高级篇④
- = = = = = = Redis7高级之Redis与Mysql数据双写一致性工程案例(四)= = = = = =
- 4.1 MySQL主从复制原理
- 4.2 canal 工作原理
- 4.3 mySQL->canal->redis 双写一致性
- 1.环境
- 2.配置Mysql
- 3.配置canal
- 4. Canal客户端(Java编写)
- 1.SQL脚本(随便找个数据库)
- 2.建Module
- 3.改POM
- 4.改YML
- 5.启动类
- 6.业务类
= = = = = = Redis7高级之Redis与Mysql数据双写一致性工程案例(四)= = = = = =
4.1 MySQL主从复制原理
MySQL的主从复制
- 当
Master
主服务器上的数据发生改变时,将其改变写入二进制事件日志文件中 Slave
从服务器会在一定时间间隔内对master
主服务器上的二进制日志进行探测,探测其是否发生过改变- 如果探测到
master
主服务器的二进制时间发生了改变,则开始一个I/O Thread
请求 master 二进制事件日志
- 如果探测到
- 同时
master
主服务器为每个I/O Thread
启动一个dump Thread
,用于向其发送二进制事件日志 slave
从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中slave
从服务器将启动SQL Thread
从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致- 最后
I/O Thread
和SQL Thread
将进入睡眠状态,等待下一次被唤醒
4.2 canal 工作原理
- canal 模拟 Mysql slave 的交互协议,将自己作为 Mysql slave ,向 Mysql master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave(即cancal)
- canal 解析 binary log 对象(原始为 byte 流)
4.3 mySQL->canal->redis 双写一致性
1.环境
linux的mysql、canal、redis,其中canal和redis在一台机器,mysql单独一个机器
2.配置Mysql
别忘了 开启
start net mysql80
- 查看mysql版本
- 当前的主机二进制日志——
show master status;
-
查看
SHOW VARIABLES LIKE 'log_bin';
如果是on跳过下一步!
-
开启
MySQL
的binlog
写入功能
做好备份
my.cnf
[client]
default_character_set=utf8
[mysqld]
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复
collation_server = utf8_general_ci
character_set_server = utf8
修改
-
重启
mysql
-
再次查看
SHOW VARIABLES LIKE 'log _bin'
;
-
授权
canal
连接MySQL
账号- mysql默认的用户在mysq|库的user表里
- 默认没有canal账户,此处新建+授权
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;
上面是教学给的 但是我报错 我这样写的 才过的 看大家自己测试的时候,也希望大家要到原因留言告诉俺一声 😀
3.配置canal
canal服务端
-
下载canal.deployer-1.1.7.tar.gz
- Releases · alibaba/canal (github.com)
-
解压 tar -zxvf canal.deployer-1.1.7.tar.gz
-
配置
-
修改 /mycanal/conf/example 路径下的 instance.properties 文件
-
-
启动
-
在 /mycanal/bin 路径下执行 ./startup.sh
必须要有 java 8 的环境
-
等我去装一下 java的环境!!!
- 看是否成功启动(报错记录:如果canal启动后没有这两个文件,多半是内存不够了,记着扩一下内存)
你可以通过以下步骤来查看 Canal 使用的 Java 是 32 位还是 64 位:
-
查看Canal启动脚本: 打开 Canal 启动脚本(通常是
.sh
或.bat
文件),寻找包含 Java 启动命令的行。在这一行中,你应该能够找到 Java 的路径。如果是 Linux 系统,启动脚本可能以
.sh
结尾。打开该文件,找到启动 Canal 的地方。如果是 Windows 系统,启动脚本可能以
.bat
结尾。同样,打开该文件,找到启动 Canal 的地方。 -
在启动脚本中找到 Java 启动命令: 查找包含 Java 启动命令的那一行,这可能类似于:
java -server -Xms512m -Xmx1024m -XX:MaxPermSize=256m -Djava.awt.headless=true ...
或
java -server -Xms512m -Xmx1024m -XX:MaxPermSize=256m -Djava.awt.headless=true ...
这里的
java
后面就是 Java 的执行路径。 -
查看 Java 的执行路径: 执行路径应该类似于
/path/to/java/bin/java
或C:\path\to\java\bin\java.exe
。在这个路径中,你可以看到 Java 的安装目录。 -
查看 Java 的位数: 进入 Java 安装目录,找到
java
可执行文件(在 Linux 上可能是java
,在 Windows 上可能是java.exe
)。然后,在命令行中运行以下命令:在 Linux 上:
file /path/to/java/bin/java
在 Windows 上:
file C:\path\to\java\bin\java.exe
这将显示 Java 可执行文件的详细信息,包括 32 位还是 64 位。
通过以上步骤,你应该能够确定 Canal 使用的 Java 是 32 位还是 64 位。如果你发现它使用了不正确的 Java 版本,你可能需要更改启动脚本或配置以使用正确版本的 Java。
- 如果下图是这样,也就是java版本太高了
“Unrecognized VM option ‘AggressiveOpts’” 错误通常发生在Java虚拟机(JVM)遇到不认识的选项时。在这种情况下,似乎选项 “AggressiveOpts” 在你使用的JVM中不被支持。
“AggressiveOpts” 是较旧版本Java中的一个实验性选项,但在最近的Java版本中已被移除。如果你使用的是较新版本的Java,这个选项可能不再受支持。
解决这个问题的步骤如下:
-
删除 ‘AggressiveOpts’ 选项: 打开用于启动Canal的脚本或配置文件,并找到Java命令行。从命令行中删除 “-XX:+AggressiveOpts” 选项。
-
删除 ‘UseBiasedLocking’ 选项: 打开用于启动Canal的脚本或配置文件,并找到Java命令行。从命令行中删除 “-XX:+UseBiasedLocking” 选项。
-
检查Java版本: 确保你使用的是一个受支持的Java版本。你可以通过在终端或命令提示符中运行以下命令来检查你的Java版本:
java -version
如果你使用的是太旧或太新的Java版本,请考虑更新或降级到与Canal兼容的版本。
-
检查JVM选项: 检查并更新Canal启动脚本或配置中的其他JVM选项,以确保它们与你使用的Java版本兼容。
如果问题仍然存在,或者如果你遇到其他问题,请提供有关你的环境、所使用的Java版本以及任何相关的Canal配置文件或启动脚本的更多详细信息,以便我能够提供更具体的帮助。
-
查看server日志
-
tmd 终于好了!!!!!
-
查看样例example的日志
4. Canal客户端(Java编写)
Redis用RedisTemplate
1.SQL脚本(随便找个数据库)
CREATE TABLE `t_user` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`userName` VARCHAR(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
2.建Module
canal_demo
3.改POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xfcy</groupId>
<artifactId>canal_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>canal_demo</name>
<description>canal_demo</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mapper.version>4.1.5</mapper.version>
<mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>
<dependencies>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!--SpringBoot通用依赖模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--SpringBoot与AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!--Mysql数据库驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SpringBoot集成druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0.2</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.改YML
server.port=5555
# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.238.130:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false
5.启动类
这个是一个模板不用启动
package com.xfcy.canal_demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CanalDemoApplication {
public static void main(String[] args) {
//SpringApplication.run(CanalDemoApplication.class, args);
}
}
6.业务类
RedisUtils类
package com.xfcy.canal_demo.util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils {
public static final String REDIS_IP_ADDR = "192.168.238.110";
public static final String REDIS_pwd = "111111";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
RedisCanalClientExample
package com.xfcy.canal_demo.biz;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.xfcy.canal_demo.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class RedisCanalClientExample {
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.238.130";
private static void redisInsert(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.del(columns.get(0).getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys)
{
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args)
{
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");
connector.subscribe("db01.t_user"); // 设置监听哪个表
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}
java程序下connectors.subscribe 配置的过滤正则
A | B |
---|---|
全库全表 | connector.subscribe(". *\\..*") |
指定库全表 | connector.subscribe("test1..*") |
单表 | connector.subscribe("test.user") |
多规则组合使用 | connector.subscribe("test\\..*,test2.user1,test3.user2") |
关闭资源代码简写
try-with-resource释放资源(语法糖)
jdk1.7后增加了try-with-resources
,他是一个声明一个或多个资源的ty语句。一个资源作为一个对象,必须在程序结束之后关闭。try-with-resources
语句确保在语句的最后每个资源都被关闭,任何实现了java.lang AutoCloseable
和java.io.Closeable
的对象都可以使用try-with-resource
来实现异常处理Q和关闭资源。