1、下载kafka的jar包文件
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz
2、下载完成直接操作命令启动
1、打开新的terminal(终端)窗口,进入kafka的bin目录 启动zk
./zookeeper-server-start.sh ../config/zookeeper.properties
2、新开命令行
启动kafka,进入bin目录:./kafka-server-start.sh ../config/server.properties &
3、测试
1、进入bin目录 ,创建主题
./kafka-topics.sh —create —bootstrap-server localhost:9092 —replication-factor 1 —partition 1 —topic kafkaTest
2、启动生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest
3、启动消费者
./kafka-console-consumer.sh --bootstrap-server qf01:9092 --topic test --from-beginning
4、展示
生产者:
发送kafka的测试,消费者直接消费
5、java操作kafka完成消费
需要启动上面的操作 启动kafka 建立主题,启动zk
首先下载一个可视化软件
mac版本
https://www.kafkatool.com/download.html
5.1 导入依赖
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
5.2 开发生产者
调用send发送1-100消息到指定Topic test
public class KafkaProducerTest {
public static void main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "server1:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
for (int i = 0; i < 100; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
5.3 示例
5.4 开发消费者
public class KafkaConsumerTest {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
// 自动提交offset
props.setProperty("enable.auto.commit", "true");
// 自动提交offset的时间间隔
props.setProperty("auto.commit.interval.ms", "1000");
// 拉取的key、value数据的
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2.创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 3. 订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList("test"));
// 4.使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(5);
// 5.将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key\value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
}
Thread.sleep(1000);
}
}
}