一、消息队列场景
1.1、异步
1.2、解耦
1.3、削峰
1.4、缓冲
二、springboot整合kafka
导入pom依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
修改配置
spring.kafka.bootstrap-servers=192.168.200.1:9092
#配置序列化
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
发送消息:
@SpringBootTest
class Boot3KafkaApplicationTests {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
//计时
StopWatch stopWatch = new StopWatch();
CompletableFuture[] completableFuture = new CompletableFuture[1000];
stopWatch.start();
for (int i = 0; i < 1000; i++) {
CompletableFuture send = kafkaTemplate.send("timi", "timi1", "haha");
completableFuture[i] = send;
}
CompletableFuture.allOf(completableFuture).join();
stopWatch.stop();
//统计花费时间
long totalTimeMillis = stopWatch.getTotalTimeMillis();
System.out.println("1000条消息发送时间:"+ totalTimeMillis);
}
@Test
void testKafka(){
Person person = new Person();
person.setName("张三");
person.setAge(12);
CompletableFuture send = kafkaTemplate.send("timi", "person", person);
send.join();
}
}
创建主题
@Configuration
public class KafkaConfig {
//创建主题
@Bean
public NewTopic topic(){
return TopicBuilder.name("ax")
.partitions(1)
.compact()
.build();
}
}
获取消息
@Component
@Log4j2
public class TimiKafkaListener {
//默认获取最后一条消息
@KafkaListener(topics = "timi",groupId = "timi")
public void timiKafka(ConsumerRecord record){
Object key = record.key();
Object value = record.value();
log.info("接收到消息的key {},value:{}",key,value);
}
//获取所有消息
@KafkaListener(groupId = "ya",topicPartitions = {
@TopicPartition(topic = "timi",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0")
})
})
public void timiKafka2(ConsumerRecord record){
Object key = record.key();
Object value = record.value();
log.info("接收到消息的key2 {},value2:{}",key,value);
}
}