Kafka系列
注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Kafka系列
#博学谷IT学习技术支持
文章目录
- Kafka系列
- 前言
- 一、Kafka的架构
- 二、Kafka的shell命令使用
- 1. 如何创建Topic
- 2.查看当前有那些topic:
- 3.查看某一个Topic的详细信息:
- 4.如何删除Topic
- 5.如何修改Topic
- 6.如何模拟生产者: 发送数据
- 7.如何模拟消费者: 消费数据
- 三、Kafka的Java API的操作
- 1.创建一个Maven的项目, 导入相关的依赖
- 2.演示如何将数据生产到Kafka
- 3.演示如何从Kafka消费数据
- 总结
前言
Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala
适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序
在实时领域中, 主要是用于流式的数据处理工作
一、Kafka的架构
Kafka Cluster: kafka集群
broker: kafka的节点
producer: 生产者
consumer: 消费者
Topic: 主题/话题 理解就是一个大的逻辑容器(管道)
shard: 分片. 一个Topic可以被分为N多个分片, 分片的数量与节点数据没有关系
replicas: 副本, 可以对每一个分片构建多个副本, 副本数量最多和节点数量一致(包含本身) 保证数据不丢失
zookeeper: 存储管理集群的元数据信息
二、Kafka的shell命令使用
Kafka是一个消息队列的中间件产品, 主要的作用: 将数据从程序一端传递到另一端的操作, 所以说学习Kafka主要学习如何使用Kafka生产数据, 以及如何使用Kafka消费数据
1. 如何创建Topic
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
2.查看当前有那些topic:
./kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
3.查看某一个Topic的详细信息:
./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
4.如何删除Topic
./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
5.如何修改Topic
Topic 仅允许增大分片, 不允许减少分片 同时也不支持修改副本的数量
./kafka-topics.sh --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 5
6.如何模拟生产者: 发送数据
./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test01
7.如何模拟消费者: 消费数据
默认从当前的时间开始消费数据, 如果想从头开始消费, 可以添加 --from-beginning 参数即可
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01
三、Kafka的Java API的操作
1.创建一个Maven的项目, 导入相关的依赖
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
2.演示如何将数据生产到Kafka
package com.itheima.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
// 第一步: 创建kafka的生产者核心对象: KafkaProducer 传入相关的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//2. 执行发送数据操作
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
"test01", "张三"+i
);
producer.send(producerRecord);
}
//3. 执行close 释放资源
producer.close();
}
}
3.演示如何从Kafka消费数据
package com.itheima.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {
public static void main(String[] args) {
// 1- 创建Kafka的消费者的核心对象: KafkaConsumer
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("group.id", "test"); // 消费者组的ID
props.put("enable.auto.commit", "true"); // 是否自动提交偏移量offset
props.put("auto.commit.interval.ms", "1000"); // 自动提交的间隔时间
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2. 订阅topic: 表示消费者从那个topic来消费数据 可以指定多个
consumer.subscribe(Arrays.asList("test01"));
while (true) {
// 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据 相当于空容器)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset();
String key = record.key();
String value = record.value();
// 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);
}
}
}
}
总结
以上就是今天要讲的内容,本文主要介绍了Kafka的基础和一些常规Shell和Java的操作。后续还会补充更多关于Kafka的内容