1、hadoop集群连接本地MySQL
1.1 首先测试集群是否可以ping通本地
虚拟机可以ping通网关(192.168.10.2),但不能ping通192.168.10.1,这表明问题可能出在Windows主机的防火墙设置或VMware的网络配置上。
1.1.1 检查Windows防火墙设置
确保Windows防火墙没有阻止ICMP请求(ping)可以临时禁用Windows防火墙进行测试:
- 打开命令提示符(以管理员身份运行)
- 输入以下命令来禁用防火墙
netsh advfirewall set allprofiles state off
然后再次尝试从虚拟机ping 192.168.10.1。如果现在可以ping通,说明防火墙是问题所在。需要添加一个允许ICMP的规则。
1.1.2 重新启用防火墙并添加规则
输入以下命令来重新启用防火墙
netsh advfirewall set allprofiles state on
在命令提示符中输入以下命令来添加允许ICMP请求的规则
netsh advfirewall firewall add rule name="Allow ICMPv4" protocol=icmpv4:8,any dir=in action=allow
在命令提示符中输入以下命令来添加允许3306端口的入站规则
netsh advfirewall firewall add rule name="Allow MySQL 3306" dir=in action=allow protocol=TCP localport=3306
1.2 虚拟机连接到物理机上的MySQL服务器
需要确保几个关键点:
- MySQL服务器配置:确保MySQL服务器允许远程连接。
- 网络配置:确保虚拟机和物理机在同一网络中,并且可以相互通信。
- 防火墙设置:确保没有防火墙阻止MySQL的端口(默认是3306)。
- MySQL用户权限:确保有用户权限从虚拟机的IP地址连接到MySQL服务器。
1.2.1 配置MySQL服务器以允许远程连接
-
登录MySQL服务器
mysql -uroot -p123456
-
查看当前的用户权限
SHOW GRANTS FOR 'root'@'localhost';
-
创建一个允许远程连接的用户
CREATE USER 'remote_user'@'%' IDENTIFIED BY '123456';
-
给这个用户授权:
GRANT ALL PRIVILEGES ON *.* TO 'remote_user'@'%' WITH GRANT OPTION; FLUSH PRIVILEGES;
-
确保MySQL配置文件(my.cnf或my.ini)允许远程连接:
- 找到MySQL的配置文件,通常在
/etc/mysql/
目录下。 - 确保
bind-address
设置为0.0.0.0
或者被注释掉,这样MySQL服务器可以监听所有网络接口。
- 找到MySQL的配置文件,通常在
在虚拟机上执行如下命令:
mysql -h 192.168.10.1 -u remote_user -p
已经成功配置了从Hadoop集群的hadoop102
节点访问本地MySQL服务器,并且已经解决了网络和防火墙的问题,现在可以使用SeaTunnel来全量同步news_data
数据库的数据到HDFS上 。
2、seatunnel导入数据库全量数据到HDFS(单表)
2.1 下载并安装SeaTunnel
官方网站:Apache SeaTunnel | Apache SeaTunnel
导入到/opt/software下,执行如下命令解压
tar -xzvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/
2.2 安装Connector插件
从2.2.0-beta版本开始,二进制包默认不提供connector依赖。因此,需要执行以下命令来安装所需的connector插件:
sh bin/install_plugin.sh $version
2.3 驱动jar包
- 下载MySQL JDBC驱动: 确保下载了正确版本的MySQL JDBC驱动jar包。从MySQL官网下载。
- 将驱动添加到SeaTunnel的lib目录: 将下载的MySQL JDBC驱动jar包放到SeaTunnel的
lib
目录下。这样,当SeaTunnel启动时,它就能够加载所需的MySQL驱动类
2.4 配置SeaTunnel任务
配置文件mysql_to_hdfs.conf
,并将其放置在SeaTunnel的config
目录下。这个配置文件应该包含了源(MySQL)和接收器(HDFS)的正确配置。
env {
execution.parallelism = 1 # 根据你的集群资源调整并行度
}
source {
JDBC {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://192.168.10.1:3306/news_data?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
user = "remote_user"
password = "123456"
query = "SELECT * FROM news_information" # 使用查询语句来导入数据
}
}
sink {
HdfsFile {
fs.defaultFS = "hdfs://hadoop102:8020" # HDFS NameNode 地址
path = "/news" # HDFS 目标目录路径
file_type = "text" # 文件类型,例如 text, csv, orc 等
field_delimiter = "," # 字段分隔符,根据实际情况调整
file_format_type = "text" # 指定文件格式类型为文本
}
}
选择执行模式
SeaTunnel支持多种执行模式,包括本地模式(local)、独立模式(standalone)、集群模式(yarn/cluster)等。对于大多数大数据任务,推荐使用集群模式来充分利用Hadoop集群的资源。
2.5 环境变量配置
/opt/module/hadoop/etc/hadoop
是包含Hadoop配置文件的目录。需要将这个目录路径设置为HADOOP_CONF_DIR
环境变量,以便SeaTunnel的Spark作业能够正确地连接到YARN集群。
使用如下命令
sudo vim /etc/profile.d/my_env.sh
在环境变量最后添加如下
export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
运行以下命令使变量立即生效
source /etc/profile.d/my_env.sh
运行命令
./bin/start-seatunnel-spark-3-connector-v2.sh --config ./config/mysql_to_hdfs.conf --master yarn
SeaTunnel作业通过Spark提交到YARN集群,启动一个SeaTunnel作业,从MySQL读取数据并将其写入到HDFS中,使用YARN作为资源管理器。
3、seatunnel导入数据库全量数据到HDFS(多表)
在seatunnel的安装目录下创建脚本mysql_full_hdfs.sh
#!/bin/bash
# 获取当前日期
current_date=$(date +%Y-%m-%d)
# 定义数据库连接信息
DB_URL="jdbc:mysql://192.168.10.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
DB_USER="remote_user"
DB_PASSWORD="123456"
# 定义HDFS路径
HDFS_PATH="hdfs://hadoop102:8020"
# 定义表名数组
TABLES=("stu" "t_user")
# 循环处理每个表
for TABLE in "${TABLES[@]}"; do
# 创建配置文件内容,包含当前日期
conf_file="/opt/module/apache-seatunnel-2.3.8/config/${TABLE}_to_hdfs_${current_date}.conf"
cat > "$conf_file" << EOF
env {
execution.parallelism = 1
}
source {
JDBC {
driver = "com.mysql.cj.jdbc.Driver"
url = "${DB_URL}"
user = "${DB_USER}"
password = "${DB_PASSWORD}"
query = "SELECT * FROM ${TABLE}"
}
}
sink {
HdfsFile {
fs.defaultFS = "${HDFS_PATH}"
path = "/news/${TABLE}_full/${current_date}" # HDFS 目标目录路径,每个表都有自己的目录,并且包含日期
file_type = "parquet" # 文件类型改为Parquet
file_format_type = "parquet" # 文件格式类型为Parquet
parquet_compression_CODEC = "gzip" # 使用gzip压缩
}
}
EOF
# 运行Seatunnel任务
/opt/module/apache-seatunnel-2.3.8/bin/start-seatunnel-spark-3-connector-v2.sh --config "$conf_file" --master yarn
# 删除配置文件
rm "$conf_file"
done
使用chmod +x 脚本,再运行脚本。
4、MySQL到Kafka的实时数据流和变更数据捕获(单节点)
Debezium在Kafka Connect集群中监控MySQL数据库变更
4.1 MySQL配置
创建用户:首先,需要使用 CREATE USER
命令来创建一个新的用户。
CREATE USER 'debezium'@'192.168.10.102' IDENTIFIED BY '123456';
授予权限:然后,可以使用 GRANT
命令来授予这个用户所需的权限。
GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
为用户 debezium
授予 LOCK TABLES
权限
请注意,授予 LOCK TABLES
权限可能会影响数据库的并发性,因为它允许用户锁定表。在授予权限时,应始终确保遵循最小权限原则,只授予用户完成其任务所需的权限。
GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
为用户 debezium
授予 RELOAD
权限
GRANT RELOAD ON *.* TO 'debezium'@'192.168.10.102';
请注意,授予 RELOAD
权限允许用户执行 FLUSH TABLES WITH READ LOCK
命令,这在 Debezium 连接器进行初始快照时是必要的。确保在授予权限时,用户 debezium
已经存在于 MySQL 中,并且使用的是正确的主机地址(在本例中为虚拟机的 IP 地址 192.168.10.102
)。
刷新权限:最后,需要刷新 MySQL 的权限设置,使新权限生效:
FLUSH PRIVILEGES;
请注意,上述步骤中的 '192.168.10.102'
应该替换为虚拟机的 IP 地址,因为这是 Kafka Connect 尝试连接到 MySQL 数据库的地址。
MySQL的配置文件:
# 后续换为允许的IP
bind-address = 0.0.0.0
log-bin=mysql-bin
binlog-format = ROW
binlog-row-image = FULL
expire_logs_days = 10
4.2 下载安装Zookeeper
在zoo.cfg下配置如下信息:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
在bin目录下执行
./zkServer.sh start
4.3 下载安装Kafka
在config目录的server.properties配置文件中,配置如下信息:
broker.id=0
advertised.listeners=PLAINTEXT://hadoop102:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/module/kafka/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=hadoop102:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
在bin目录下执行
./kafka-server-start.sh /opt/module/kafka/config/server.properties
4.4 下载安装Debezium
下载到/opt/software
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.5.Final/debezium-connector-mysql-1.9.5.Final-plugin.tar.gz
解压到:/opt/module/kafka/libs 下
tar -xzf debezium-connector-mysql-1.9.5.Final-plugin.tar.gz -C /opt/module/kafka/libs/
把 debezium-connector-mysql 里面的jar包移动到libs目录下。
cp debezium-connector-mysql/*.jar /opt/module/kafka/libs/
在/opt/module/kafka/config目录下的connect-distributed.properties 配置文件中添加
plugin.path=/opt/module/kafka/libs
在config目录下创建配置文件mysql-connector.json,并且添加如下信息
{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"192.168.10.1","database.port":"3306","database.user":"debezium","database.password":"123456","database.server.id":"184054","database.server.name":"mydb","database.include.list":"test","database.history.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.topic":"schema-changes.test","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","database.history.kafka.topic":"dbhistory.test","database.ssl.mode":"REQUIRED","database.ssl.truststore.location":"/path/to/truststore.jks","database.ssl.truststore.password":"truststore-password","database.connectionTimeZone":"UTC"}}
在config目录下创建配置文件debezium.properties,如下信息:
bootstrap.servers=hadoop102:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
# 设置 offset topic 的复制因子为 1
config.storage.topic=connect-configs
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.flush.interval.ms=5000
group.id=connect-cluster
name=mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=192.168.10.1
database.port=3306
database.user=debezium
database.password=123456
database.server.id=184054
database.server.name=mydb
database.history.kafka.bootstrap.servers=hadoop102:9092
database.history.kafka.topic=dbhistory.mydb
database.history.store.only.monitored.tables.ddl=true
table.include.list=test.stu,test.t_user
将zookeeper和kafka启动起来,再启动kafka-connect。
kafka的bin目录下
./connect-distributed.sh /opt/module/kafka/config/debezium.properties
执行注册命令,kafka的config目录下
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://hadoop102:8083/connectors/ -d @mysql-connector.json
查看状态
curl -s http://hadoop102:8083/connectors/mysql-connector/status
删除
url -X DELETE localhost:8083/connectors/mysql-connector
5、MySQL到Kafka的实时数据流和变更数据捕获(多节点)
5.1 Zookeeper配置
- hadoop102节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:2
- hadoop103节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:3
- hadoop104节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:4
5.2 Kafka配置
- hadoop102节点上
broker.id=0
advertised.listeners=PLAINTEXT://hadoop102:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/module/kafka/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
- hadoop103节点上
只需要修改kafka/config/server.properties
broker.id=1
advertised.listeners=PLAINTEXT://hadoop103:9092
- hadoop104节点上
只需要修改kafka/config/server.properties
broker.id=2
advertised.listeners=PLAINTEXT://hadoop104:9092
5.3 Dezebium下载安装
hadoop102、hadoop103、hadoop104下载了debezium,并且jar包放到了/opt/module/kafka/libs目录。每个节点的/opt/module/kafka/config/connect-distributed.properties配置文件内容都是
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/opt/module/kafka/libs
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
status.flush.interval.ms=5000
每个节点的/opt/module/kafka/config下创建了一个配置文件:mysql-connector.json,内容如下:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.10.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "123456",
"database.server.id": "184054",
"database.server.name": "mydb",
"database.include.list": "news_data",
"database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"schema.history.internal.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"schema.history.internal.kafka.topic": "schema-changes.news_data",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"database.history.kafka.topic": "dbhistory.news_data",
"database.ssl.mode": "REQUIRED",
"database.ssl.truststore.location": "/path/to/truststore.jks",
"database.ssl.truststore.password": "truststore-password",
"database.connectionTimeZone": "UTC"
}
}
启动Zookeeper、Kafka,在hadoop102、hadoop103、hadoop104节点上依次启动Kafka-connect,并且在其中一台(hadoop102)注册即可。
- hadoop102上执行如下语句
curl -s http://hadoop102:8083/connectors/mysql-connector/status
- hadoop103上执行如下语句
curl -s http://hadoop103:8083/connectors/mysql-connector/status
- hadoop104上执行如下语句
curl -s http://hadoop104:8083/connectors/mysql-connector/status
都得到如下结果:
{
"name": "mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.10.102:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.10.102:8083"
}
],
"type": "source"
}
当hadoop102的Kafka-connect关闭时
- hadoop103上执行如下语句
curl -s http://hadoop103:8083/connectors/mysql-connector/status
- hadoop104上执行如下语句
curl -s http://hadoop104:8083/connectors/mysql-connector/status
都得到如下结果:
{
"name": "mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.10.103:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.10.103:8083"
}
],
"type": "source"
}
成功地在多节点上配置了Zookeeper、Kafka和Debezium连接器,并且能够在Kafka Connect集群中注册和运行Debezium连接器
6、kafka写入HDFS
在hadoop102节点上的flume下编写一个配置文件kafka_to_hdfs.conf,内容如下:
# Define the sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = mydb.news_data.comment_information,mydb.news_data.media_information,mydb.news_data.news_information,mydb.news_data.user_information,mydb.news_data.user_auth
# # Interceptor configuration
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.atguigu.news.flume.interceptor.TimestampInterceptor$Builder
# # Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# # Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/news_data/%{table}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
编写拦截器代码打jar包放入hadoop102的flume的lib目录下,jar包代码如下:
package org.atguigu.news.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {
// Initialization logic, if needed
}
@Override
public Event intercept(Event event) {
// 1. Get header and body data
Map<String, String> headers = event.getHeaders();
String body = new String(event.getBody(), StandardCharsets.UTF_8);
try {
// 2. Parse body data as JSON
JSONObject jsonObject = JSONObject.parseObject(body);
// 3. Extract timestamp and table name
String ts = jsonObject.getString("ts_ms");
JSONObject source = jsonObject.getJSONObject("source");
String tableName = source.getString("table");
// 4. Put timestamp and table name into headers
headers.put("timestamp", ts);
headers.put("table", tableName);
return event;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
if (intercept(event) == null) {
events.remove(event);
}
}
return events;
}
@Override
public void close() {
// Clean up resources, if needed
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TimestampInterceptor();
}
@Override
public void configure(Context context) {
// Configuration logic, if needed
}
}
}