一、A simple send/receive message demo
Parent project pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>producer</module>
        <module>consumer-1</module>
        <module>consumer-2</module>
    </modules>

    <!-- Spring Boot -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafka (version managed by the Spring Boot parent) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
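Each module listed above also needs its own pom that inherits from this parent. A minimal sketch for the producer module (the consumer modules differ only in the artifactId):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- inherit the Spring Boot / Kafka dependencies declared in the aggregator above -->
    <parent>
        <groupId>com.example</groupId>
        <artifactId>kafka-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>producer</artifactId>
</project>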
1、Producer
1.1、Configuration file
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
user.topic = userTest
school.topic = schoolTest
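Only the broker address and the serializers are configured here; everything else falls back to defaults. If reliability needs tuning, further producer settings can be added in the same property namespace, for example (values are illustrative, not part of the demo):
# wait for the full set of in-sync replicas to acknowledge each record
spring.kafka.producer.acks=all
# retry transient send failures before reporting an error
spring.kafka.producer.retries=3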
1.2、DTOs
package com.example.dto;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class SchoolDTO {
    private String schoolId;
    private String schoolName;
}

package com.example.dto;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class UserDTO {
    private String userId;
    private String userName;
    private Integer age;
}
1.3、Services
package com.example.service.impl;

import com.alibaba.fastjson.JSON;
import com.example.dto.SchoolDTO;
import com.example.service.SchoolService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service("schoolService")
@Slf4j
public class SchoolServiceImpl implements SchoolService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${school.topic}")
    private String schoolTopic;

    @Override
    public void sendSchoolMsg(SchoolDTO schoolDTO) {
        String msg = JSON.toJSONString(schoolDTO);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(schoolTopic, msg);
        kafkaTemplate.send(producerRecord);
        log.info("school message sent");
    }
}
package com.example.service.impl;

import com.alibaba.fastjson.JSON;
import com.example.dto.UserDTO;
import com.example.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service("userService")
@Slf4j
public class UserServiceImpl implements UserService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${user.topic}")
    private String userTopic;

    @Override
    public void sendUserMsg(UserDTO userDTO) {
        String msg = JSON.toJSONString(userDTO);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(userTopic, msg);
        kafkaTemplate.send(producerRecord);
        log.info("user message sent");
    }
}
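Both implementations are fire-and-forget: send() returns a future immediately, so the "message sent" log only means the record was handed to the template, not that the broker accepted it. Below is a minimal sketch, not part of the demo, of sending with a key (records with the same key always land on the same partition) and logging the broker's acknowledgement asynchronously; the class name is illustrative:
package com.example.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Illustrative helper, not part of the original demo.
@Slf4j
public class KeyedSendExample {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KeyedSendExample(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String key, String msg) {
        // records with the same key are hashed to the same partition, preserving per-key order
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, msg);
        future.addCallback(
                result -> log.info("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset()),
                ex -> log.error("send failed: {}", ex.getMessage()));
    }
}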
1.4、Application entry point
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}
2、Consumer
2.1、Configuration file
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true
user.topic = userTest
user.group.id = user-group-1
school.topic = schoolTest
school.group.id = school-group-1
server.port = 2222
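With enable-auto-commit=true, the Kafka client commits offsets on its own schedule, so the listener code does not need to acknowledge anything. Two optional settings that are often added here (illustrative values, not used in this demo):
# how often the client auto-commits offsets (broker default is 5 seconds)
spring.kafka.consumer.auto-commit-interval=1s
# number of consumer threads per @KafkaListener container
spring.kafka.listener.concurrency=3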
2.2、Listeners
package com.example.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SchoolConsumer {

    @KafkaListener(topics = "${school.topic}", groupId = "${school.group.id}")
    public void consumer(ConsumerRecord<?, ?> record) {
        try {
            Object message = record.value();
            if (message != null) {
                String msg = String.valueOf(message);
                log.info("received: msg={},topic={},partition={},offset={}",
                        msg, record.topic(), record.partition(), record.offset());
            }
        } catch (Exception e) {
            log.error("topic {} consume error: {}", record.topic(), e.getMessage());
        } finally {
            // ack.acknowledge(); // only meaningful with manual ack-mode
        }
    }
}
package com.example.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class UserConsumer {

    @KafkaListener(topics = "${user.topic}", groupId = "${user.group.id}")
    public void consumer(ConsumerRecord<?, ?> record) {
        try {
            Object message = record.value();
            if (message != null) {
                String msg = String.valueOf(message);
                log.info("received: msg={},topic={},partition={},offset={}",
                        msg, record.topic(), record.partition(), record.offset());
            }
        } catch (Exception e) {
            log.error("topic {} consume error: {}", record.topic(), e.getMessage());
        } finally {
            // ack.acknowledge(); // only meaningful with manual ack-mode; see the sketch below
        }
    }
}
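The commented-out ack.acknowledge() in the finally blocks hints at manual offset commits. A minimal sketch of that variant, assuming auto-commit is switched off and the listener ack-mode is set to manual (spring.kafka.consumer.enable-auto-commit=false, spring.kafka.listener.ack-mode=manual_immediate); the class is illustrative and would replace the auto-commit listener rather than run alongside it:
package com.example.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Illustrative manual-ack variant, not part of the original demo.
@Component
@Slf4j
public class ManualAckUserConsumer {

    // Acknowledgment is only injected when the container ack-mode is MANUAL or MANUAL_IMMEDIATE
    @KafkaListener(topics = "${user.topic}", groupId = "${user.group.id}")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        try {
            log.info("received: msg={},topic={},partition={},offset={}",
                    record.value(), record.topic(), record.partition(), record.offset());
            // commit the offset only after processing has succeeded
            ack.acknowledge();
        } catch (Exception e) {
            // not acknowledging leaves the offset uncommitted, so the record can be re-delivered
            log.error("topic {} consume error: {}", record.topic(), e.getMessage());
        }
    }
}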
If no group.id is specified, startup fails, which confirms that a Kafka consumer must belong to a consumer group. For example, writing:
@KafkaListener(topics = "${user.topic}") public void consumer(ConsumerRecord<?, ?> record) fails at startup with: Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
As the message says, the group can be supplied via the groupId (or id) attribute of @KafkaListener, or as a default for all listeners through the spring.kafka.consumer.group-id property.
2.3、Application entry point
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
// @EnableKafka is not required here: Spring Boot's Kafka auto-configuration
// already enables @KafkaListener processing.
public class Consumer1Application {

    public static void main(String[] args) {
        SpringApplication.run(Consumer1Application.class, args);
    }
}
3、Testing
Start the consumer application.
On the producer side, messages are sent from a unit test:
package com.demo.kafka;

import com.example.ProducerApplication;
import com.example.dto.SchoolDTO;
import com.example.dto.UserDTO;
import com.example.service.SchoolService;
import com.example.service.UserService;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = {ProducerApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class Test {

    @Autowired
    private SchoolService schoolService;

    @Autowired
    private UserService userService;

    @org.junit.Test
    public void sendUserMsg() {
        UserDTO userDTO = UserDTO.builder()
                .userId("id-1")
                .age(18)
                .userName("zs")
                .build();
        userService.sendUserMsg(userDTO);
    }

    @org.junit.Test
    public void sendSchoolMsg() {
        SchoolDTO schoolDTO = SchoolDTO.builder()
                .schoolId("schoolId-1")
                .schoolName("mid school")
                .build();
        schoolService.sendSchoolMsg(schoolDTO);
    }
}
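Since the producer module already includes spring-boot-starter-web, the same services can also be exercised over HTTP instead of a unit test. A minimal controller sketch (class name and request mapping are illustrative, not part of the original demo):
package com.example.controller;

import com.example.dto.UserDTO;
import com.example.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Illustrative controller, not part of the original demo.
@RestController
public class UserController {

    @Autowired
    private UserService userService;

    // e.g. GET /user/send?userId=id-3&userName=ww&age=30
    @GetMapping("/user/send")
    public String sendUser(@RequestParam String userId,
                           @RequestParam String userName,
                           @RequestParam Integer age) {
        userService.sendUserMsg(UserDTO.builder()
                .userId(userId)
                .userName(userName)
                .age(age)
                .build());
        return "ok";
    }
}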
Run the tests and watch the consumer output.
Change the field values and run again; both listeners keep receiving messages:
2024-06-22 17:09:06.383 INFO 76104 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 2222 (http) with context path ''
2024-06-22 17:09:06.390 INFO 76104 --- [ntainer#1-0-C-1] org.apache.kafka.clients.Metadata : Cluster ID: ZdpIAHTjS9GhJlvPP8n0Rw
2024-06-22 17:09:06.392 INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-22 17:09:06.393 INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Revoking previously assigned partitions []
2024-06-22 17:09:06.394 INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2024-06-22 17:09:06.394 INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] (Re-)joining group
2024-06-22 17:09:06.401 INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 11
2024-06-22 17:09:06.403 INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions [schoolTest-0]
2024-06-22 17:09:06.403 INFO 76104 --- [ main] com.example.Consumer1Application : Started Consumer1Application in 8.606 seconds (JVM running for 9.621)
2024-06-22 17:09:06.413 INFO 76104 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [schoolTest-0]
2024-06-22 17:09:06.491 INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 3
2024-06-22 17:09:06.493 INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions [userTest-0]
2024-06-22 17:09:06.611 INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [userTest-0]
2024-06-22 17:16:29.775 INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer : received: msg={"age":18,"userId":"id-1","userName":"zs"},topic=userTest,partition=0,offset=4
2024-06-22 17:16:48.157 INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer : received: msg={"schoolId":"schoolId-1","schoolName":"mid school"},topic=schoolTest,partition=0,offset=1
2024-06-22 17:17:39.458 INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer : received: msg={"age":20,"userId":"id-2","userName":"ls"},topic=userTest,partition=0,offset=5
2024-06-22 17:17:59.474 INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer : received: msg={"schoolId":"schoolId-2","schoolName":"primary school"},topic=schoolTest,partition=0,offset=2
4、Multiple consumers
4.1、Same groupId
Copy the consumer-1 code into consumer-2, change the server port to 3333 so the two instances do not clash, and start it:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true
user.topic = userTest
user.group.id = user-group-1
school.topic = schoolTest
school.group.id = school-group-1
server.port = 3333
2024-06-22 17:23:59.524 INFO 78096 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 3333 (http) with context path ''
2024-06-22 17:23:59.531 INFO 78096 --- [ main] example.Consumer2Application : Started Consumer2Application in 7.534 seconds (JVM running for 8.506)
2024-06-22 17:24:00.021 INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 12
2024-06-22 17:24:00.024 INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.025 INFO 78096 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: []
2024-06-22 17:24:00.028 INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 4
2024-06-22 17:24:00.028 INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.029 INFO 78096 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: []
Run the producer tests again and watch both consumers:
consumer-1 receives the messages, while consumer-2 does not: because each topic has only one partition, the second instance in the same group is assigned nothing (see the "Setting newly assigned partitions []" lines above).
Running the tests again gives the same result, and the school topic behaves the same way.
This confirms that a given partition of a topic is consumed by at most one consumer within a consumer group.
4.2、Different groups
Now change the group ids in consumer-2 and restart:
user.group.id = user-group-2
school.group.id = school-group-2
After startup it immediately consumes all previously sent messages, because it is a brand-new consumer group and auto-offset-reset is set to earliest.
Send new messages again:
Both consumer-1 and consumer-2 receive them.
This confirms that the same topic can be consumed independently by different consumer groups.
二、Producer partitions
First, look at the data files generated by the steps above:
So far each topic has a single partition, so each of the two topics has exactly one data directory. Next, split the userTest topic into multiple partitions and walk through the four partitioning strategies:
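One way to get a multi-partition userTest topic is to declare a NewTopic bean; the KafkaAdmin that Spring Boot auto-configures creates declared topics on startup if they do not yet exist. An existing single-partition topic can instead be expanded with the kafka-topics command-line tool's --alter option. A sketch of the bean approach (class name is illustrative):
package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustrative configuration, not part of the original demo.
@Configuration
public class TopicConfig {

    // 3 partitions, replication factor 1 (fine for a single local broker)
    @Bean
    public NewTopic userTestTopic() {
        return new NewTopic("userTest", 3, (short) 1);
    }
}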