文章目录
- 1、异步通信原理
- 1.1 观察者模式
- 1.2 生产者消费者模式
- 1.3 缓冲区
- 1.4 数据单元
- 2、消息系统原理
- 2.1 点对点消息传递
- 2.2 发布订阅消息传递
- 3、Kafka简介
- 3.1 设计目标
- 3.2 Kafka的优点
- 4、Kafka系统架构
- 4.1 Broker
- 4.2 Topic
- 4.3 Partition
- 4.4 Leader
- 4.5 Follower
- 4.6 replication
- 4.7 producer
- 4.8 consumer
- 4.9 Consumer Group
- 4.10 offset偏移量
- 4.11 Zookeeper
- 5、Kafka环境搭建
- 6、Spring Boot集成Kafka
- 1 入门
- 导入spring-kafka依赖信息
- 在resources下创建文件application.yml
- 消息生产者
- 消息消费者
- 2 传递消息为对象
1、异步通信原理
1.1 观察者模式
1.2 生产者消费者模式
1.3 缓冲区
1.4 数据单元
2、消息系统原理
一个消息系统负责将数据从一个应用传递到另一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的
2.1 点对点消息传递
2.2 发布订阅消息传递
3、Kafka简介
官网:http://kafka.apache.org
3.1 设计目标
3.2 Kafka的优点
4、Kafka系统架构
4.1 Broker
4.2 Topic
4.3 Partition
4.4 Leader
4.5 Follower
4.6 replication
4.7 producer
4.8 consumer
4.9 Consumer Group
4.10 offset偏移量
4.11 Zookeeper
5、Kafka环境搭建
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
- Docker安装Zookeeper
下载镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Docker安装Kafka
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--restart always=true
--net=host wurstmeister/kafka:2.12-2.3.1
(1)创建Kafka-demo项目,导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
(2)生产者发送消息
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 生产者
*/
public class ProducerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);
//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2.生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
//封装发送的消息
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
//3.发送消息
producer.send(record);
//4.关闭消息通道,必须关闭,否则消息发送不成功
producer.close();
}
}
(3)消费者接收消息
package com.heima.kafka.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.添加kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//2.消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3.订阅主题
consumer.subscribe(Collections.singletonList("itheima-topic"));
//当前线程一直处于监听状态
while (true) {
//4.获取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
6、Spring Boot集成Kafka
1 入门
导入spring-kafka依赖信息
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
在resources下创建文件application.yml
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消息生产者
package com.heima.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("itcast-topic","黑马程序员");
return "ok";
}
}
消息消费者
package com.heima.kafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class HelloListener {
@KafkaListener(topics = "itcast-topic")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}
}
}
2 传递消息为对象
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
- 发送消息
@GetMapping("/hello")
public String hello(){
User user = new User();
user.setUsername("xiaowang");
user.setAge(18);
kafkaTemplate.send("user-topic", JSON.toJSONString(user));
return "ok";
}
- 接受消息
package com.heima.kafka.listener;
import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class HelloListener {
@KafkaListener(topics = "user-topic")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
User user = JSON.parseObject(message, User.class);
System.out.println(user);
}
}
}