Kafka是基于发布/订阅模式的消息队列,消息的生产和消费都需要指定主题,因此,我们想要实现消息的传递,第一步必选是创建一个主题(Topic)。下面我们看下在命令行和代码中都是如何创建主题和实现消息的传递的。
使用命令行操作Kafka
使用命令行操作主题
- 使用kafka-topics.sh脚本来实现对Topic的操作
sh kafka-topics.sh
执行命令之后,我们可以找到到下面这行提示,REQUIRED代表必须的,就是说我们想要实现对Kafak的操作必须要带有这个参数,表示我们要连接的Kafka具体服务。
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
接下来,就让我们创建一个主题吧。
# --bootstrap-server 用于指定我们连接的Kafka服务地址,9092是默认端口号
# --topic 指定要操作的Topic名称
# --create 表示本次是要创建一个主题
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create
# 执行结果
Created topic test.
查看下我们的主题是否创建成功
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
# 执行结果
test
查看某一个主题的详细信息
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
# 执行结果
Topic: test TopicId: ehyjS3R3Saq8Cx2V1x0p7g PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
使用命令行消费数据
- 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
#输出
hello kafka
使用命令行生产数据
- 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 输入
>hello kafka
想了解如何启动Kafka,可以看这篇文章《Kafka基础入门》。
使用代码操作Kafka
添加依赖包
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.1</version>
</dependency>
</dependencies>
生产者代码
// 创建配置对象
Map<String,Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
// 对生产的数据的K,V 进行序列化的操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
// 创建生产者对象
// 生产者需要设定泛型:数据类型的约束
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(configMap);
// 创建数据
// 构建数据时,需要传递三个参数
// 第一个参数表示主题名称,主题不存在时会自动创建
// 第二个参数表示数据的Key
// 第二个参数表示数据的Value
ProducerRecord<String,String> record = new ProducerRecord<String,String>("test","key","value");
// 通过生产者对象,将数据发送到Kafka
producer.send(record);
//关闭生产者对象
producer.close();
消费者代码
// 创建配置对象
Map<String,Object> configMap = new HashMap<>();
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
// 对生产的数据的K,V 进行反序列化的操作
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");
// 创建消费者对象
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);
//订阅主题
kafkaConsumer.subscribe(Collections.singleton("test"));
// 从Kafka中获取数据
// 消费者从Kafka中拉取数据
ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);
datas.forEach(data ->{
System.out.println(data);
});
// 关闭消费者对象
kafkaConsumer.close();
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!