- canal需要java8
去官网下载java8
安装JAVA
#创建目录
mkdir -p /usr/local/java/
#解压到目录
tar zxvf jdk-8u411-linux-x64.tar.gz -C /usr/local/java/
配置环境变量在 /etc/profile 最后加入
export JAVA_HOME=/usr/local/java/jdk1.8.0_411
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
使之生效
source /etc/profile
查看是否安装成功
java -version
设置mysql用户权限
#创建用户名和密码都为 canal 的用户
create user 'canal'@'%' identified by 'canal';
#授予该用户对所有数据库和表的查询、复制主节点数据的操作权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES; #重新加载权限
修改mysql配置
vim /etc/my.cnf
修改部分内容如下
# 开启 binlog
log-bin=mysql-bin
#master端的ID号,不能和 canal 的 slaveId 重复;
server-id=1
#行级,记录每次操作后每行记录的变化。
binlog-format=row
#指定库,缩小监控的范围。
binlog-do-db=test
查看是否开启主从
show master status;
安装canal
下载最新canal
下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
#创建canal目录
mkdir -p /usr/local/canal/
#解压到canal目录
tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/
查看canal主配置文件
cat /usr/local/canal/conf/canal.properties
查看会看到暴露了三个端口和指定了一个实例
canal.admin.port = 11110
canal.port = 11111
canal.metrics.pull.port = 11112
#多个实例使用逗号分隔: canal.destinations = example1,example2,
#这里对应/usr/local/canal/conf/example/
canal.destinations = example
canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则: conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开。
修改实例配置文件
修改实例配置
vim /usr/local/canal/conf/example/instance.properties
在文件最后加入
#配置 slaveId ,不能等于 mysql 配置里的 server Id 即可
canal.instance.mysql.slaveId=10
#数据库连接
canal.instance.master.address=127.0.0.1:3306
#数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#代表数据库的编码方式
canal.instance.connectionCharset = UTF-8
#设置白名单,这里是监控test库下所有表
canal.instance.filter.regex=test\\..*
#设置黑名单,这里设置排除test库下以_noc结尾的表
canal.instance.filter.black.regex=test\\..*_noc
- 这个正则表达式 test\.*_noc 的含义是:
test:精确匹配字符串 "test"。
\\.:匹配一个点(.),因为点在正则表达式中有特殊含义,所以需要使用 \\ 进行转义。
*:匹配零个或多个任意字符。
_noc:精确匹配字符串 "_noc"。
因此,这个正则表达式可以用来匹配所有以 test. 开头且以 _noc 结尾的字符串。在 Canal 的配置中使用这个正则表达式,就可以实现排除以 test. 开头且以 _noc 结尾的数据库和表,而监控其他数据库和表。
比如我们要查询 test库下users表id为3的用户信息,sql语句可以这么写
SELECT * FROM test.users WHERE id = 3;
这里的test.users就是正则要匹配的地方
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
启动和停止
#启动
/usr/local/canal/bin/startup.sh
#停止
/usr/local/canal/bin/stop.sh
查看是否启动成功
ps -ef | grep canal
php测试
使用 canal-php
composer require xingwenge/canal_php
新建index.php文件
<?php
require __DIR__.'/vendor/autoload.php';
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("127.0.0.1", 11111);
$client->checkValid();
//设置过滤tes库t下的所有表
$client->subscribe("1001", "example", "test.*");
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
运行php文件
php index.php
- eventType:1、是新增行,2、修改行,3、删除行、4、新增表
也可以直接将数据写入rabbitmq
修改conf/canal.properties
vim conf/canal.properties
修改RabbitMQ配置如下
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
#交换机名称
rabbitmq.exchange =exchange.canal
#队列名称
rabbitmq.queue = canal_queue
#路由键名
rabbitmq.routingKey = canal-routing-key
#账号密码
rabbitmq.username =admin
rabbitmq.password =admin
#路由模式,这里是严格模式
rabbitmq.deliveryMode =direct
修改实例配置conf/example/instance.properties
vim conf/example/instance.properties
修改部分内容如下
#rabbitmq的路由键配置
#这里与canal.properties里的rabbitmq.routingKey一样
canal.mq.topic=canal-routing-key
canal.mq.partition=0
canal.instance.multi.stream.on=false
#rabbitmq 的 routing key
#dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
#hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*