1. First, why Debezium? It pushes change events downstream in transaction-commit order, which is essential for keeping downstream data consistent. Combined with a Kafka cluster, it is also highly available. For readers who know Java, a follow-up post will show how to write a plugin that routes events to whatever topic, and even partition, you want, which raises the concurrency of strictly ordered consumption.
If you find this useful, please follow this blog. The follow-up post will cover a plugin that hashes the table name modulo the topic/partition count and assigns each table's events to a fixed topic or partition, so multiple threads can consume in parallel while each table's events stay in order, preserving table-to-table data consistency.
Custom plugin post: http://t.csdn.cn/xAPoH
Download link:
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
2. Download the archive into the /opt directory
cd /opt
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
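Optionally, verify the download before distributing it; Maven Central publishes a .sha1 checksum file next to every artifact:
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz.sha1
sha1sum debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
# the printed hash should match the contents of the .sha1 file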
3. Then distribute the archive to the other servers running Kafka
scp debezium-connector-mysql-1.9.7.Final-plugin.tar.gz hadoop102:/opt
scp debezium-connector-mysql-1.9.7.Final-plugin.tar.gz hadoop103:/opt
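If you ever add more brokers, a small loop avoids repeating the scp line (the host list here is just the two hosts from above):
for host in hadoop102 hadoop103; do
  scp /opt/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz ${host}:/opt
done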
4. Install on all three servers (hadoop101, hadoop102, hadoop103); the steps are identical:
cd /opt
tar -zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
cd /opt/debezium-connector-mysql
mv *.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/libs
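A quick sanity check that the jars really landed on the Connect classpath (the grep pattern is just illustrative):
ls /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/libs | grep -i debezium
# expect debezium-connector-mysql-1.9.7.Final.jar, debezium-core-1.9.7.Final.jar and friends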
5. Prepare the worker configuration files
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/etc/kafka/conf.dist/
cp connect-distributed.properties /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/config/
cp connect-log4j.properties /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/config
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/config/
mv connect-distributed.properties mysql-connect-distributed.properties
6. Edit the configuration file
vi mysql-connect-distributed.properties
bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
# To emit plain JSON instead of schema-wrapped JSON, simply set
# key.converter.schemas.enable and value.converter.schemas.enable to false.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
group.id=cdc-connect-mysql
offset.storage.topic=cdc-connect-mysql-offsets
offset.storage.replication.factor=3
offset.storage.partitions=50
config.storage.topic=cdc-connect-mysql-configs
config.storage.replication.factor=3
config.storage.partitions=1
status.storage.topic=cdc-connect-mysql-status
status.storage.replication.factor=3
status.storage.partitions=10
plugin.path=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/libs/
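To make the schemas.enable comment concrete: with JsonConverter, schemas.enable=true wraps every record value in a schema envelope, while false sends only the payload. Roughly, for a hypothetical insert event (heavily abridged):
# schemas.enable=true:
{"schema": {"type": "struct", ...}, "payload": {"before": null, "after": {"id": 1}, "op": "c", "ts_ms": 1670000000000, ...}}
# schemas.enable=false:
{"before": null, "after": {"id": 1}, "op": "c", "ts_ms": 1670000000000, ...}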
7. Restart the Kafka service so the newly copied jars are loaded
8. Start the Connect worker on all three servers:
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka/
./bin/connect-distributed.sh config/mysql-connect-distributed.properties
Startup has succeeded once the worker log settles and shows the REST server listening on port 8083.
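One way to keep the worker running in the background and confirm it is up (the nohup wrapper and log path are my own choice, not required):
nohup ./bin/connect-distributed.sh config/mysql-connect-distributed.properties > /tmp/connect-mysql.log 2>&1 &
# the worker's REST API answers on 8083; the Debezium plugin should be listed:
curl -s http://hadoop101:8083/connector-plugins
# expect an entry with "class": "io.debezium.connector.mysql.MySqlConnector"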
9. Test the deployment with REST requests. I use Postman because it keeps the requests organized; a curl equivalent follows each request below.
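One prerequisite worth checking before registering anything: Debezium's MySQL connector needs the binlog enabled and in ROW format. A quick check, assuming you can reach the database from the shell:
mysql -h hadoop101 -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format');"
# log_bin should be ON and binlog_format should be ROW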
1. Register a new connector
Method: POST
URL: http://hadoop101:8083/connectors
Content-Type: application/json
Body:
{
  "name": "debezium-test-5017",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "hadoop101",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "XXXXX",
    "database.include.list": "spark_event_data",
    "database.dbname": "spark_event_data",
    "database.serverTimezone": "UTC",
    "database.server.id": "316545017",
    "database.server.name": "debezium_mysql_wp",
    "database.history.kafka.bootstrap.servers": "hadoop101:9092,hadoop102:9092,hadoop103:9092",
    "database.history.kafka.topic": "db-event-history"
  }
}
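The curl equivalent of the Postman request, with the JSON above saved to a file (register.json is just a name I picked):
curl -s -X POST -H "Content-Type: application/json" --data @register.json http://hadoop101:8083/connectors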
2. Check the connector status
Method: GET
URL: http://hadoop101:8083/connectors/debezium-test-5017/status
Response:
{
  "name": "debezium-test-5017",
  "connector": {
    "state": "RUNNING",
    "worker_id": "xxxxx:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "xxxx:8083"
    }
  ],
  "type": "source"
}
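The same status check from the shell; piping through python -m json.tool just pretty-prints the response:
curl -s http://hadoop101:8083/connectors/debezium-test-5017/status | python -m json.tool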
3. Delete the connector
Method: DELETE
URL: http://hadoop101:8083/connectors/debezium-test-5017
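The curl equivalent:
curl -s -X DELETE http://hadoop101:8083/connectors/debezium-test-5017
Note that DELETE removes the connector and its tasks entirely; its offsets stay in the offset storage topic, so re-registering under the same name resumes where it left off. To stop it only temporarily, the REST API also offers PUT /connectors/debezium-test-5017/pause and a matching /resume.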