基于 HBase Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步

news2024/12/23 12:16:41

目录

一、总体架构

二、安装配置 MySQL

1. 创建 mysql 用户

2. 建立 MySQL 使用的目录

3. 解压安装包

4. 配置环境变量

5. 创建 MySQL 配置文件

6. MySQL 系统初始化

7. 启动 mysql 服务器

8. 创建 dba 用户

三、配置 MySQL 主从复制

四、安装部署 Kafka Connector

1. 创建插件目录

2. 解压文件到插件目录

3. 配置 Kafka Connector

(1)配置属性文件

(2)分发到其它节点

(3)以 distributed 方式启动

(4)确认 connector 插件和自动生成的 topic

4. 创建 source connector

(1)创建源 mysql 配置文件

(2)创建 mysql source connector

5. 创建 sink connector

(1)创建目标 hbase 配置文件

(2)创建 hbase sink connector

6. 存量数据自动同步

7. 实时数据同步测试

参考:


        本篇演示安装配置 Kafka connect 插件实现 MySQL 到 Hbase 的实时数据同步。依赖环境见本专栏前面文章。相关软件版本如下:

  • JDK:11.0.22
  • MySQL:8.0.16
  • HBase:2.5.7
  • debezium-connector-mysql:2.4.2
  • kafka-connect-hbase:2.0.13

一、总体架构

        总体结构如下图所示。

        下表描述了四个节点上分别将会运行的相关进程。简便起见,安装部署过程中所用的命令都使用操作系统的 root 用户执行。

                                             节点

进程

node1

node2

node3

node4

debezium-connector-mysql

*

*

*

kafka-connect-hbase

*

*

*

        另外在 172.18.16.156 上安装 MySQL,并启动两个实例做主从复制,主库实例用3306端口,从库实例用3307端口。

        所需安装包:

  • mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz
  • debezium-debezium-connector-mysql-2.4.2.zip
  • confluentinc-kafka-connect-hbase-2.0.13.zip

        这里使用的 debezium connector 版本需要 JDK 11 以上支持。在安装了多个 JDK 版本的环境中,可以使用 alternatives 命令选择需要的版本:

[root@vvgg-z2-music-mysqld~]#alternatives --config java

共有 5 个程序提供“java”。

  选择    命令
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
 + 3           /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/bin/java
   4           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
*  5           /usr/lib/jvm/jdk-11-oracle-x64/bin/java

按 Enter 来保存当前选择[+],或键入选择号码:5
[root@vvgg-z2-music-mysqld~]#java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)
[root@vvgg-z2-music-mysqld~]#

        在 172.18.16.156 的 /etc/hosts 文件中加入 Kafka 集群主机名:

# 编辑文件
vim /etc/hosts

        添加以下内容:

172.18.4.126    node1
172.18.4.188    node2
172.18.4.71    node3
172.18.4.86    node4

二、安装配置 MySQL

        安装配置 MySQL 一主一从双实例。

1. 创建 mysql 用户

# root 用于执行
useradd mysql
passwd mysql

2. 建立 MySQL 使用的目录

# 创建数据目录,确保数据目录 mysqldata 为空
mkdir -p /data/3306/mysqldata

# 创建 binlog 目录
mkdir -p /data/3306/dblog

# 创建临时目录
mkdir -p /data/3306/tmp

# 修改目录属主为 mysql
chown -R mysql:mysql /data

# 使用 mysql 用户执行下面的安装过程
su - mysql

3. 解压安装包

# 进入安装目录
cd ~

# 从tar包中把提取文件
tar xvf mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz

# 建立软连接
ln -s mysql-8.0.16-linux-glibc2.12-x86_64 mysql-8.0.16

4. 配置环境变量

# 将 MySQL 可执行文件所在目录添加到 $PATH 环境变量中
# 编辑文件
vim ~/.bash_profile

# 修改或添加如下两行
PATH=$PATH:$HOME/.local/bin:$HOME/bin:/home/mysql/mysql-8.0.16/bin
export PATH

