如果想通过docker安装kafka,可参考
Docker安装Kafka
生产者
import json
import time
import traceback
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errors
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
while True:
i = datetime.strftime(datetime.now(), '%Y%m%d %H:%M:%S')
future = producer.send(
'kafka_demo',
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i),
partition=0) # 向分区1发送消息
print("send {}".format(str(i)))
time.sleep(3)
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
消费者
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers=':9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
分为两个终端页面运行即可:
生产者打印:
消费者打印: