使用Python操作Kafka:KafkaProducer、KafkaConsumer
Python kafka-python API的帮助文档
1 kafka tools连接
(1)/usr/local/kafka_2.13-3.4.0/config/server.properties
listeners = PLAINTEXT://myubuntu:9092
advertised.listeners=PLAINTEXT://192.168.1.8:29092
(2)/etc/hosts
10.0.2.11 myubuntu
其中192.168.1.8是宿主机外网地址
其中10.0.2.11是虚拟机内网地址
192.168.1.8:29092映射到了10.0.2.11:9092。
2 单线程生产者
说是单线程,其实并不是,你启动一个生产者其实是2个线程,后台有一个IO线程用于真正发送消息出去,前台有一个线程用于把消息发送到本地缓冲区。
KafkaProducer是发布消息到Kafka集群的客户端,它是线程安全的并且共享单一生产者实例。生产者包含一个带有缓冲区的池,用于保存还没有传送到Kafka集群的消息记录以及一个后台IO线程,该线程将这些留在缓冲区的消息记录发送到Kafka集群中。
2.1 KafkaProducer构造函数参数
(1)acks
0表示发送不理睬发送是否成功;
1表示需要等待leader成功写入日志才返回;
all表示所有副本都写入日志才返回
(2)buffer_memory
默认33554432也就是32M,该参数用于设置producer用于缓存消息的缓冲区大小,
如果采用异步发送消息,那么生产者启动后会创建一个内存缓冲区用于存放待发送的消息,
然后由专属线程来把放在缓冲区的消息进行真正发送,
如果要给生产者要给很多分区发消息那么就需要考虑这个参数的大小防止过小降低吞吐量
(3)compression_type
是否启用压缩,默认是none,可选类型为gzip、lz4、snappy三种。
压缩会降低网络IO但是会增加生产者端的CPU消耗。
另外如果broker端的压缩设置和生产者不同那么也会给broker带来重新解压缩和重新压缩的CPU负担。
(4)retries
重试次数,当消息发送失败后会尝试几次重发。
默认为0,一般考虑到网络抖动或者分区的leader切换,
而不是服务端真的故障所以可以设置重试3次。
(5)retry_backoff_ms
每次重试间隔多少毫秒,默认100毫秒。
(6)max_in_flight_requests_per_connection
生产者会将多个发送请求缓存在内存中,默认是5个,
如果你开启了重试,也就是设置了retries参数,
那么将可能导致针对于同一分区的消息出现顺序错乱。
为了防止这种情况需要把该参数设置为1,来保障同分区的消息顺序。
(7)batch_size
对于调优生产者吞吐量和延迟性能指标有重要的作用。
buffer_memeory可以看做池子,而这个batch_size可以看做池子里装有消息的小盒子。
这个值默认16384也就是16K,其实不大。
生产者会把发往同一个分区的消息放在一个batch中,
当batch满了就会发送里面的消息,但是也不一定非要等到满了才会发。
这个数值大那么生产者吞吐量高但是性能低,
因为盒子太大占用内存发送的时候这个数据量也就大。
如果你设置成1M,那么显然生产者的吞吐量要比16K高的多。
(8)linger_ms
上面说batch没有填满也可以发送,那显然有一个时间控制,就是这个参数,
默认是0毫秒,这个参数就是用于控制消息发送延迟多久的。
默认是立即发送,无需关系batch是否填满。
大多数场景我们希望立即发送,但是这也降低了吞吐量。
(9)max_request_size
最大请求大小,可以理解为一条消息记录的最大大小,默认是1048576字节,1M。
(10)request_timeout_ms
生产者发送消息后,broker需要在规定时间内将处理结果返回给生产者,
那个这个时间长度就是这个参数控制的,默认30000,也就是30秒。
如果broker在30秒内没有给生产者响应,那么生产者就会认为请求超时,
并在回调函数中进行特殊处理,或者进行重试。
2.2 示例代码
# -*- coding: utf-8 -*-
import time
import random
import sys
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
import json
class Producer(object):
def __init__(self, KafkaServerList=['127.0.0.1:9092'], ClientId="Procucer01", Topic='Test'):
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"acks": 1,
"buffer_memory": 33554432,
'compression_type': None,
"retries": 3,
"batch_size": 1048576,
"linger_ms": 100,
"key_serializer": lambda m: json.dumps(m).encode('utf-8'),
"value_serializer": lambda m: json.dumps(m).encode('utf-8'),
}
self._topic = Topic
try:
self._producer = KafkaProducer(**self._kwargs)
except Exception as err:
print(err)
def _onSendSucess(self, record_metadata):
"""
异步发送成功回调函数,也就是真正发送到kafka集群且成功才会执行。
发送到缓冲区不会执行回调方法。
:param record_metadata:
:return:
"""
print("发送成功")
print("被发往的主题:", record_metadata.topic)
print("被发往的分区:", record_metadata.partition)
print("队列位置:", record_metadata.offset) # 这个偏移量是相对偏移量,也就是相对起止位置,也就是队列偏移量。
def _onSendFailed(self):
print("发送失败")
def sendMessage(self, value=None, partition=None):
if not value:
return None
# 发送的消息必须是序列化后的,或者是字节
# value = json.dumps(value, encoding='utf-8', ensure_ascii=False)
# value 必须为字节或者被序列化为字节,
# 由于之前我们初始化时已经通过value_serializer来做了,所以我上面的语句就注释了
# key 与value对应的键,可选,也就是把一个键关联到这个消息上,
# KEY相同就会把消息发送到同一分区上,所以如果有这个要求就可以设置KEY,也需要序列化
# partition 发送到哪个分区,整型。如果不指定将会自动分配。
kwargs = {
"value": value,
"key": None,
"partition": partition
}
try:
# 异步发送,发送到缓冲区,同时注册两个回调函数,一个是发送成功的回调,一个是发送失败的回调。
# send函数是有返回值的是RecordMetadata,也就是记录的元数据,包括主题、分区、偏移量
future = self._producer.send(self._topic, **kwargs)
future.add_callback(self._onSendSucess)
future.add_errback(self._onSendFailed)
print("发送消息:", value)
except KafkaTimeoutError as err:
print(err)
except Exception as err:
print(err)
def closeConnection(self, timeout=None):
# 关闭生产者,可以指定超时时间,也就是等待关闭成功最多等待多久。
self._producer.close(timeout=timeout)
def sendNow(self, timeout=None):
# 调用flush()函数可以将所有在缓冲区的消息记录立即发送,即使ligner_ms值大于0.
# 这时候后台发送消息线程就会开始立即发送消息并且阻塞在这里,等待消息发送成功,当然是否阻塞取决于acks的值。
# 如果不调用flush函数,那么什么时候发送消息取决于ligner_ms或者batch任意一个条件满足就会发送。
try:
self._producer.flush(timeout=timeout)
except KafkaTimeoutError as err:
print(err)
except Exception as err:
print(err)
def main():
p = Producer(KafkaServerList=["127.0.0.1:29092"], ClientId="Procucer01", Topic="test")
for i in range(10):
time.sleep(1)
closePrice = random.randint(1, 500)
msg = {
"Publisher": "Procucer01",
"股票代码": 60000 + i,
"昨日收盘价": closePrice,
"今日开盘价": 0,
}
p.sendMessage(value=msg)
p.closeConnection()
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
3 单线程消费者
初始化一个消费者实例,消费者不是线程安全的,所以建议一个线程实现一个消费者,而不是一个消费者让多个线程共享。
下面这些是可选参数,可以在初始化KafkaConsumer实例的时候传递进去:
enable_auto_commit 是否自动提交,默认是true
auto_commit_interval_ms 自动提交间隔毫秒数
auto_offset_reset="earliest"
重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
3.1 手动拉取消息
# -*- coding: utf-8 -*-
import sys
from kafka import KafkaConsumer
import json
class Consumer(object):
def __init__(self, KafkaServerList=['127.0.0.1:9092'], GroupID='TestGroup11', ClientId="Test", Topics=['Test', ]):
"""
用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。
:param KafkaServerList: kafka服务器IP:PORT 列表
:param GroupID: 消费者组ID
:param ClientId: 消费者名称
:param Topic: 主题 列表
"""
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"group_id": GroupID,
"enable_auto_commit": False,
"auto_offset_reset": "earliest",
"key_deserializer": lambda m: json.loads(m.decode('utf-8')),
"value_deserializer": lambda m: json.loads(m.decode('utf-8')),
}
try:
self._consumer = KafkaConsumer(**self._kwargs)
self._consumer.subscribe(topics=(Topics))
except Exception as err:
print("Consumer init failed, %s" % err)
def consumeMsg(self):
try:
while True:
data = self._consumer.poll(timeout_ms=1000, max_records=100) # 拉取消息,字典类型
print(data)
if data:
for key in data:
for consumerrecord in data.get(key):
# 返回的是ConsumerRecord对象,可以通过字典的形式获取内容。
if consumerrecord != None:
# 消息消费逻辑
message = {
"Topic": consumerrecord.topic,
"Partition": consumerrecord.partition,
"Offset": consumerrecord.offset,
"Key": consumerrecord.key,
"Value": consumerrecord.value
}
print(message)
# 消费逻辑执行完毕后在提交偏移量
self._consumer.commit()
else:
print("%s consumerrecord is None." % key)
except Exception as err:
print(err)
def main():
try:
c = Consumer(KafkaServerList=['127.0.0.1:29092'], Topics=['test'])
c.consumeMsg()
except Exception as err:
print(err)
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
3.2 非手动拉取消息
# -*- coding: utf-8 -*-
import sys
from kafka import KafkaConsumer
import json
class Consumer(object):
def __init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup111', ClientId="Test", Topics=['Test', ]):
"""
用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。
:param KafkaServerList: kafka服务器IP:PORT 列表
:param GroupID: 消费者组ID
:param ClientId: 消费者名称
:param Topic: 主题
"""
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"group_id": GroupID,
"enable_auto_commit": False,
"auto_offset_reset": "earliest",
"key_deserializer": lambda m: json.loads(m.decode('utf-8')),
"value_deserializer": lambda m: json.loads(m.decode('utf-8')),
}
try:
self._consumer = KafkaConsumer(**self._kwargs)
self._consumer.subscribe(topics=(Topics))
except Exception as err:
print("Consumer init failed, %s" % err)
def consumeMsg(self):
try:
while True:
for consumerrecord in self._consumer:
if consumerrecord:
message = {
"Topic": consumerrecord.topic,
"Partition": consumerrecord.partition,
"Offset": consumerrecord.offset,
"Key": consumerrecord.key,
"Value": consumerrecord.value
}
print(message)
# 消费逻辑执行完毕后在提交偏移量
self._consumer.commit()
except Exception as err:
print(err)
def main():
try:
c = Consumer(KafkaServerList=['127.0.0.1:29092'], Topics=['test'])
c.consumeMsg()
except Exception as err:
print(err)
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
4 消费者
4.1 简单demo
启动后消费者可以从kafka服务器获取数据.
from kafka import KafkaConsumer
# 参数为接收主题和kafka服务器地址
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:29092'])
# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,
# 所以每个消息在消息队列中都有偏移
# consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.
# 所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
for msg in consumer:
print("{}:{}:{}: key={} value={}".format(msg.topic,
msg.partition,
msg.offset,
msg.key,msg.value))
4.2 消费者组
启动多个消费者,只有其中某一个成员可以消费到,满足要求,消费组可以横向扩展提高处理能力。
from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['127.0.0.1:29092'])
for msg in consumer:
print("{}:{}:{}: key={} value={}".format(msg.topic,
msg.partition,
msg.offset,
msg.key,msg.value))
4.3 读取目前最早可读的消息
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest。源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}
from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['127.0.0.1:29092'])
for msg in consumer:
print("{}:{}:{}: key={} value={}".format(msg.topic,
msg.partition,
msg.offset,
msg.key,msg.value))
4.4 手动设置偏移量
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
# ==========读取指定位置消息===============
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:29092'])
print(consumer.partitions_for_topic("test")) # 获取test主题的分区信息
print(consumer.topics()) # 获取主题列表
print(consumer.subscription()) # 获取当前消费者订阅的主题
print(consumer.assignment()) # 获取当前消费者topic、分区信息
print(consumer.beginning_offsets(consumer.assignment())) # 获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic='test', partition=0), 5) # 重置偏移量,从第5个偏移量消费
for msg in consumer:
print("{}:{}:{}: key={} value={}".format(msg.topic,
msg.partition,
msg.offset,
msg.key,msg.value))
4.5 订阅多个主题
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:29092'])
# 订阅要消费的主题
consumer.subscribe(topics=('test','test0'))
print(consumer.topics())
#获取当前主题的最新偏移量
print(consumer.position(TopicPartition(topic='test', partition=0)))
for msg in consumer:
print("{}:{}:{}: key={} value={}".format(msg.topic,
msg.partition,
msg.offset,
msg.key,msg.value))
4.6 手动拉取消息
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:29092'])
# 订阅要消费的主题
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
print(msg)
time.sleep(2)
4.7 消息挂起与恢复
# ==============消息恢复和挂起===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:29092'])
consumer.subscribe(topics=('test'))
consumer.topics()
# pause执行后,consumer不能读取,直到调用resume后恢复
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print(num)
print(consumer.paused()) # 获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print(msg)
time.sleep(2)
num = num + 1
if num == 10:
print("resume...")
consumer.resume(TopicPartition(topic='test', partition=0))
print("resume......")