前置:熟悉javase,熟悉linux,熟悉idea,熟悉hadoop
1. KafKa
1.1 KafKa定义
前端埋点记录用户(浏览,点赞,收藏,评论)到日志服务器,然后通过Flume(小于100m/s)将大日志文件导入到Hadoop集群,每产生一个日志就发送到hadoop(上传100m/s)中。
秒杀活动:Flume采集速度大于200ms/s,就需要KafKa集群。
Kafka传统定义:一个分布式的基于发布/订阅的消息队列(MessageQueue),主要用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:一个开源的分布式事件流平台(EventStreamingPlatform),用于高性能数据管道,流分析,数据集成和关键任务应用。
1.2消息队列
常见消息队列:Kafka,ActiveMQ,RabbitMQ,RocketMQ,大数据场景主要采用Kafka。
1.2.1 传统消息队列应用
缓存/削峰,解耦和异步通信。
缓冲/削峰:控制和优化数据流速度,解决生产和消费消息处理速度不一致的情况。
解耦:独立的扩展或修改两边的处理过程,只要确保他们遵循同样的接口约束。
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理。
1.2.2 消息队列的两种模式
- 点对点模式 消费者主动拉取数据,消息收到后清除消息。
- 发布/订阅模式
- 可以有多个topic主题(浏览,点赞,收藏,评论)
- 消费者消费数据之后,不删除数据
- 每个消费者独立,都可以消费数据
1.3 KafKa基础架构
- 为方便扩展,提高吞吐量(大数据量),一个topic分为多个partition
- 配合分区的设计,提出消费者组(多消费)的概念,组内每个消费者并行消费,注意:某一个分区数据只能由一个消费者消费,防止会出现混乱
- 为提高可用性,为每个partition增加若干副本,类似NameNode HA,leader挂了后才会用副本。
- ZK中记录谁是leader,Kafka2.8.0后可不采用ZK,用kraft
2. KafKa快速入门
2.1 安装部署
2.1.1 集群规划
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
ZK | ZK | ZK |
Kafka | Kafka | Kafka |
2.1.2 集群部署
注意:(在server.properties中修改)每个kafka在集群中的broker.id一定要唯一,修改log.dirs地址,修改zookeeper.connect地址
注意:一定要先关kafka,再关zookeeper
- 官方地址:http://kafka.apache.org/downloads.html
- 解压,修改文件名,修改配置文件
2.2.2 集群启停脚本
--kf.sh
#!/bin/bash
case $1 in
"start")
for i in hadoop1 hadoop2 hadoop3
do
echo "---启动 $i kafka---"
ssh $i"/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for i in hadoop1 hadoop2 hadoop3
do
echo "---启动 $i kafka---"
ssh $i"/opt/module/kafka/bin/kafka-server-stop.sh"
done
;;
esac
chomod 777
2.2 Kafka命令行操作
2.2.1 主题命令行操作
- 查看操作主题命令: bin/kafka-topics.sh
连接的kafkaBroker主机名端口:–bootstarp-server<String:server toconnect to>
操作topic名称:–topic String:topic
创建主题:–create
删除主题:–delete
修改主题:–alter
查看主题:–list
查看详情 --describe
设置分区数 --partitions<Integer:# of partitions>
设置分区副本:–replication-factor<Integer:replication factor> - 查看当前服务器中的所有topic:bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- 创建first topic:bin/kafka-topics.sh --bootstrap-server
进入kafka目录下,不用进入bin目录下
-- 查看所有topic
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list
-- 查看指定topic信息
.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic test
-- 创建topic信息
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
-- 创建生产者产生消息,不关闭页面
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
-- 创建消费者接收消息,不关闭页面
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
-- 删除topic:
.\bin\windows\kafka-topics.bat kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --delete --topic test
2.2.2 生产者命令行操作
生产者发送:bin/kafka-console-producer.sh --bootstrap-server localhost:9092 -topic first
2.2.3 消费者命令行操作
消费者消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first
查看历史数据:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first-beginning
3 KafKa生产者
3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送过程中有2线程,main和sender。main中创建一个双端队列RecordAccumulator。main将消息发给队列,Sender线程从中拉取消息发送到Kafka Broker。
DQueue中数据发给Send条件:
- batch.size:数据累计到batch.size,sender就会发送数据。默认16k
- linger.ms:如果数据没达到batch.size,sender在linger.ms时间后也会发送数据,单位ms,默认0ms,没有延时。
应答ack级别: - 0:生产者发送的数据不需要等待数据落盘应答
- 1:生产者发送的数据Leader收到数据后应答
- -1(all):生产者发送的数据,Leader和ISR队列里所有的节点收齐数据后应答。-1和all等价。
3.1 异步发送API
3.2.1 普通异步发送
- 需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker
注:windows下创建topic: bin目录windows下执行:.\kafka-topics.bat --delete --bootstrap-server 127.0.0.1:9092 --topic test
public class CustomProducer {
public static void main(String[] args) {
//0.配置
Properties properties = new Properties();
//连接kafka
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//指定对应的ke和value序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2.发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
}
//3.关闭资源
kafkaProducer.close();
}
}
3.2.1 带回调函数的异步发送
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明发送成功。
只需在send()方法上添加callback参数即可。
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
}
}
);
3.3 同步发送API
只需在异步发送的基础上,再调用get()方法即可。
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)).get();
3.4 生产者分区
3.4.1 分区好处
- 便于合理使用资源,每个partition在一个Broker上存储,可把海量数据按照分区切割成一块一块的数据存储在多台Broker上,合理控制分区,可实现负载均衡的效果。
- 提高并行度,生产者可以分区为单位发送数据,消费者可以分区为单位进行消费。
3.4.2 生产者发送消息的分区策略
- 默认的分区器DefaultPartitioner(源码:IDEA中ctrl+n,全局查找DefaultPartitioner)
- 有设置分区,则有设置值。
- 没设置分区,将key的hash与topic的partition数进行取余得到partition值。
- 没设置分区,也没有key,则用粘性分区,随机选择一分区,待该分区batch满了,kafka再随机一个分区。
3.4.2 自定义分区
如果根据企业需求,自己实现分区器。
- 需求
将发送过来的数据如果包含x,就发往0号分区,否则发往1号分区。 - 实现
/**
* @author 自定义分区器
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取数据
String msgValue = value.toString();
int partition;
if (msgValue.contains("x")) {
partition = 0;
} else {
partition = 1;
}
return partition;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
//使用:关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com/xuyu/kafka/producer/config/MyPartitioner.java");
3.5 生产经验-如何提高吞吐量
从生产者到kafka集群broker仓库本来一次只发送一个data包数据,修改batch.size(批次大小,默认16k),linger.ms(等待时间,修改为5-100ms)使等待时间和发送数据量增大。同时使用compression.type(压缩snappy)将数据压缩。并将RecordAccumulator(缓冲区大小,修改为64m)调大。
public class CustomProducerParameters {
public static void main(String[] args) {
//0. 配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接/集群
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //key序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //value序列化
//优化参数
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //缓冲区大小,默认32M
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //批次大小,默认16K
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); //linger.ms 等待时间,默认0
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); //压缩,默认none,可选gzip,snappy,lz4,zstd
//1. 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2. 发送
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
}
//3. 关闭
kafkaProducer.close();
}
}
3.6 生产经验-数据可靠性
- acks应答原理
ACK应答级别
0:生产者发送过来的数据,不需要等数据落盘应答。
问题:当Producer数据到leader内存中,服务器挂掉,数据会丢失,效率高。
1:生产者发送夺来的数据,Leader收到数据后应答。
问题:当Producer数据到Leader内存中,应答完成后还没开始同步副本就挂掉了,新的leader不会收到这个信息,因为生产者已经认为发送成功了,数据会丢失,效率中等。
-1(all):生产者发送过来的数据,Leader和ISR队列里所有节点都收到数据后应答。
问题:Leader收到数据,ISR队列所有Follower都同步到数据,可靠性高,效率低,但有一个长时间没有同步到,怎么办?
解决:Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0 isr:0,1,2),如果Follower长时间未向Leader发送通信请求或同步数据,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。
数据可靠性分析:如果分区副本设置为1,或ISR里应答的最小副本数为1(min.insync.replicas默认1),和ack=1的效果是一样的,仍会丢失数据(leader:0 isr:0)。
数据完全可靠条件 = ACK级别设置为-1 + 分区副本>=2 + ISR里应答的最小副本数>=2
结论:生产环境中,ack=0不用,ack=1用于日志传输,ack=-1用于传输和钱相关的数据,对可靠性要求高的场景。
//acks
properties.put(ProducerConfig.ACKS_CONFIG, "1"); //ack参数,默认-1 可选0,1,-1
properties.put(ProducerConfig.RETRIES_CONFIG, 3); //重试次数,默认int最大值21亿
数据重复分析:生产者发送过来的数据,Leader和ISR队列中所有节点收到数据后,leader挂了,没有返回ack,选出新的leader重复收到相同的数据。解决见下文。
3.7 生产经验-数据去重
3.7.1 数据传递语义
- 至少一次(At LEadtest Once):ACK级别设置为-1 + 分区副本>=2 + ISR里应答的最小副本数>=2,可保证数据不丢失,不能保证数据不重复。
- 至多一次(At Most Once):ACK级别设置为0,可保证数据不重复,不能保证数据不丢失。
- 精确一次(Exactly Once):重要数据,既不能重复也不能丢失。
Kafka 0.11版本后,引入幂等性和事务。
3.7.2 幂等性
- 幂等原理
幂等性指Producer不论向Broker发送多少次重复数据,Broker端只会持久化一条,保证不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次(ack = -1 + 分区副本数>=2 + ISR最小副本数量>=2)。
重复数据判断依据:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。所以幂等性只能保证在单分区单会话内不重复。 - 如何使用幂等性
开启参数enable.idempotence默认为true。
3.2.3 生产者事务
- Kafka事务原理
说明:开启事务必须开启幂等性。
Producer在使用事务功能前,必须先自定义一个唯一的transactional.id。有此id,即使客户端挂掉,它重启后也能继续处理未完成的事务。
- Kafka的事务的5个API
void initTransactions(); 初始化事务
void beginTransaction(); 开启事务
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); 在事务内提交已经消费的偏移量(主要用于消费者)
void commitTransaction(); 提交事务
void abortTransaction(); 放弃事务(回滚) - 单个Producer,使用事务保证消息仅一次发送。
public class CustomProducerTransactions {
public static void main(String[] args) {
//0.配置
Properties properties = new Properties();
//连接kafka
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//指定对应的ke和value序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_1"); //指定事务id
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//添加事务
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
//2.发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
}
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.abortTransaction();
} finally {
//3.关闭资源
kafkaProducer.close();
}
}
}
3.8 生产经验-数据有序
单分区内有序:有条件,见下文
多分区,分区与分区间无序。
3.8 生产经验-数据乱序
- kafka在1.x版本之前保证数据单分区有序,条件:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等)
- kafka在1.x以及后版本保证数据单分区有序,条件:
- 未开幂等性:max.in.flight.requests.per.connection=1
- 开启幂等性:max.in.flight.requests.per.connection需要设置<=5
原因:在kafka1.x后,启用幂等,kafka服务端会缓存producer发来的最近的5个request的元数据进行重排序,无论如何,都可保证最近5个request有序。因为幂等性的pid可保证。类似tcp数据校验