本文从kafka中的实际应用场景分析,讲述kafka的一些基本概念。再讲述zookeeper集群环境的构建;kafka的搭建以及脚本文件编写;最后是一个快速入门的demo.内容会比较多,希望大家能有所收获!
1.Kafka(MQ)实战应用场景剖析
Kafka(mq)应用场景
1、Kafka之异步化、服务解耦、削峰填谷
2、Kafka海量日志收集
3、Kafka之数据同步应用
4、Kafka之实时计算分析
Kafka异步化实战:
异步化解耦,消息推送,持久化存储保证数据可靠性,避免需要重传。
Kafka服务解耦,削峰填谷:
下单请求-redis实现库存校验(list存储)
实现服务解耦指的是请求和接收方可以是两个完全独立的系统;
通常消息队列是作为不同服务之间信息传递的桥梁。
KAFKA海量日志收集:(logstash日志解析)
KAFKA之数据同步实战:
KAFKA实时计算分析:
2.Kafka基础概念
1集群架构简介
Topic主题与Partition分区(一对多的关系)
一个分区只能属于单个主题,同一个主题下可以有不同的分区。
分区里面包含不同的消息。分区可以理解为是一个可追加的日志文件。
Kafka可是通过offset保证消息在分区内的顺序性的。
路由规则有关分区区号,分区选择那个分区;
分区的目的是分散磁盘io;创建主题的时候也可以去制定分区的个数的。
通过增加分区,实现水平扩展
2副本概念(replica)
绿色p1是个主分片或主副本,
Broker2上的紫色p1是副本。副本分片。优点类似elasticsearch
通过replica,实现集群故障转移,保证了集群高可用。
用空间换取数据的高可靠性,稳定性。
3ISR详解
ISR集合的理解:数据录取及时进入ISR集合(比如p1s1的拉取时间为50ms,p1s2的pull时间为200ms,都小于规定超时时间1s。两者皆进入ISR.)跟数据一直性相关,副本从主副本的拉取时间
超时的话会进入OSR集合。
Leader副本(p1)主要是维护和跟踪ISR集合中的滞后状态。
ISR集合跟OSR集合是动态的。比如后面有消息来后,p1s2的时间超过规定时间1s,
则将p1s2放入OSR集合。
当p1宕机,一般是从ISR选取副本进行替代成为leader副本,即新的P1。
ISR只是模型,映射到实际的存储的一些基本概念。
HW:high watermark ,高水位线,消费者只能最多拉取到高水位线的消息
LEO:log end offset,日志文件的最后一条记录offset(偏移量)
ISR集合与HW和LEO直接存在着密不可分的关系
高水位线下一次需要拉取的数据是6
理解:当写入消息3和4时,此时leader已经成功写入,hw没变,
Leo变成4.HW没变是因为还没有完成数据同步,3和4目前consumer
目前还没有办法处理。
消息3同步的比较快,此时hw变成3.LEO变为4.
因为follower2对于数据同步的比较慢,所以暂定hw为3
3.构建zookeeper集群环境
4.开机启动与连接工具介绍
实现开机自启动步骤:
cd /etc/rc.d/init.d/
Touch zookeeper
Chmod 777 zookeeper
vi zookeeper
开启启动zookeeper脚本:
#!/bin/bash
#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper
export JAVA_HOME=/usr/java/jdk1.8.0_301
export PATH=$JAVA_HOME/bin:$PATH
case $1 in
start) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh start;;
stop) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh stop;;
status) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh status;;
restart) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh restart;;
*) echo "require start|stop|status|restart" ;;
esac
使用ZooInspector图形界面连接zookeeper。
使用到的命令:java -jar.\zookeeper-dev-ZooInspector.jar
5.Kafka环境搭建
Kafka版本:kafka_2.12
管控台:kafkaManager2.0.0.2
协调服务:zookeeper-3.4.6
Kafka环境验证(操作台控制)
Kafka环境搭建:
准备zookeeper环境(zookeeper-3.6.2)
下载kafka安装包:kafka
上传到虚拟机:/home/software6
解压到/usr/local目录下:
tar -zxvf kafka_2.12-2.1.0.tgz.gz -C /usr/local
重命名压缩的文件:mv kafka_2.12-2.1.0 kafka_2.12
修改kafka配置文件:
cd kafka_2.12/config
vi server.properties
## 修改内容:
## The id of the broker. This must be set to a unique integer for each broker
broker.id=0
port=9092
host.name=192.168.11.221
dvertised.host.name=192.168.11.221
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=5
zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181
创建kafka存储消息(log日志数据)的目录
mkdir /usr/local/kafka_2.12/kafka-logs
kafka配置成功,执行启动命令,启动kafka。
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
Kafka-manager管控台搭建与脚本测试验证
安装kafka Manager可视化管控台:安装到192.168.56.107上
解压zip文件:
unzip kafka-manager-2.0.0.2.zip -d /usr/local
修改配置文件: vi /usr/local/kafka-manager-2.0.0.2/conf/appication.conf
修改的内容:
kafka-manager.zkhosts=“192.168.56.107:2181,192.168.56.110:2181,
192.168.56.111:2181”
192.168.56.107节点启动kafka manager 控制台
/usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &
浏览器访问控制台:默认端口号是9000
http://192.168.56.107:9000/
集群验证:
通过控制台创建了一个topic为"test" 2个分区 1个副本
消费发送与接收验证
cd /usr/local/kafka_2.12/bin
## 启动发送消息的脚本
## --broker-list 192.168.11.221 指的是kafka broker的地址列表
## --topic test 指的是把消息发送到test主题
./kafka-console-producer.sh --broker-list 192.168.56.107:9092 --topic topic-quickstart
## 启动接收消息的脚本
./kafka-console-consumer.sh --bootstrap-server 192.168.56.107:9092 --topic topic-serial
6.Kafka快速入门
Kafka急速入门:producer:
配置生产者参数属性,创建生产者对象;构建消息producerrecord
发送消息send;关闭生产者
Kafka快速入门:Consumer
配置消费者参数写构造消费者对象;订阅主题;
拉取消息并进行消费处理;提交消费偏移量,关闭消费者
kafka是通过序列化好的key然后去进行分区的。
Kafka急速入门:producer:
import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.Const;
import com.bfxy.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class QuickStartProducer {
public static void main(String[] args) {
Properties properties = new Properties();
//1.配置生产者启动的关键属性参数
//1.1 BOOTSTRAP_SERVERS_CONFIG,连接kafka集群服务列表,如果有多个,使用","进行分隔
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.107:9092");
//1.2 CLIENT_ID_CONFIG: 这个属性的目的是标记kafka cilent的ID
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"quickstart-producer");
//1.3对kafka的key和value做序列化(kafka只能识别二进制数据)
//org.apache.kafka.common.serialization.Serialization
//key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//value:实际发送消息的内容
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//2.创建kafka生产者对象,传递properties属性参数集合
KafkaProducer<String ,String> producer = new KafkaProducer<>(properties);
//3.构造消息内容
User user = new User("003","张飞");
//需要将user对象转化为string,arg1:topic arg2:实际消息体的内容
ProducerRecord<String,String> record =
new ProducerRecord<String,String>(Const.TOPIC_QUICKSTART,
JSON.toJSONString(user));
//4.发送消息
producer.send(record);
//5.关闭生产者,生产环境中一般不关闭
producer.close();
}
}
Kafka快速入门:Consumer
import com.bfxy.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
* @author nly
*/
public class QuickStartConsumer {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
//1.配置属性参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.107:9092");
//key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//value:实际发送消息的内容
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//非常重要的属性配置,与我们消费者订阅组有关系
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-quickstart");
//常规属性:会话连接超时时间
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
//常规属性:消费者提交offset:自动提交&手工提交,默认是自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);
//2.创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3.订阅你感兴趣的主题:Const.TOPIC_QUICKSTART
consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));
System.err.println("quickstart consumer started ...");
//4.采用拉取的方式消费数据
while (true){
//等待多久进行一次数据的拉取
//拉取TOPIC_QUICKSTART主题里面所有的消息
//topic和partition是一对多的关系,一个topic可以有多个partition
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(3000));
//因为消息是在partition中存储的,所以需要遍历partition集合
for (TopicPartition topicPartition : records.partitions()){
//通过topicPartition获取制定的消息集合,就是获取到当前topicPartition下面所有的消息
// 在records对象中的数据集合
List<ConsumerRecord<String,String>> partitionRecords = records.records(topicPartition);
//获取到每一个TopicPartition
String topic =topicPartition.topic();
//获取当前topicPartition下的消息条数
int size = partitionRecords.size();
System.out.println(String.format("--- 获取topic: %s, 分区位置 : %s, 消息总数: %s",
topic,
topicPartition.partition(),
size));
for(int i = 0; i < size; i++){
ConsumerRecord<String,String> consumerRecord = partitionRecords.get(i);
// 实际数据内容
String value = consumerRecord.value();
// 当前获取的消息的偏移量
long offset = consumerRecord.offset();
//ISR : High Watermark,如果要提交的话,比如提交当前消息的offset
//表示下一次从什么位置(offset)拉取消息
long commitOffset = offset + 1;
System.err.println(String.format("获取实际消息value: %s, 消息offset: %s, 提交offset: %s",
value,offset,commitOffset));
}
}
}
}
}