# 使配置生效
source ~/.bash_profile

5. 创建 MySQL 配置文件

# 编辑文件
vim /home/mysql/my_3306.cnf

        文件内容如下:

[mysqld]
max_allowed_packet=1G
log_timestamps=system
binlog_transaction_dependency_tracking  = WRITESET
transaction_write_set_extraction        = XXHASH64

binlog_expire_logs_seconds=259200
lower_case_table_names=1
secure_file_priv=''
log_bin_trust_function_creators=on
character-set-server = utf8mb4
default_authentication_plugin=mysql_native_password
basedir=/home/mysql/mysql-8.0.16-linux-glibc2.12-x86_64
datadir=/data/3306/mysqldata
socket=/data/3306/mysqldata/mysql.sock

wait_timeout=30
innodb_buffer_pool_size = 16G
max_connections = 1000

default-time-zone = '+8:00'

port = 3306
skip-name-resolve 
user=mysql

innodb_print_all_deadlocks=1
log_output='table'
slow_query_log = 1
long_query_time = 1

tmp_table_size = 32M

# 开启 binlog
log-bin=/data/3306/dblog/mysql-bin
log-bin-index = /data/3306/dblog/mysql-bin.index 

tmpdir = /data/3306/tmp

server-id = 1563306

innodb_data_file_path = ibdata1:1G:autoextend
innodb_data_home_dir = /data/3306/mysqldata

innodb_log_buffer_size = 16M
innodb_log_file_size = 1G
innodb_log_files_in_group = 3
innodb_log_group_home_dir=/data/3306/dblog
innodb_max_dirty_pages_pct = 90
innodb_lock_wait_timeout = 120

gtid-mode = on
enforce_gtid_consistency=true

local_infile=0
log_error='/data/3306/mysqldata/master.err'
skip_symbolic_links=yes

[mysqldump]
quick
max_allowed_packet = 1G

[mysqld_safe]
open-files-limit = 8192

6. MySQL 系统初始化

mysqld --defaults-file=/home/mysql/my_3306.cnf --initialize

7. 启动 mysql 服务器

mysqld_safe --defaults-file=/home/mysql/my_3306.cnf &

8. 创建 dba 用户

# 连接 mysql 服务器
mysql -u root -p -S /data/3306/mysqldata/mysql.sock

-- 修改 root 用户密码
alter user user() identified by "123456";

-- 创建一个新的 dba 账号
create user 'dba'@'%' identified with mysql_native_password by '123456';
grant all on *.* to 'dba'@'%' with grant option;

        重复执行 2 - 8 步,将 3306 换成 3307,创建从库实例。

三、配置 MySQL 主从复制

        3306 主库实例执行:

-- 查看复制位点
show master status;
-- 创建复制用户并授权
create user 'repl'@'%' identified with mysql_native_password by '123456';
grant replication client,replication slave on *.* to 'repl'@'%';
-- 创建测试库表及数据
create database test;
create table test.t1 (
  id bigint(20) not null auto_increment,
  remark varchar(32) default null comment '备注',
  createtime timestamp not null default current_timestamp comment '创建时间',
  primary key (id));
insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');

        输出:

mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                        |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 |      977 |              |                  | ba615057-e11c-11ee-b80e-246e961c91f8:1-3 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.00 sec)

mysql> create user 'repl'@'%' identified with mysql_native_password by '123456';
Query OK, 0 rows affected (0.01 sec)

mysql> grant replication client,replication slave on *.* to 'repl'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> create database test;
Query OK, 1 row affected (0.00 sec)

mysql> create table test.t1 (
    ->   id bigint(20) not null auto_increment,
    ->   remark varchar(32) default null comment '备注',
    ->   createtime timestamp not null default current_timestamp comment '创建时间',
    ->   primary key (id));
Query OK, 0 rows affected (0.01 sec)

mysql> insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

        3307 从库实例执行:

change master to
master_host='172.18.16.156',
master_port=3306,
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=977;

start slave;
show slave status\G
select user,host from mysql.user;
select * from test.t1;

