点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka (正在更新…)
章节内容
上节我们完成了:
- topics.sh、producer.sh、consumer.sh 脚本的基本使用
- pom.xml 配置
- JavaAPI的使用:producer 和 consumer
架构图
上节已经出现过了,这里再放一次
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>springboot-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件
我们常见的配置文件如下图:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: my-topic
Producer
编写代码
编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。
@RestController
public class KafkaProducerController {
@Resource
private KafkaTemplate<Integer, String> kafkaTemplate;
@RequestMapping("/sendSync/{message}")
public String sendSync(@PathVariable String message) {
ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);
try {
SendResult<Integer, String> result = future.get();
System.out.println(result.getProducerRecord().key() + "->" +
result.getProducerRecord().partition() + "->" +
result.getProducerRecord().timestamp());
} catch (Exception e) {
e.printStackTrace();
}
return "Success";
}
@RequestMapping("/sendAsync/{message}")
public String sendAsync(@PathVariable String message) {
ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送失败!");
ex.printStackTrace();
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("发送成功");
System.out.println(result.getProducerRecord().key() + "->" +
result.getProducerRecord().partition() + "->" +
result.getProducerRecord().timestamp());
}
});
return "Success";
}
}
测试结果
http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222
我们观察控制台的效果如下:
Consumer
编写代码
编一个类来实现Consumer:
@Configuration
public class KafkaConsumer {
@KafkaListener(topics = {"wzk_topic_test"})
public void consume(ConsumerRecord<Integer, String> consumerRecord) {
System.out.println(
consumerRecord.topic() + "\t"
+ consumerRecord.partition() + "\t"
+ consumerRecord.offset() + "\t"
+ consumerRecord.key() + "\t"
+ consumerRecord.value());
}
}
测试运行
2024-07-12 13:48:46.831 INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926 INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test 0 13 1 wzktest
wzk_topic_test 0 14 2 wzktest222
wzk_topic_test 0 15 2 wzktest222222
控制台的截图如下: