一、背景
项目开发了一个类似kafka tools查询工具的kafka 查询,现在需要测试一下如果通过字节数组的形式写入,看看查询有没有问题
二、kafka查询代码
Python代码示例:
from kafka import KafkaProducer
import json
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 定义JSON数据
json_data = {
'name': '测试',
'age': 30,
'email': 'johndoe@example.com'
}
# 将JSON数据转换为字符串,并指定ensure_ascii参数为False,以保留非ASCII字符
json_str = json.dumps(json_data, ensure_ascii=False)
# 将字符串编码为字节数组
byte_array = json_str.encode('utf-8')
# 发送字节数组消息到Kafka主题
producer.send('lqiju_test_json_trans_bytearray_20230703', value=byte_array)
# 关闭Kafka生产者连接
producer.close()
Java代码示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaByteArrayProducer {
public static void main(String[] args) {
// Kafka配置
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// 创建KafkaProducer实例
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
// 消息数据
String topic = "your-topic-name";
byte[] messageBytes = "Hello, Kafka!".getBytes();
// 创建ProducerRecord对象并发送消息
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, messageBytes);
producer.send(record);
// 关闭KafkaProducer
producer.close();
}
}
三、解决报错return '<SimpleProducer batch=%s>' % self.async
把上面代码运行,报错
因为py3.7里面async已经变成了关键字。导致不兼容。
解决办法:
pycharm工具,在执行的脚本右键点击open in terminal:执行pip install kafka-python
或者在settings里面安装
重新执行,OK
四、小结
kafka支持存储什么格式的消息?
Kafka支持存储任意格式的消息,它本身并不关心消息的具体格式。Kafka将消息视为字节数组(bytes)的形式进行传输和存储。这意味着你可以以任何你喜欢的方式序列化你的消息,并将其转换为字节数组进行发送到Kafka。
常见的消息格式包括文本(如JSON、XML、CSV等)、二进制数据、Avro、Protobuf等。你可以根据你的需求和使用场景选择合适的消息格式。
在发送消息时,你需要将消息转换为字节数组,并使用Kafka提供的Producer API将字节数组发送到指定的Topic。在消费消息时,你可以使用相应的Consumer API从Kafka订阅的Topic中接收字节数组,并根据你事先定义的消息格式将其反序列化为可读的格式。
总之,Kafka本身并不限制消息的格式,你可以使用任何你喜欢的格式来存储和传输消息。