输出:
mysql> change master to
    -> master_host='172.18.16.156',
    -> master_port=3306,
    -> master_user='repl',
    -> master_password='123456',
    -> master_log_file='mysql-bin.000001',
    -> master_log_pos=977;
Query OK, 0 rows affected, 2 warnings (0.00 sec)

mysql> start slave;
Query OK, 0 rows affected (0.01 sec)

mysql> show slave status\G
*************************** 1. row ***************************
               Slave_IO_State: Waiting for master to send event
                  Master_Host: 172.18.16.156
                  Master_User: repl
                  Master_Port: 3306
                Connect_Retry: 60
              Master_Log_File: mysql-bin.000001
          Read_Master_Log_Pos: 2431
               Relay_Log_File: vvgg-z2-music-mysqld-relay-bin.000002
                Relay_Log_Pos: 1776
        Relay_Master_Log_File: mysql-bin.000001
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
              Replicate_Do_DB: 
          Replicate_Ignore_DB: 
           Replicate_Do_Table: 
       Replicate_Ignore_Table: 
      Replicate_Wild_Do_Table: 
  Replicate_Wild_Ignore_Table: 
                   Last_Errno: 0
                   Last_Error: 
                 Skip_Counter: 0
          Exec_Master_Log_Pos: 2431
              Relay_Log_Space: 1999
              Until_Condition: None
               Until_Log_File: 
                Until_Log_Pos: 0
           Master_SSL_Allowed: No
           Master_SSL_CA_File: 
           Master_SSL_CA_Path: 
              Master_SSL_Cert: 
            Master_SSL_Cipher: 
               Master_SSL_Key: 
        Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
                Last_IO_Errno: 0
                Last_IO_Error: 
               Last_SQL_Errno: 0
               Last_SQL_Error: 
  Replicate_Ignore_Server_Ids: 
             Master_Server_Id: 1563306
                  Master_UUID: ba615057-e11c-11ee-b80e-246e961c91f8
             Master_Info_File: mysql.slave_master_info
                    SQL_Delay: 0
          SQL_Remaining_Delay: NULL
      Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates
           Master_Retry_Count: 86400
                  Master_Bind: 
      Last_IO_Error_Timestamp: 
     Last_SQL_Error_Timestamp: 
               Master_SSL_Crl: 
           Master_SSL_Crlpath: 
           Retrieved_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8
            Executed_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8,
c2df1946-e11c-11ee-8026-246e961c91f8:1-3
                Auto_Position: 0
         Replicate_Rewrite_DB: 
                 Channel_Name: 
           Master_TLS_Version: 
       Master_public_key_path: 
        Get_master_public_key: 0
            Network_Namespace: 
1 row in set (0.00 sec)

mysql> select user,host from mysql.user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| dba              | %         |
| repl             | %         |
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
6 rows in set (0.00 sec)

mysql> select * from test.t1;
+----+------------------+---------------------+
| id | remark           | createtime          |
+----+------------------+---------------------+
|  1 | 第一行:row1     | 2024-03-20 10:25:32 |
|  2 | 第二行:row2     | 2024-03-20 10:25:32 |
|  3 | 第三行:row3     | 2024-03-20 10:25:32 |
+----+------------------+---------------------+
3 rows in set (0.00 sec)

        MySQL主从复制相关配置参见“配置异步复制”。

四、安装部署 Kafka Connector

        在 node2 上执行以下步骤。

1. 创建插件目录

mkdir $KAFKA_HOME/plugins

2. 解压文件到插件目录

# debezium-connector-mysql
unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
# kafka-connect-hbase
unzip confluentinc-kafka-connect-hbase-2.0.13.zip -d $KAFKA_HOME/plugins/

3. 配置 Kafka Connector

(1)配置属性文件

# 编辑 connect-distributed.properties 文件
vim $KAFKA_HOME/config/connect-distributed.properties

        内容如下:

bootstrap.servers=node2:9092,node3:9092,node4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
plugin.path=/root/kafka_2.13-3.7.0/plugins

(2)分发到其它节点

scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/

(3)以 distributed 方式启动

        三台都执行,在三个节点上各启动一个 worker 进程,用以容错和负载均衡。

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties 
# 确认日志是否有 ERROR
grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out

(4)确认 connector 插件和自动生成的 topic

        查看连接器插件:

curl -X GET http://node2:8083/connector-plugins | jq

        从输出中可以看到,Kafka connect 已经识别到了 hbase sink 和 mysql source 插件:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   494  100   494    0     0   4111      0 --:--:-- --:--:-- --:--:--  4116
[
  {
    "class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "type": "sink",
    "version": "2.0.13"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.4.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.7.0"
  }
]
[root@vvml-yz-hbase-test~]#

        查看 topic:

kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        从输出中可以看到,Kafka connect 启动时自动创建了 connect-configs、connect-offsets、connect-status 三个 topic:

[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-1
test-3
[root@vvml-yz-hbase-test~]#

4. 创建 source connector

(1)创建源 mysql 配置文件

# 编辑文件
vim $KAFKA_HOME/plugins/source-mysql.json

        内容如下:

{
 "name": "mysql-source-connector",
 "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "tasks.max": "1",
     "topic.prefix": "mysql-hbase-test",
     "database.hostname": "172.18.16.156",
     "database.port": "3307",
     "database.user": "dba",
     "database.password": "123456",
     "database.server.id": "1563307",
     "database.server.name": "dbserver1",
     "database.include.list": "test",
     "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
     "schema.history.internal.kafka.topic": "schemahistory.mysql-hbase-test"
     }
 }

(2)创建 mysql source connector

# 创建 connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
# 查看 connector 状态
curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
# 查看 topic
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        从输出中可以看到,mysql-source-connector 状态为 RUNNING,并自动创建了三个 topic:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:31:30 GMT
Location: http://node2:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 579
Server: Jetty(9.4.53.v20231009)

{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","topic.prefix":"mysql-hbase-test","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-hbase-test","name":"mysql-source-connector"},"tasks":[],"type":"source"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   182  100   182    0     0  20726      0 --:--:-- --:--:-- --:--:-- 22750
{
  "name": "mysql-source-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.188:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.188:8083"
    }
  ],
  "type": "source"
}
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql-hbase-test
mysql-hbase-test.test.t1
schemahistory.mysql-hbase-test
test-1
test-3
[root@vvml-yz-hbase-test~]#

5. 创建 sink connector

(1)创建目标 hbase 配置文件

# 编辑文件
vim $KAFKA_HOME/plugins/sink-hbase.json

        内容如下:

{
  "name": "hbase-sink-connector",
  "config": {
    "topics": "mysql-hbase-test.test.t1",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
    "confluent.topic.replication.factor":3,
    "hbase.zookeeper.quorum": "node2,node3,node4",
    "hbase.zookeeper.property.clientPort": "2181",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "table.name.format": "example_table"
  }
}

(2)创建 hbase sink connector

# 创建 connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
# 查看 connector 状态
curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
# 查看 consumer group
kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

        从输出中可以看到,hbase-sink-connector 状态为 RUNNING,并自动创建了一个消费者组:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:33:11 GMT
Location: http://node2:8083/connectors/hbase-sink-connector
Content-Type: application/json
Content-Length: 654
Server: Jetty(9.4.53.v20231009)

