kafka是一种消息队列框架。
如果不用消息队列框架,就需要用阻塞队列来实现发送系统消息和系统通知
1.阻塞队列
阻塞队列是一种用来解决进程间通信的方式
阻塞队列依靠自带的两个方法put(往队列里面存数据)和take(从队列里面取数据)
2.Kafka
kafka最早只是用来发消息的,后来不断扩充自己的功能,现在功能已经不局限于消息队列了,而是一个功能综合的分布式的流媒体平台
kafka集群的每一台服务器称为Broker
Zookeeper是用来管理集群的,由于kafka需要使用集群,所以Zookeeper可以用来管理kafka,kafka里面内置了Zookeeper
消息队列大致有两种实现方式,一种是点对点的方式(阻塞队列就是点对点的方式),另一种是发布订阅模式(kafka采取的就是这种方式),生产者把消息放到某一个位置,可以有很多消费者关注、订阅这个位置,这个时候这个消息可以被多个消费者同时读到 ,发布订阅模式下,消息发布的那个位置、那个空间就叫topic(可以理解为一个文件夹),Partition就是对topic进行分区
offset是消息在partition中的索引,比如9,10,11
Replica是副本的意思,副本就是对数据做备份(冗余的多存几份数据),副本分为Leader Replica主副本和Follower Replica随从副本, 生产者可以从主副本中获取数据(主副本会作出响应),从副本只负责复制一份数据, 生产者不可以从副本中获取数据,主副本挂掉以后,会从众多从副本中选择一个成为新的主副本
3.springboot 整合kafka
step1:引入spring-kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
step2: 在application.properties文件中对kafka进行配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
#自动提交频率
spring.kafka.consumer.auto-commit-interval=3000
测试:生产者生产一个东西,看消费者是否能够自动接收到这个东西
@Component
class KafkaProducer
{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String content)
{
kafkaTemplate.send(topic,content);
}
}
@Component
class KafkaConsumer
{
@KafkaListener(topics={"test"})
public void handleMessage(ConsumerRecord record)
{
System.out.println(record.value() );
}
}
@SpringBootTest
class EmsVueApplicationTests
{
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() throws InterruptedException {
kafkaProducer.sendMessage( "test" ,"你好");
kafkaProducer.sendMessage( "test" ,"在吗");
Thread.sleep(1000*10);
}
}