通过SSH隧道安全消费Kafka数据

news2024/12/24 9:48:26

一.背景

    由于我们有个业务在阿里云部署了Kafka,但是想直接在本地IDC机房服务器直接通过公网消费Kafka进行业务处理。这个本来也不是什么难事,阿里云把9092默认端口打开运行访问即可,也不不值得再写这篇博客了。 这个事情让人特别关注的一个主题其实就是数据安全问题

    如果从公网消费Kafka, 由于Kafka我们并没有配置SSL机制,所以如果直接通过公网消费意味着数据明文裸奔,导致数据泄漏。 那么针对这个安全问题,我们是这么做的:

     1.首先,阿里云安全组更改kafka的默认端口号,并且对访问kafka的端口设置为公司的IP白名单,保证只有机房IP才能正常访问,非法IP则无法访问。

      2.通过搭建SSH隧道代理的方式,在IDC服务器做一个SSH隧道代理到阿里云服务器的Kafka端口,这样从公网消费的kafka数据都经过隧道进行传输,避免了数据通过明文传输的风险。

    还不太了解什么是SSH隧道的童鞋,可以参考一下我之前写过的博客: ssh端口转发(隧道技术)

二.实现流程

1.运行SSH隧道代理

ssh -N -o "ServerAliveInterval 60" -L 192.168.1.101:9092:10.0.3.99:9092 -p 22 root@182.43.x.x

解释一下这个语句:

-N   #不登录到远程服务器

-o "ServerAliveInterval 60"   #心跳检测周期时间 60s

-L 192.168.1.101:9092:10.0.3.99:9092  #绑定本地192.168.1.101:9092端口,映射到10.0.3.99:9092(阿里云服务器内网IP)

-p 22 #SSH端口

root@182.43.x.x  #服务器的公网IP

使用supervisord加以运行即可,防止进程意外挂掉,保证进程高可用。

2.Python客户端无法通过SSH隧道消费

执行消费代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='192.168.1.101:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)

print("开始")
for msg in consumer:
    print('从kafka获取到的数据: ')
    print(msg.value.decode(encoding='utf-8'))

连接kafka消费超时: 

 

3.Kafka客户端连接集群的流程分析

    如果上面的服务是http或者https服务器,你通过curl访问本地192.168.1.101:9092的话绝对是没啥问题的。 并且我通过测试 telnet 192.168.1.101 9092 是能正常连接的,由此说明我搭建的SSH隧道是没问题的。  现在出问题更多的可能是在于我Kafka配置或者Kafka的用法存在问题.  查了一波资料引用下这个网友回答:

根据这个网友的资料,我自己画了一下Kafka客户端连接服务端的原理流程图:

 kafka存在2个比较重要的参数: 

       1.KAFKA_LISTENERS(只负责集群服务的监听,用来返回brokers列表给客户端)

       2.KAFKA_ADVERTISED_LISTENERS(客户端真实连接的broker端点, 进行数据传输)

4.Kafka无法消费原因求证

我当前的Kafka配置信息:

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

我访问192.168.1.101:9092,此时可以通过隧道访问到阿里云公网【KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092】,此时Kafka集群返回给客户端的brokers是【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092】, 那我python客户端真实连接的brokers端点则是10.0.3.99:9092, 但是明显我本地的网络环境根本连不到10.0.3.99这个IP,  所以导致我的python脚本超时。OK.如果到这里仅仅只是猜测,那么我们通过抓包来验证下结论:

 事实胜于雄辩! 此时大家可以看到最后我本地python客户端连接的brokers是10.0.3.99:9092, 所以导致消费超时!

5.使用域名的方式修改kafka配置

最后我们采用域名的方式修改了kafka配置,相对灵活:

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka.com:9092
KAFKA_LISTENERS=PLAINTEXT://my.kafka.com:9092

 阿里云服务器/etc/hosts添加了一条记录:

0.0.0.0  my.kafka.com

此时,我们的python客户端所在宿主机也要加下hosts, 将my.kafka.com指向SSH隧道代理的服务器ip:

192.168.1.101 my.kafka.com

修改后的python代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='my.kafka.com:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)

print("开始")
for msg in consumer:
    print('从kafka获取到的数据: ')
    print(msg.value.decode(encoding='utf-8'))