{"name":"hbase-sink-connector","config":{"topics":"mysql-hbase-test.test.t1","tasks.max":"1","connector.class":"io.confluent.connect.hbase.HBaseSinkConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","confluent.topic.bootstrap.servers":"node2:9092,node3:9092,node4:9092","confluent.topic.replication.factor":"3","hbase.zookeeper.quorum":"node2,node3,node4","hbase.zookeeper.property.clientPort":"2181","auto.create.tables":"true","auto.create.column.families":"true","table.name.format":"example_table","name":"hbase-sink-connector"},"tasks":[],"type":"sink"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   176  100   176    0     0  23084      0 --:--:-- --:--:-- --:--:-- 25142
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.71:8083"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
connect-hbase-sink-connector
[root@vvml-yz-hbase-test~]#

6. 存量数据自动同步

        sink connector 自动在 hbase 中创建了 example_table 表,并且自动同步了前面配置 MySQL 主从复制时添加的三条测试数据:

[root@vvml-yz-hbase-test~]#hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html#shell
Version 2.5.7-hadoop3, r6788f98356dd70b4a7ff766ea7a8298e022e7b95, Thu Dec 14 16:16:10 PST 2023
Took 0.0012 seconds                                                                                                                       
hbase:001:0> list
TABLE                                                                                                                                     
SYSTEM:CATALOG                                                                                                                            
SYSTEM:CHILD_LINK                                                                                                                         
SYSTEM:FUNCTION                                                                                                                           
SYSTEM:LOG                                                                                                                                
SYSTEM:MUTEX                                                                                                                              
SYSTEM:SEQUENCE                                                                                                                           
SYSTEM:STATS                                                                                                                              
SYSTEM:TASK                                                                                                                               
example_table                                                                                                                             
test                                                                                                                                      
10 row(s)
Took 0.3686 seconds                                                                                                                       
=> ["SYSTEM:CATALOG", "SYSTEM:CHILD_LINK", "SYSTEM:FUNCTION", "SYSTEM:LOG", "SYSTEM:MUTEX", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:TASK", "example_table", "test"]
hbase:002:0> describe 'example_table'
Table example_table is ENABLED                                                                                                            
example_table, {TABLE_ATTRIBUTES => {METADATA => {'hbase.store.file-tracker.impl' => 'DEFAULT'}}}                                         
COLUMN FAMILIES DESCRIPTION                                                                                                               
{NAME => 'mysql-hbase-test.test.t1', INDEX_BLOCK_ENCODING => 'NONE', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =
> 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'NON
E', BLOCKCACHE => 'true', BLOCKSIZE => '65536 B (64KB)'}                                                                                  

1 row(s)
Quota is disabled
Took 0.1173 seconds                                                                                                                       
hbase:003:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL                                                                                           
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"
                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"
                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"
                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"
                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
3 row(s)
Took 0.0702 seconds                                                                                                                       
hbase:004:0> 

        debezium-connector-mysql 默认会在启动时将存量数据写到 Kafka 中,这使得在构建实时数仓时,可以做到存量数据与增量数据一步实时同步,极大方便了 CDC(Change Data Capture,变化数据捕获) 过程。

7. 实时数据同步测试

        MySQL 主库数据变更:

insert into test.t1 (remark) values ('第四行:row4');
update test.t1 set remark = '第五行:row5' where id = 4;
delete from test.t1 where id =1;

        Hbase 查看数据变化:

hbase:004:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL                                                                                           
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"
                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"
                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"
                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"
                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":4}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:38:18.788, value={"before":null,"
                                    after":{"id":4,"remark":"第四行:row4","createtime":"2024-03-20T02:38:18Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710902298000,"snapshot":"false","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":1563306,"gtid":"ba615057-e11c-11ee-b80e-246e961c91f8:9","fil
                                    e":"mysql-bin.000001","pos":2679,"row":0,"thread":49,"query":null},"op":"c","ts_ms":1710902298665,"tra
                                    nsaction":null}                                                                                       
4 row(s)
Took 0.0091 seconds                                                                                                                       
hbase:005:0> 

        MySQL 执行的 delete、update 操作没有同步到 Hbase。

        查看消费情况:

[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --group connect-hbase-sink-connector --describe --bootstrap-server node2:9092,node3:9092,node4:9092

Consumer group 'connect-hbase-sink-connector' has no active members.

GROUP                        TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-hbase-sink-connector mysql-hbase-test.test.t1 0          3               7               4               -               -               -
[root@vvml-yz-hbase-test~]#

        数据变更都写入了 Kafka,但没有都消费。

        查看 sink connector 状态:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2168  100  2168    0     0   368k      0 --:--:-- --:--:-- --:--:--  423k
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.18.4.71:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: \n\tat io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)\n\tat io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)\n\tat io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)\n\tat io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)\n\tat io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.\n\t... 16 more\n"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#

        查看 node3 上的日志文件 ~/kafka_2.13-3.7.0/logs/connectDistributed.out,错误信息如下:

