背景
在华为云服务器上跑了 zookeeper 和 kafka 的 broker,想内外网分流,重点就是做不到从外网去消费,比如用自己的 windows 笔记本去消费。
配置 server.properties 的 listener
为 broker 所在机子的的内网 IP 后,终于能 start 了:
listener=PLAINTEXT://192.168.0.154:9092
zookeeper 查看 kafka broker 的地址:
get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PUBLIC":"PLAINTEXT"},"endpoints":["PUBLIC://192.168.0.154:9092"],"jmx_port":-1,"port":9092,"host":"192.168.0.154","version":5,"timestamp":"1686651266529"}
地址是 “endpoints”:[“PUBLIC://192.168.0.154:9092”]
从 broker 机子的本地创建 topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test0
结果疯狂滚屏:
workClient)
[2023-06-13 18:21:48,162] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.1.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-06-13 18:21:49,266] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
换个姿势创建:
bin/kafka-topics.sh --create --bootstrap-server 192.168.0.154:9092 --topic test0
成功了:
broker 本机来 produce。
换同网段机子 192.168.0.28 来消费:
外网 windows 去消费:
D:\Programs\MQ\kafka\kafka_2.12-3.2.3\bin\windows>kafka-console-consumer.bat --topic test0 --from-beginning --bootstrap-server 121.37.xx.xxx:9092
[2023-06-13 18:44:40,678] WARN [Consumer clientId=console-consumer, groupId=console-consumer-76813] Connection to node 0 (/192.168.0.154:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
访问不到。
listeners & advertised.listeners
让我们来看下官网的说明:
章节来源:(7. Security - 7.2 Listener Configuration)
每个服务器上必须至少定义一个监听器。 listeners
中定义的每个监听器的格式如下:
{LISTENER_NAME}://{hostname}:{port}
LISTENER_NAME
通常是一个描述性的名字,定义了监听器的用途。例如,许多配置为客户端流量使用单独的监听器,所以他们可能在配置中把相应的监听器称为`CLIENT’.
listener.security.protocol.map
。该值是一个逗号分隔的列表,列出了映射到其安全协议的每个监听器。例如,下面值配置指定 CLIENT
监听器将使用 SSL,而BROKER
监听器将使用明文(plaintext)。
listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
下面给出了安全协议的可能选项:
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
明文(PLAINTEXT)协议不提供安全性,不需要任何额外的配置。在下面的章节中,本文将介绍如何配置其余的协议。
也可以在监听器中使用安全协议名称作为监听器名称。
在 listeners list 中,可以通过将 inter.broker.listener.name
,来哪一个声明监听器用于 broker 间的通信。broker 间监听器的主要目的是分区复制。如果没有定义,那么 broker 间的监听器由 security.inter.broker.protocol
定义的安全协议决定,该协议默认为PLAINTEXT
。
对于依赖 Zookeeper 存储集群元数据 metadata 的传统集群 cluster,可以声明一个单独的 listener,用于从活动控制器 controller 到 broker 的元数据 metadata 传播。这是由 control.plane.listener.name
定义的。当 controller 需要向 cluster 中的 broker 推送 metadata 更新时,它将使用这个监听器 listener。使用控制平面监听器(contol.plane.listener)的好处是,它使用一个单独的处理线程,这使得应用程序流量不太可能阻碍元数据变化的及时传播(如分区领导和ISR更新)。
控制器 controller 接收来自其他控制器 controller 和 broker 的请求。由于这个原因,即使一个服务器没有启用控制器角色(即它只是一个 broker),它仍然必须定义控制器监听器以及配置它所需的任何安全属性。例如,我们可以在一个独立的 broker 上使用以下配置:
process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
在这个例子中,控制器监听器仍然被配置为使用 SASL_SSL
安全协议,但它不包括在 listeners
中,因为 broker 没有暴露控制器监听器本身。在这种情况下,将使用的端口来自 controller.quorum.voters
配置,它定义了完整的控制器列表。
(3.1 Broker Configs)
🌵 listeners
\############################# Socket Server Settings #############################
\# The address the socket server listens on. If not configured, the host name will be equal to the value of
\# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
\# FORMAT:
\# listeners = listener_name://host_name:port
\# EXAMPLE:
\# listeners = PLAINTEXT://your.host.name:9092
\#listeners=PLAINTEXT://:9092
Listener List - Comma-separated list of URIs we will listen on and the listener names. If the listener name is not a security protocol, listener.security.protocol.map
must also be set.
Listener names and port numbers must be unique.
Specify hostname as 0.0.0.0 to bind to all interfaces.
Leave hostname empty to bind to default interface.
Examples of legal listener lists:
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
监听器
Listener List - 以逗号分隔的我们要监听的 URI 列表和监听器名称。如果监听器的名字不是安全协议,listenener.security.protocol.map 也必须被设置。
监听器名称和端口号必须是唯一的。
指定 hostname 为 0.0.0.0 以绑定所有接口。
把 hostname 留空就可以绑定到默认接口。
合法监听器列表的例子。
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
(/key:value -> ListenerName: Uri/)
🌵 advertised.listeners
\# Listener name, hostname and port the broker will advertise to clients.
\# If not set, it uses the value for "listeners".
\#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=
/监听器名称、主机名和端口,broker将向 clients 公布/
Listeners to publish to ZooKeeper for clients to use, if different than the listeners
config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners
will be used. Unlike listeners
, it is not valid to advertise the 0.0.0.0 meta-address.
发布至 zookeeper 注册中心的 listeners, 为了让 clients (从zk拿来)使用。
只需外网访问 kafka
你肯定想到了最简单的一个方法,listeners
使用外网ip
listeners=PLAINTEXT://101.89.163.1:9092
如果宿主机有外网网卡,这么配当然没问题。如果没有(ifconfig看不到外网ip的网卡,基本上就不存在这个外网网卡),很可能和我使用的的宿主机(华为云ECS)一样是通过 NAT 映射或者啥办法搞出来的外网 ip,此时 kafka 无法监听这个外网 ip(因为不存在,启动就会报错)。
[2023-06-13 18:57:53,738] INFO App info kafka.server for 0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2023-06-13 18:57:53,738] INFO [KafkaServer id=0] shut down completed (kafka.server.KafkaServer)
[2023-06-13 18:57:53,738] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 121.37.xx.xxx:9092: Cannot assign requested address.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:684)
at kafka.network.Acceptor.<init>(SocketServer.scala:576)
failed to bind to 121.37.xx.xxx:9092: Cannot assign requested address.
这时候就是advertised.listeners
真正发挥作用的时候了。使用如下配置:
listeners=PLAINTEXT://192.168.0.213:9092 //内网IP
advertised.listeners=PLAINTEXT://101.89.163.1:9092 //外网IP
此时一个完整的 kafka 客户端访问服务端的流程:
- 客户端访问 101.89.163.1:9092,被 kafka 宿主机所在环境映射到内网192.168.0.213:9092,访问到了kafka节点,请求获得 kafka 服务端的访问地址
- kafka 从 zookeeper 拿到自己和其他兄弟节点通过 advertised.listeners 注册到 zookeeper 的101.89.163.1:9092 等外网地址,作为 kafka 的服务端访问地址返回给客户端
- 客户端拿这些地址访问 kafka 集群,被 kafka 宿主机所在环境映射到各kafka节点的内网ip,访问到了kafka服务端…完美循环
你可能会问已经配置了访问地址,为什么还要在第一次访问的时候请求获得 kafka 的访问地址。因为如果是 kafka 集群,你可以选择只给客户端配置一个 kafka 节点的地址(这样是不推荐的),但是客户端必须要访问集群中的每一个节点,所以必须通过这个节点获得集群中每一个节点的访问地址。
如果不配置advertised.listeners=PLAINTEXT://101.89.163.1:9092
,你会发现虽然你给kafka 客户端配置的访问地址是 101.89.163.1:9092
,但是kafka客户端访问时报错,报错原因是Connection to node -1[192.168.0.213:9092] could not be established. Broker may not be available.
。这就是因为不配置advertised.listeners
则advertised.listeners
默认使用listeners
配置的地址,客户端拿到的就是listeners
配置的内网地址
内外网分流
如果是有外网网卡的情况,直接配置外网 ip 有没有问题呢?
如果既要内网访问,又要外网访问,本来可以走内网的流量都走外网网卡,显然不合适;而且有的环境可能被配置成这些 kafka 宿主机是没有外网访问权限的,即虽然他可以访问自己的外网ip,但是访问不了兄弟节点的外网ip。这时候就要配置内外网分流。网上教程就有很多了。
像上面这样设置完后,再从外网(我的windows)访问 broker:
成功。
broker 的日志:
[2023-06-13 19:11:22,003] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 48 milliseconds for epoch 0, of which 48 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-06-13 19:11:49,972] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-85874 in Empty state. Created a new member id console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,009] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-85874 in state PreparingRebalance with old generation 0 (__consumer_offsets-23) (reason: Adding new member console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,015] INFO [GroupCoordinator 0]: Stabilized group console-consumer-85874 generation 1 (__consumer_offsets-23) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,048] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b for group console-consumer-85874 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
…for group console-consumer-85874 for generation 1. The group has 1 members,
consumer端:
跑了五次,创了五个consumer: