What is Kafka?
Kafka is a high-throughput, distributed publish-subscribe messaging system.
1. Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
2. Write the producer
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");             // wait for all in-sync replicas to acknowledge
        prop.put("retries", 0);              // do not retry failed sends
        prop.put("batch.size", 16384);       // batch size in bytes
        prop.put("linger.ms", 1);            // wait up to 1 ms to fill a batch
        prop.put("buffer.memory", 33554432); // total memory available for buffering records

        String topic = "hello"; // topic name
        // the fully qualified name avoids a clash with this class's own name
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka"));
        producer.close();
    }
}
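producer.send() is asynchronous, and the example above simply discards its result. If you want to confirm that a record was actually written, or log the failure, you can pass a callback to send(). Below is a minimal sketch; the class name KafkaProducerWithCallback is only an illustrative choice, and the broker address and topic match the example above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerWithCallback {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            producer.send(new ProducerRecord<>("hello", "2", "hello kafka"), (metadata, exception) -> {
                if (exception != null) {
                    // the send failed after any retries
                    exception.printStackTrace();
                } else {
                    // metadata tells us which partition and offset the record landed at
                    System.out.printf("sent to %s-%d @ offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes any pending records before returning
    }
}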
3. Write the consumer
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.8.166:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("auto.offset.reset", "latest");
        // automatically commit offsets
        prop.put("enable.auto.commit", "true");
        // auto-commit interval
        prop.put("auto.commit.interval.ms", "1000");

        // the fully qualified name avoids a clash with this class's own name
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        ArrayList<String> topics = new ArrayList<>();
        // you can subscribe to more than one topic
        topics.add("hello");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.topic());
            }
        }
    }
}
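With auto-commit enabled, offsets can be committed before the polled records have actually been processed, so a crash between the commit and the processing can skip messages. If you need at-least-once semantics, a common variation is to disable auto-commit and call commitSync() after processing each batch. A minimal sketch follows; the class name ManualCommitConsumer is illustrative, and the connection settings mirror the consumer above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.8.166:9092"); // same broker as above
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("enable.auto.commit", "false"); // we commit offsets ourselves

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            consumer.subscribe(Collections.singletonList("hello"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()); // process the record first
                }
                consumer.commitSync(); // then commit, so a crash cannot skip unprocessed records
            }
        }
    }
}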
4. Download Kafka
Download it from the official Apache Kafka website.
After extracting the archive, go into the config directory.
Edit zookeeper.properties and set the ZooKeeper data directory:
dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper
Edit server.properties and set the path where the log data is stored:
log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs
Start the ZooKeeper service (run from the bin\windows directory):
zookeeper-server-start.bat ../../config/zookeeper.properties
Start the Kafka service:
kafka-server-start.bat ../../config/server.properties
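Once both services are up, you can verify the installation from another terminal with the bundled command-line tools (also run from bin\windows; localhost:9092 assumes the default listener port):

kafka-topics.bat --create --topic hello --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-console-producer.bat --topic hello --bootstrap-server localhost:9092
kafka-console-consumer.bat --topic hello --from-beginning --bootstrap-server localhost:9092

Type a line into the console producer and it should appear in the console consumer; the producer and consumer programs from steps 2 and 3 use the same hello topic.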