[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table:  (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
    ... 16 more
[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
    ... 16 more

        可以看到报错为:Row with specified row key already exists.

        原因是 sink connector 将 MySQL 的 update、delete 都转化为 Hbase 数据插入,但自动识别的 rowkey 为 MySQL 表的主键,而该 rowkey 已经存在,所以插入报错了。这种同步行为需要注意。

参考:

  • Greenplum 实时数据仓库实践(5)——实时数据同步
  • Debezium MySQL Source Connector for Confluent Platform
  • Apache HBase Sink Connector for Confluent Platform

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker常用命令!!!

一、docker基础命令 1、启动docker systemctl start docker 2、关闭docker systemctl stop docker 3、重启docker systemctl restart docker 4、docker设置随服务启动而自启动 systemctl enable docker 5、查看docker 运行状态 systemctl status docker 6、查看docker 版本号信…

ChatGPT在大气科学领域建模、数据分析、可视化与资源评估中的高效应用及论文写作

深度探讨人工智能在大气科学中的应用,特别是如何结合最新AI模型与Python技术处理和分析气候数据。课程介绍包括GPT-4等先进AI工具,旨在帮助学员掌握这些工具的功能及应用范围。课程内容覆盖使用GPT处理数据、生成论文摘要、文献综述、技术方法分析等实战…

Learn OpenGL 19 几何着色器

几何着色器 在顶点和片段着色器之间有一个可选的几何着色器(Geometry Shader),几何着色器的输入是一个图元(如点或三角形)的一组顶点。几何着色器可以在顶点发送到下一着色器阶段之前对它们随意变换。然而,几何着色器最有趣的地方…

IOS/Android App备案(uniapp)

IOS/App备案 IOS备案Android备案 IOS备案 准备好p12证书即可 链接: https://aitoolnav.caichuangkeji.com/#/AppMd5 Android备案 上DCLOUD开发者中心,找到相关应用后,直接查看证书即可获取到MD5 公钥:先根据上述页面下载证书,…

应急响应-Web2

应急响应-Web2 1.攻击者的IP地址(两个)? 192.168.126.135 192.168.126.129 通过phpstudy查看日志,发现192.168.126.135这个IP一直在404访问 , 并且在日志的最后几条一直在访问system.php ,从这可以推断 …

Git原理与使用(一)

目录 前言 版本控制器 Linux下的Git的安装 Git的基本操作 创建Git本地仓库 配置Git 工作区、暂存区、版本库 添加与提交 查看.git文件 前言 我们可能要写多个文档对一个产品进行描述,但是一般情况下我们可能要写多个文档,比如: 初…

Rust Rocket简单入门

简介 Rust中最知名的两个web框架要数Rocket和Actix了,Rocket更注重易用性,Actix则更注重性能。这里只是了解一下Rust下的WebAPI开发流程,就学一下最简单的 Rocket。 Rocket 是一个用于 Rust 的异步 Web 框架,专注于可用性、安全性…

NCV4266ST50T3G线性稳压器芯片中文资料规格书PDF数据手册引脚图参数图片价格

产品概述: NCV4266 是一款集成了 150 mA 输出电流的低漏稳压器系列,可用于严酷汽车环境。它包括了较宽的运行温度范围和输出电压范围。该器件提供 3.3 V、5.0 V 固定电压版本,以及可调电压版本,输出电压准确度为 2%。它具有较高的…

web攻防——csrf,ssrf

csrf 当我们在访问自己的管理员系统的时候,打开别人发的钓鱼连接就会自动增加管理员(前提,后台在登录状态)当我们打开别人发的网站,就会触发增加管理员的数据包 假设我们要测试这个网站 看到这个,就得下载一…

ES 常见面试题及答案

目录 es 写入数据流程 es 删除数据流程 es 读数据流程 es 部署的服务有哪些角色 es 的实现原理 es 和lucence 关系 如何提高写入效率 提高搜索效率 es doc value指的啥 分片指的啥,定义后可不可义再修改 深分页如何优化 对于聚合操作是如何优化的 元数据…

Vue+Element-UI Table表格实现复选框单选效果(隐藏表头上的全选Checkbox)

实现效果 完整代码 <div class"box-pos"><el-table ref"table" :header-cell-style"{ color: #FFF, background: #333 }":cell-style"{ color: #FFF, background: #333 }" :data"grListData" style"width: 1…

网状的隧道穿梭特效HTML代码

网状的隧道穿梭特效HTML代码&#xff0c;效果是动态的&#xff0c;可以下载源码&#xff0c;自己在本地运行 源码下载 网状的隧道穿梭特效HTML代码

uni-app 中两个系统各自显示不同的tabBar

最近在一个uni-app项目中遇到一个需求,在登录页面成功登录以后需要判断身份,不同的身份的进入不同的tabBar页面,但是在uni-app项目中pages.json中的tabBar的list数组只有一个&#xff0c;且不能写成动态的,那如何实现这个需求呢?答案是需要我们自定义tabBar。 目录 1、我们确…

关于paper中的一些硬件知识

一. OS中的event Information in event traces from software systems can help developers with performance analysis, debugging and troubleshooting 1.事件的概念 已知软件系统中的event能够帮助开发者对系统进行性能分析、调试以及定位&#xff0c;那我们应该仔细考虑…

简单了解:localhost 与 127.0.0.1 的区别

在信息技术的世界里&#xff0c;localhost和127.0.0.1频繁出现在各种网络及软件开发的场景之中。它们似乎指向同一个意义——那就是你的本地机器。但仔细探究之下&#xff0c;你会发现它们之间其实存在着一些微妙的差异。今天&#xff0c;我们就来深究这两者之间的区别&#xf…

Maxwell监听mysql的binlog日志变化写入kafka消费者

一. 环境&#xff1a; maxwell:v1.29.2 (从1.30开始maxwell停止了对java8的使用&#xff0c;改为为11) maxwell1.29.2这个版本对mysql8.0以后的缺少utf8mb3字符的解码问题&#xff0c;需要对原码中加上一个部分内容 &#xff1a;具体也给大家做了总结 &#xff1a; 关于v1.…

GaussDB(分布式)实例故障处理

一、说明 GaussDB Kernel实例出现故障时&#xff0c;可以按照本节的办法进行实例快速修复。 1、执行gs_om -t status --detail查看集群状态&#xff0c;cluster_state为Normal&#xff0c;balanced为No&#xff0c;请重置实例状态。 2、执行gs_om -t status --detail查看集群…

MySQl基础入门⑩

上一章内容 数据插入、更新与删除 以下是创建一个名为users的表并定义其字段结构的SQL命令&#xff08;以MySQL为例&#xff09;&#xff1a; CREATE TABLE users (id INT AUTO_INCREMENT, username VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL UNIQUE,password VAR…

new mars3d.layer.GeoJsonLayer({的pupup配置参考

new mars3d.layer.GeoJsonLayer({的pupup配置可选项以及相关效果参考&#xff1a; 说明&#xff1a;popup按属性字段配置&#xff0c;可以是字符串模板或数组 1.popup仅配置{type}{name}等属性的的时候&#xff0c;指显示json文件内数据的type与name&#xff0c;效果如下 相关…

关系型数据库mysql(2)SQL语句

目录 一.SQL语句简介 1.1SQL语言 1.2SQL语句分类 1.3SQL分类 1.4SQL 语言规范 二.数据库基本操作 2.1查看数据库中的库信息 2.2查看数据库中的表信息 数据库内查看 数据库外查看 2.3显示数据库的结构&#xff08;字段&#xff09; ​编辑 2.4 字段属性 2.5常见的数…