一.背景
由于我们有个业务在阿里云部署了Kafka,但是想直接在本地IDC机房服务器直接通过公网消费Kafka进行业务处理。这个本来也不是什么难事,阿里云把9092默认端口打开运行访问即可,也不不值得再写这篇博客了。 这个事情让人特别关注的一个主题其实就是数据安全问题。
如果从公网消费Kafka, 由于Kafka我们并没有配置SSL机制,所以如果直接通过公网消费意味着数据明文裸奔,导致数据泄漏。 那么针对这个安全问题,我们是这么做的:
1.首先,阿里云安全组更改kafka的默认端口号,并且对访问kafka的端口设置为公司的IP白名单,保证只有机房IP才能正常访问,非法IP则无法访问。
2.通过搭建SSH隧道代理的方式,在IDC服务器做一个SSH隧道代理到阿里云服务器的Kafka端口,这样从公网消费的kafka数据都经过隧道进行传输,避免了数据通过明文传输的风险。
还不太了解什么是SSH隧道的童鞋,可以参考一下我之前写过的博客: ssh端口转发(隧道技术)
二.实现流程
1.运行SSH隧道代理
ssh -N -o "ServerAliveInterval 60" -L 192.168.1.101:9092:10.0.3.99:9092 -p 22 root@182.43.x.x
解释一下这个语句:
-N #不登录到远程服务器
-o "ServerAliveInterval 60" #心跳检测周期时间 60s
-L 192.168.1.101:9092:10.0.3.99:9092 #绑定本地192.168.1.101:9092端口,映射到10.0.3.99:9092(阿里云服务器内网IP)
-p 22 #SSH端口
root@182.43.x.x #服务器的公网IP
使用supervisord加以运行即可,防止进程意外挂掉,保证进程高可用。
2.Python客户端无法通过SSH隧道消费
执行消费代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers='192.168.1.101:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)
print("开始")
for msg in consumer:
print('从kafka获取到的数据: ')
print(msg.value.decode(encoding='utf-8'))
连接kafka消费超时:
3.Kafka客户端连接集群的流程分析
如果上面的服务是http或者https服务器,你通过curl访问本地192.168.1.101:9092的话绝对是没啥问题的。 并且我通过测试 telnet 192.168.1.101 9092 是能正常连接的,由此说明我搭建的SSH隧道是没问题的。 现在出问题更多的可能是在于我Kafka配置或者Kafka的用法存在问题. 查了一波资料引用下这个网友回答:
根据这个网友的资料,我自己画了一下Kafka客户端连接服务端的原理流程图:
kafka存在2个比较重要的参数:
1.KAFKA_LISTENERS(只负责集群服务的监听,用来返回brokers列表给客户端)
2.KAFKA_ADVERTISED_LISTENERS(客户端真实连接的broker端点, 进行数据传输)
4.Kafka无法消费原因求证
我当前的Kafka配置信息:
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
我访问192.168.1.101:9092,此时可以通过隧道访问到阿里云公网【KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092】,此时Kafka集群返回给客户端的brokers是【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092】, 那我python客户端真实连接的brokers端点则是10.0.3.99:9092, 但是明显我本地的网络环境根本连不到10.0.3.99这个IP, 所以导致我的python脚本超时。OK.如果到这里仅仅只是猜测,那么我们通过抓包来验证下结论:
事实胜于雄辩! 此时大家可以看到最后我本地python客户端连接的brokers是10.0.3.99:9092, 所以导致消费超时!
5.使用域名的方式修改kafka配置
最后我们采用域名的方式修改了kafka配置,相对灵活:
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka.com:9092
KAFKA_LISTENERS=PLAINTEXT://my.kafka.com:9092
阿里云服务器/etc/hosts添加了一条记录:
0.0.0.0 my.kafka.com
此时,我们的python客户端所在宿主机也要加下hosts, 将my.kafka.com指向SSH隧道代理的服务器ip:
192.168.1.101 my.kafka.com
修改后的python代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers='my.kafka.com:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)
print("开始")
for msg in consumer:
print('从kafka获取到的数据: ')
print(msg.value.decode(encoding='utf-8'))
那么此时,kafka就能正常消费了。 我们再来捋一下python客户端连接kafka集群的过程。 首先,连接my.kafka.com:9092获取到broker列表, my.kafka.com对应的ip是192.168.1.101, 连接的端口是9092,没毛病,因为SSH隧道是正常的. 此时kafka返回broker列表为【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka.com:9092】, 明显我在本地访问broker my.kafka.com:9092也能正常连接,到此,python从kafka消费正常。
综上所述,我们得到一些经验:
1. 在搭建kafka集群的时候使用域名的方式来配置,而不是使用IP. 这样我们可以很灵活的做hosts配置来访问kafka.
2.充分理解Kafka的2个参数KAFKA_ADVERTISED_LISTENERS、KAFKA_LISTENERS含义以及Kafka客户端连接到服务端的流程和原理
3.别人说的不一定是正确的,大家可以自行判断、求证(包括我写的,大家可以自己去求证)