Debezium depends on Kafka, and Kafka depends on ZooKeeper. ZooKeeper provides the coordination that keeps Kafka's messages consistent, and Debezium pushes the change data it subscribes to into Kafka.
dockerFile
ARG DEBEZIUM_VERSION=1.6
FROM debezium/connect:$DEBEZIUM_VERSION
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV INSTANT_CLIENT_DIR=/instant_client/

USER root
# The Oracle Instant Client needs libaio; this RUN step comes from the
# upstream debezium-examples Dockerfile this snippet is based on (use yum
# instead of microdnf if your base image lacks it)
RUN microdnf install -y libaio && microdnf clean all

USER kafka

# Deploy Oracle client and drivers
COPY instant_client/* $INSTANT_CLIENT_DIR
COPY instant_client/xstreams.jar /kafka/libs
COPY instant_client/ojdbc8.jar /kafka/libs
INSTANT_CLIENT_DIR holds the Oracle Instant Client, which you can download from Oracle's website. Note that the Oracle 11g client targets JDK 8 while Debezium runs on JDK 11, so I used the Oracle 12 client here; it worked in my tests.
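For reference, here is a sketch of the build context this Dockerfile expects; the directory layout is inferred from the COPY lines above, and docker-compose below runs the same build for you:
# assumed build-context layout (inferred from the COPY lines):
#   docker_debezium/
#   ├── dockerFile
#   └── instant_client/
#       ├── ojdbc8.jar
#       ├── xstreams.jar
#       └── ... (the Instant Client shared libraries)
# build the image by hand if you want to test it outside docker-compose:
docker build -f dockerFile -t debezium-connect-oracle .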
The docker-compose file can be adapted from the example recommended on GitHub:
https://github.com/debezium/debezium-examples/tree/master/tutorial
docker-compose.yaml
version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.6
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    volumes:
      - D:/software/docker-volume/docker_debezium/volume:/volume
    privileged: true
    networks:
      proxy:
        ipv4_address: 192.168.192.3
  kafka:
    depends_on:
      - zookeeper
    image: debezium/kafka:1.6
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - cluster.id=012344
    volumes:
      - D:/software/docker-volume/docker_debezium/volume:/volume
      - D:/software/docker-volume/docker_debezium/kafka/data:/kafka/data
      - D:/software/docker-volume/docker_debezium/kafka/logs:/kafka/logs
    cap_add:
      - ALL # grant all capabilities
    privileged: true # run the container with root-level privileges
    networks:
      proxy:
        ipv4_address: 192.168.192.4
  connect:
    depends_on:
      - kafka
    image: debezium-connect-oracle
    build:
      context: D:/software/docker-volume/docker_debezium
      dockerfile: dockerFile
      args:
        DEBEZIUM_VERSION: 1.6
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
      - KAFKA_DEBUG=true
      - DEBUG_SUSPEND_FLAG=n
      - JAVA_DEBUG_PORT=0.0.0.0:5005
    volumes:
      - D:/software/docker-volume/docker_debezium/volume:/volume
    privileged: true
    networks:
      proxy:
        ipv4_address: 192.168.192.5
  kafkaui:
    depends_on:
      - kafka
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8811:8080
    links:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=clusters
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    networks:
      proxy:
        ipv4_address: 192.168.192.6
networks:
  proxy:
    ipam:
      config:
        - subnet: 192.168.192.0/20
Note that Kafka's data directory must be mapped out of the container, because messages that have not yet been consumed are lost when the Kafka container is rebuilt.
Adjust all the volume mappings to suit your own environment.
For Kafka's cluster.id it is fine to just put down an arbitrary value. Fixed network IPs are assigned to all containers here, which makes them easier to reference later.
Once the containers are up, you can open the Kafka UI.
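A minimal sketch of bringing the stack up, run from the directory containing docker-compose.yaml:
# start all four containers in the background
docker-compose up -d
# follow the connect worker's log until its REST server has started
docker-compose logs -f connect
# the Kafka UI is then reachable at http://localhost:8811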
Run the following statements in Oracle to enable change subscription (supplemental logging):
ALTER TABLE YOUR_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
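A sketch of applying those statements with sqlplus; host, service name, and credentials are placeholders, and a DBA account is required:
# run the supplemental-logging statements against your Oracle instance
sqlplus sys/yourpassword@//your-oracle-host:1521/ORCLCDB as sysdba <<'EOF'
ALTER TABLE YOUR_SCHEMA.YOUR_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
EOF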
The Kafka address at this point is the 192.168.192.4 configured above.
Use Postman or curl to send Debezium a connector-creation request (a complete curl sketch follows the JSON below):
POST http://localhost:8083/connectors/
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "xxx",
    "database.port": "1521",
    "database.user": "xxx",
    "database.password": "xxx",
    "database.server.name": "xxx",
    "database.history.kafka.bootstrap.servers": "192.168.192.4:9092",
    "database.history.kafka.topic": "oracle-history",
    "topic.prefix": "prefix",
    "database.dbname": "xxx",
    "table.include.list": "YOUR_SCHEMA.YOUR_TABLE",
    "snapshot.mode": "schema_only",
    "database.history.skip.unparseable.ddl": "true"
  }
}
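A sketch of the same request with curl; the JSON above is assumed to be saved in a file hypothetically named register-oracle.json:
# register the connector with the Kafka Connect REST API
curl -i -X POST http://localhost:8083/connectors/ \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  -d @register-oracle.json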
If the response echoes back the configuration you sent, the connector was created successfully.
Use
GET http://localhost:8083/connectors/${connector}/status to check a connector's running state.
For the setup above that is http://localhost:8083/connectors/oracle-connector/status, as in the sketch below.
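The same check with curl; a healthy connector and its task both report a state of "RUNNING":
# query the connector's running state
curl http://localhost:8083/connectors/oracle-connector/status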
Notes
"snapshot.mode": "schema_only" means the existing table data is not snapshotted; only changes made from now on are synchronized.
"database.history.skip.unparseable.ddl": "true" makes the connector skip DDL statements it fails to parse. Such parsing exceptions are easy to run into, for example:
Ignoring unparsable DDL statement 'alter table SYSTEM.LOGMNR_TABCOMPART$ modify partition P470 REBUILD UNUSABLE LOCAL INDEXES;': {} [io.debezium.connector.oracle.OracleSchemaChangeEventEmitter]
io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table SYSTEM.LOGMNR_TABCOMPART$ modify partition P470 REBUILD UNUSABLE LOCAL INDEXES;'
Without this option the connector dies outright after such an exception; with it enabled, the error is ignored and the connector keeps running.
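If a connector has already failed this way, its status shows FAILED; once the option is in place you can restart it through the standard Kafka Connect REST API, sketched here:
# restart the failed connector
curl -X POST http://localhost:8083/connectors/oracle-connector/restart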
That completes the configuration; open the Kafka UI to look at the data.
The topic whose name starts with the prefix you configured and ends with your table name is the subscription for your table; with the configuration above that is
prefix.YOUR_SCHEMA.YOUR_TABLE
Every insert, update, and delete on the table is synchronized to this topic, and each message carries before and after fields so you can compare the data before and after the change. A command-line sketch for inspecting these events follows.
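A minimal sketch of consuming the topic from inside the kafka container instead of the Kafka UI; the script path assumes the debezium/kafka image's Kafka install under /kafka, and the topic name matches the placeholders above:
# print the change events; each value carries the before/after fields
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic prefix.YOUR_SCHEMA.YOUR_TABLE \
  --from-beginning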
Kafka Connect configuration changes in connect-distributed.properties:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
These strip the schema envelope from each message, leaving leaner JSON.
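If you prefer not to edit the worker file, the same two switches can be applied to just this connector through Kafka Connect's standard per-connector converter override; a sketch follows (PUT replaces the whole config, so every field from the original JSON must be included where the "..." placeholder stands):
# update the connector config in place with converter overrides
curl -i -X PUT http://localhost:8083/connectors/oracle-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "...": "...all other fields from the config above...",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'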
That completes the setup.