Updated so far in this series:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (in progress...)
Section Overview
In the previous section we covered:
- An introduction to Kafka
- The basic ZK environment
- Downloading, extracting, and configuring Kafka
- Kafka startup configuration
- Starting the Kafka service
Starting Kafka
In the previous section we started Kafka via the shell script, but the Kafka service exits as soon as our SSH session closes.
Instead, we can start Kafka in daemon mode so that it keeps running in the background:
kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties
Once it is started, we can see the process with ps:
ps aux | grep kafka
The result is shown in the screenshot below:
Using the Shell Scripts
topics.sh
kafka-topics.sh is used to manage topics.
List all topics
kafka-topics.sh --list --zookeeper h121.wzk.icu:2181
Right now this returns nothing, because we have not created any topics yet.
Create a topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1
From the output we can see that the topic was created successfully, with 1 partition and a replication factor of 1.
Describe a topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1
The output shows the topic's partition and replica details.
Delete a topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1
Create a new topic (for testing)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1
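All of these topic operations can also be done programmatically. Below is a minimal sketch using the AdminClient from the same kafka-clients dependency introduced later in this post. Note that AdminClient connects to the broker on port 9092 rather than to ZooKeeper, and the class name TestTopicAdmin and topic name wzk_topic_admin_demo are made up purely for illustration:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TestTopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 1 partition and replication factor 1 (mirrors --create)
            admin.createTopics(Collections.singleton(
                    new NewTopic("wzk_topic_admin_demo", 1, (short) 1))).all().get();
            // Print all topic names (mirrors --list)
            admin.listTopics().names().get().forEach(System.out::println);
            // Delete the topic again (mirrors --delete)
            admin.deleteTopics(Collections.singleton("wzk_topic_admin_demo")).all().get();
        }
    }
}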
producer.sh
kafka-console-producer.sh is used to produce messages.
Produce data
kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092
Type a batch of messages by hand to test:
consumer.sh
kafka-console-consumer.sh is used to consume messages.
Consume data
kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test
At this point we need to start another producer to generate data before this consumer prints anything, since by default it only reads messages produced after it starts.
Consume from the beginning
kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning
When consuming from the beginning, we can see that the consumer reads back all of the data we wrote earlier.
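For reference, --from-beginning has a rough counterpart in the Java consumer shown later in this post: the auto.offset.reset setting. A one-line sketch, assuming configs is the consumer's config map from the Java examples below:

// When the consumer group has no committed offset yet, start from the earliest
// available message (roughly the Java-API counterpart of --from-beginning)
configs.put("auto.offset.reset", "earliest");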
Java API
Architecture diagram
POM
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>
Producer 1 Test
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TestProducer01 {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        // Topic, partition 0, key 0, and the message value
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "wzk_topic_test",
                0, 0,
                "hello world by java!"
        );
        // Synchronous send: block up to 3 seconds for the broker's acknowledgement
        Future<RecordMetadata> future = producer.send(record);
        future.get(3_000, TimeUnit.MILLISECONDS);
        producer.close();
    }
}
Producer 1 Run
2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [h121.wzk.icu:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
The run result is shown in the screenshot below:
Producer 2 Test
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;

public class TestProducer02 {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "wzk_topic_test",
                0, 0,
                "hello world by java! CallBack test!"
        );
        // Asynchronous send: the callback fires once the broker acknowledges (or fails)
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println(
                            "Topic: " + recordMetadata.topic() + ", " +
                            "Partition: " + recordMetadata.partition() + ", " +
                            "Timestamp: " + recordMetadata.timestamp()
                    );
                } else {
                    System.out.println("Failed to produce the message!");
                }
            }
        });
        // close() flushes pending records, so the callback still runs before exit
        producer.close();
    }
}
After running, the console prints:
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Topic: wzk_topic_test, Partition: 0, Timestamp: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered
The console after the run looks like this:
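As an aside, since Callback has a single method, the anonymous class in TestProducer02 can also be written as a lambda. A small equivalent sketch, assuming the producer and record objects from the example above:

// Lambda form of the same Callback (reuses producer and record from above)
producer.send(record, (metadata, e) -> {
    if (e == null) {
        System.out.println("Topic: " + metadata.topic() + ", Partition: " + metadata.partition());
    } else {
        e.printStackTrace();
    }
});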
Consumer 01 Test
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestConsumer01 {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        final List<String> topics = Arrays.asList("wzk_topic_test");
        // The rebalance listener reports partitions taken from / given to this consumer
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                collection.forEach(item -> System.out.println("Revoked partition: " + item.partition()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(item -> System.out.println("Assigned partition: " + item.partition()));
            }
        });
        // Poll once, waiting up to 3 seconds, then print every field of each record
        final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
        final Iterable<ConsumerRecord<Integer, String>> topicIterable = records.records("wzk_topic_test");
        topicIterable.forEach(record -> {
            System.out.println("Headers: " + Arrays.toString(record.headers().toArray()));
            System.out.println("Key: " + record.key());
            System.out.println("Offset: " + record.offset());
            System.out.println("Partition: " + record.partition());
            System.out.println("Serialized key size: " + record.serializedKeySize());
            System.out.println("Serialized value size: " + record.serializedValueSize());
            System.out.println("Timestamp: " + record.timestamp());
            System.out.println("Timestamp type: " + record.timestampType());
            System.out.println("Topic: " + record.topic());
            System.out.println("Value: " + record.value());
        });
        consumer.close();
    }
}
Consumer 01 Run
2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
Assigned partition: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
Headers: []
Key: 0
Offset: 12
Partition: 0
Serialized key size: 4
Serialized value size: 20
Timestamp: 1720760404260
Timestamp type: CreateTime
Topic: wzk_topic_test
Value: hello world by java!
A screenshot of the console run is shown below:
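Finally, note that TestConsumer01 polls once and exits, which is enough for a demo. A real consumer normally polls in a loop; here is a minimal runnable sketch under the same configuration (the class name TestConsumerLoop is made up for illustration):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TestConsumerLoop {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs)) {
            consumer.subscribe(Collections.singletonList("wzk_topic_test"));
            // Poll forever; each iteration waits up to 1 second for new records
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r ->
                        System.out.println(r.partition() + "@" + r.offset() + ": " + r.value()));
            }
        }
    }
}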