最近的精神状态真的不是很好,刚刚脱离🐏羊的苦海,收获了很多吧,任何经历都是我们成长的关键。本文是我 Kafka 入门部分的一个笔记,大家如果有有疑问的地方可以评论区或者私信我,我看见了都会回复的。最后,祝大家都身体健康 ~
1. 什么是 Kafka ?
Kafka是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统 \ MessageQueue系统。可以用于 web/nginx 日志、访问日志,消息服务以及微服务。Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
zookeeper : 协调组件共同工作的组件
主要应用场景是:日志收集系统和消息系统
举个栗子
生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋。
- 假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了;
- 再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了;
这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。各位现在知道kafka是干什么的了吧,它就是那个”篮子“
术语解释
- producer:生产者,就是它来生产“鸡蛋”的
- consumer:消费者,生出的“鸡蛋”它来消费
- topic:主题,可理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了
- broker:就是篮子了
二、Kafka 设计目标
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证访问性能。
- 高吞吐率 :即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展
三、什么是消息系统
- 一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
- 分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。
- 有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。
四、Kafka 优点
- 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 - 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 - 灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 - 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
五、Kafka与其他MQ对比
- RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。 - Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。 - ActiveMQ
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。 - Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
六、Kafka 安装配置
本文写于 2022/12/ 14 日,安装教程如下,但是安装教程是随时可能变化的,万一10年后的你看到这个教程就… (那时候我应该还在世界上,哈哈哈哈,不过可能早就被这行淘汰了),到时候大家去官网看安装教程就行了,我这里也只是把自己安装官网安装的过程写出来
官网教程安装地址: https://kafka.apache.org/quickstart
1. 下载、传输、然后解压
把压缩包下载好,随便找个向虚拟机传输文件的工具,传输进去
然后解压 Kafka
tar -xzf kafka_2.13-3.3.1.tgz
2. 开启 ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
3. 再开一个终端,开启 kafka
开启 ZooKeeper 的终端,开启成功后,会卡死,再另外开启一个终端,开启 kafka
bin/kafka-server-start.sh config/server.properties
4. 再开一个终端,模拟生产者生产消息,消费者消费消息
开启Kafka 的终端,开启成功后,会卡死,再另外开启一个终端,测试 kafka
- 创建一个 topic ,名称为 test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
- 创建一个生产者,向 test 中加入消息,不想加入了,按 Ctrl + C 停止即可
bin/kafka-console-producer. --topic test --bootstrap-server localhost:9092
- 创建一个消费者,取出 test 中的消息
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
七、SpringBoot 整合 Kafka
1. 先修改虚拟机上的 kafka 配置
Kafka默认只监听本机消息,配置Kafka监听其他IP消息的方式如下:
找到 Kafka 安装目录下 config 下的 server.properties 配置文件,修改如下,其中 192.168.150.131:9092
部分修改为你要监听的 ip 地址
listeners=PLAINTEXT://192.168.150.131:9092
2. 重启 kafka
Ctrl + C 停掉 ZooKeeper & Kafka 然后安装上面的方式重新启动即可
3. 创建 SpringBoot 项目
1) 导入 Kafka 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2) 编写 yml 配置文件: 修改为你自己的 ip 地址即可
生产者类:
spring:
kafka:
producer:
bootstrap-servers: 192.168.244.130:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
topic:
test: test
logging:
level:
root: DEBUG
消费者类 :
spring:
kafka:
consumer:
#指定消息组
group-id: test
# 指定消息被消费之后自动提交偏移量,以便下次继续消费
enable-auto-commit: true
bootstrap-servers: 192.168.244.130:9092
# 指定从最近地方开始消费(earliest)
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
topic:
test: test
3)编写生产者消息发送类和消费者消息接收类
生产者消息发送类:
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
@Component
public class KafkaMessageProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaMessageProducer.class);
@Resource
private KafkaTemplate<String, String> kafkaTemplate; // 装配Kafka模板Bean
@Value("${kafka.topic.test}") // 读取配置文件中Topic的设置
public String topic;
public void send() {
String message = "Hello World -6.18--" + System.currentTimeMillis();
// 向kafka发送消息
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message);
// 设置成功与失败的回调方法
future.addCallback(success -> logger.info("KafkaMessageProducer 发送消息成功!"),
fail -> logger.error("KafkaMessageProducer 发送消息失败!"));
}
}
消费者消息接收类:
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);
@KafkaListener(topics = {"${kafka.topic.test}"})
public void receive(@Payload String message, @Headers MessageHeaders headers){
logger.info("KafkaMessageConsumer 接收到消息:" + message);
headers.keySet().forEach(key->logger.info("{}: {}",key,headers.get(key)));
}
}
4)测试类
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaMessageProducerTest {
@Autowired
KafkaMessageProducer kafkaMessageProducer;
@Test
public void send() {
for (int index = 0; index < 10; index ++) {
kafkaMessageProducer.send();
}
}
}