那么此时,kafka就能正常消费了。 我们再来捋一下python客户端连接kafka集群的过程。 首先,连接my.kafka.com:9092获取到broker列表, my.kafka.com对应的ip是192.168.1.101, 连接的端口是9092,没毛病,因为SSH隧道是正常的. 此时kafka返回broker列表为【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka.com:9092】, 明显我在本地访问broker   my.kafka.com:9092也能正常连接,到此,python从kafka消费正常。

 

综上所述,我们得到一些经验:

   1. 在搭建kafka集群的时候使用域名的方式来配置,而不是使用IP.  这样我们可以很灵活的做hosts配置来访问kafka.

    2.充分理解Kafka的2个参数KAFKA_ADVERTISED_LISTENERS、KAFKA_LISTENERS含义以及Kafka客户端连接到服务端的流程和原理

    3.别人说的不一定是正确的,大家可以自行判断、求证(包括我写的,大家可以自己去求证)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/506637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【记录】ChatGPT|图片预览魔法咒语魔改,使用 ChatGPT 返回大量可以跳转的链接

很早的时候,我已经留意到 ChatGPT 会以返回图片的 markdown 格式来显示图片,很可能拥有一定的图片上传功能,但是它往往会显示得有些问题。一些代码图片之类的或者风景图什么的都不是很会。 但其实 ChatGPT 是可以直接返回图片类型的回复的&a…

C++初始化列表

1.初始化列表概述 初始化列表:以一个冒号开始,接着是一个以逗号分隔的数据成员列表,每个"成员变量"后面跟一个放在括号中的初始值或表达式。 2.为什么使用初始化列表 在创建对象时,编译器通过调用构造函数&#xff0c…

个人电脑操作系统

UEFI(Unified Extensible Firmware Interface,全称统一的可扩展固件接口)是一种个人电脑系统规格,用来定义操作系统与系统固件之间的软件界面,作为BIOS的替代方案。其前身是Intel在1998年开始开发的Intel Boot Initiat…

【static_cast、reinterpret_cast、const_cast、dynamic_cast】C++类型转换

C类型转换 引入C语言中的类型转换 C的强制类型转换static_castreinterpret_catconst_castdynamic_cast向下转型的安全问题 explicit4种类型转换的应用场景 RTTI 引入 C语言中的类型转换 C语言和C都是强类型语言,如果赋值运算符左右两侧变量的类型不同,…

ChatGPT一键私有部署,全网可用,让访问、问答不再受限,且安全稳定!

前言 ChatGPT由于在访问上有一些限制,使用并不便利。目前国内可以直接访问的大部分是调用API返回结果,我们去使用时总会有次数限制,而且它们可能随便崩掉。 其实,目前我们访问过的大部分国内的网页包括UI,其实是套用了…

过滤器对前端请求参数进行解码URLDecoder,接口接收参数类型为map,解码无效问题

文章目录 一、前言二、设计思路三、代码实现四、启动测试五、过滤器解码无效六、源码跟踪七、解决方案八、再次重启测试九、总结 一、前言 最近做的一个公司项目,因为客户需要对特殊字符做搜索,但是前端的请求参数无法传递到后端,所以前端对…

VS2017中Qt项目数据库连接——包含报错比如QMYSQL driver not loaded(细心看到最后,一定能解决你想解决的问题)

我把爆的错误QMYSQL driver not loaded写在文章末尾了,大家看一看!前面是配置数据库 一、测试 VS2017 中 Qt 项目数据库连接 打开 VS2017,文件——新建项目,右侧输入框输入 Qt 确定后点击下一步,勾选模块 基类也是默…

RSU路测单元,你知道多少?

一、什么是RSU路测单元? RSU路测单元是实现智慧的路、车路协同的关键设备,设置在路侧,与附近过往车辆进行双向通信、交互数据,是智能交通系统中的一种重要设备。RSU可以连接路面原有电子设备,比如信号灯和摄像头&…

一文解决Xshell无法连接vmware上的centos

问题描述 win10系统上安装VMware workstation16 pro,装好后安装centos虚拟机,在设置network & hostname时选择的NAT模式,即使用自定义的网关和IPv4地址,最后配置完成后centos主机地址信息如下,在虚拟机内部进行pi…

【一览无余】Vue框架下Cesium加载遥感地图使用GeoServer切割TIF大文件对外发布WMS服务进行地图绘制(科普篇2/2)

