confluent-kafka
pip3 install confluent-kafka
Producer.py
from confluent_kafka import Producer
# Kafka client settings, assembled from the broker address and SASL login.
_broker = {'bootstrap.servers': '10.10.x.x:3082'}
# NOTE(review): the SASL password is stored in plain text in this file.
# Replace it with your actual password, and consider keeping it out of source.
_sasl = {
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'kafka-974a3a34-bpxuser1',
    'sasl.password': 'zjavIj4OPZNV2vALc2F>zesn8izaHEYP(ZK0IETrtKrMR5w+gUNpL60xkGhX3ca9',
}
config = dict(_broker)
config.update(_sasl)

# Build the producer instance from the merged settings.
producer = Producer(**config)
# Delivery callback used for asynchronous sends.
def delivery_report(err, msg):
    """Print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to; its ``topic()`` and
            ``partition()`` are read only on success.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# Queue ten copies of the message on topic 'bpx' (replace with your topic).
for _ in range(10):
    # Serve delivery callbacks for messages queued on earlier iterations so
    # reports are not all deferred to the final flush().
    producer.poll(0)
    producer.produce('bpx', 'Hello, Kafka!', callback=delivery_report)

# Wait for all queued messages to be delivered; remaining delivery
# callbacks are invoked during this call.
producer.flush()
Consumer.py
from confluent_kafka import Consumer, KafkaError, KafkaException
# Kafka client settings: broker address, SASL login, then the consumer group.
_broker = {'bootstrap.servers': '10.10.x.x:3082'}
# NOTE(review): the SASL password is stored in plain text in this file;
# consider keeping it out of source.
_sasl = {
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'kafka-974a3a34-bpxuser1',
    'sasl.password': 'zjavIj4OPZNV2vALc2F>zesn8izaHEYP(ZK0IETrtKrMR5w+gUNpL60xkGhX3ca9',
}
config = dict(_broker)
config.update(_sasl)
config['group.id'] = 'my-python-group'  # group.id setting added for the consumer

# Build the consumer instance and subscribe it to the topic
# (replace 'bpx' with your topic).
consumer = Consumer(**config)
consumer.subscribe(['bpx'])
# Poll for messages until interrupted or a fatal consumer error occurs.
try:
    while True:
        # Wait up to one second for the next message or event.
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue

        error = msg.error()
        if not error:
            payload = msg.value()
            # A message may carry no value; only decode real payloads.
            text = payload.decode('utf-8') if payload is not None else None
            print('Received message: {}'.format(text))
        elif error.code() == KafkaError._PARTITION_EOF:
            # End-of-partition event: informational, keep consuming.
            print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
        else:
            # Any other error is surfaced to the caller.
            raise KafkaException(error)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this otherwise endless loop.
    pass
finally:
    # Always close the consumer connection.
    consumer.close()