【一览无余】Vue框架下Cesium加载遥感地图使用GeoServer切割TIF大文件对外发布WMS服务进行地图绘制(科普篇2/2) 二、Cesium是弄啥嘞2.1 WebGL2.1.1 WebGL是什么2.1.2 WebGL优点 2.2 Cesium是什么2.3 Cesium能干什么2.4 Cesium相关工具有哪些2.5 相关案例…

Echarts 3D散点图

文章目录 以下是一个 html echarts的案例 <!DOCTYPE html> <html> <head><meta charset"utf-8"><title>ECharts 3D Scatter Plot Demo</title><!-- 引入 ECharts --><script src"https://cdnjs.cloudflare.com/…

急吗?光急可没用呀!满满干货,两小时速成,别搁那干瞪眼了!

全球产业链加速重构&#xff0c;各种不确定性加大&#xff01;数字经济规模不断提升&#xff0c;为企业转型与发展创造大量机会&#xff01;企业亟须以数字化为工具或手段&#xff0c;再造组织流程和业务流程&#xff0c;以数字化确定性应对外部环境变化的不确定性&#xff0c;…

中检集团:把数智化转型作为“1号工程”

2018年4月&#xff0c;中国检验认证集团&#xff08;以下简称“中检集团”&#xff09;启动信息化建设“1号工程”&#xff0c;5年时间&#xff0c;从“数字中检1.0”升级到“2.0”再到“2.1”“2.2”&#xff0c;中检集团在数智化转型过程中&#xff0c;可谓是打了一场攻坚战。…

智能ai文章伪原创工具-智能ai文章原创处理系统

智能AI文章伪原创工具 您好&#xff0c;智能AI文章伪原创工具是一种通过机器学习和自然语言处理技术&#xff0c;帮助用户生成“看起来像是”原创文章的人工智能工具。该工具的原理是将原始文章分解为各个句子和段落&#xff0c;然后对其中的一些单词、短语或句子进行修改或替…

MySQL高级(进阶)SQL语句

#显示一个字段或者多个字段的所有内容 SELECT “字段” FROM 表名 &#xff1b; #distinct对字段去重查询 &#xff08;最好只对单个字段进行去重&#xff09; SELECT DISTINCT 字段名 FROM 表名 #where 有条件查询 SELECT “字段” FROM 表名 WHERE 条件&#xff08;例如&a…

安卓开发:使用可为null性

Kotlin园地 地址&#xff1a;Kotlin 园地 | Android 开发者 | Android Developershttps://developer.android.google.cn/training/kotlinplayground?hlzh-cn 以下简称 “K园” 先在K园执行以下代码&#xff1a; fun main() {var fa: String "sandra on";prin…

服务攻防-协议漏洞-FTPRDPSSHRsyncProFTPDlibsshOpenssh-Hydra工具使用口令猜解未授权访问

目录 一、导图 二、口令猜解-Hydra-FTP&RDP&SSH 1、协议介绍 2、Hydra工具介绍 3、实例演示 三、配置不当-未授权访问-Rsync 文件备份 1、Rsync介绍 2、漏洞成因——配置不当 3、实例演示 四、协议漏洞-应用软件-FTP&ProFTPD搭建 1、引入 2、ProFTPD介…

IDEA 搭建 Maven模块化项目

目录 1.前言 2. 软硬件环境 3.项目搭建 3.1.创建 SpringBoot 父项目 3.2. 构建子项目centerdao 3.3. 构建子项目centerweb 4. 建立父子 Module 依赖 4.1 删除不必要文件 4.2.修改 Parent 项目 packaging 4.3.修改子项目pom.xml 信息 4.4. 补充说明 5. 项目继承关系…

ios15及以上webview、Safari使用Websocket断连,1006无清晰错误码

文章目录 问题表现&#xff1a;定位疑似原因&#xff1a;解决方式&#xff1a;定位问题思路过程记录&#xff1a;1、对比前端代码运行环境问题2、写纯请求前端代码连接&#xff0c;确认是否接口部署服务问题&#xff1b;3、IOS连接是否有对TSL安全协议版本有要求&#xff08;使…

【Java入门合集】第六章异常处理

博主&#xff1a;命运之光 专栏&#xff1a;JAVA入门 学习目标 掌握异常的概念&#xff0c;Java中的常见异常类&#xff1b; 掌握Java中如何捕获和处理异常&#xff1b; 掌握自定义异常类及其使用&#xff1b; 目录 异常概述 异常体系 常见的异常 Java的异常处理机制 方式…