Dubbo远程调用的性能问题
Dubbo调用普遍存在于我们的微服务项目中
这些Dubbo调用全部是同步的操作
这里的"同步"指:消费者A调用生产者B之后,A的线程会进入阻塞状态,等待生产者B运行结束返回之后,A才能运行之后的代码
Dubbo消费者发送调用后进入阻塞状态,这个状态表示该线程仍占用内存资源,但是什么动作都不做
如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率会提升
实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情
这种情况下我们就要使用消息队列
什么是消息队列
消息队列(Message Queue)简称MQ,也称:"消息中间件"
消息队列是采用"异步(两个微服务项目并不需要同时完成请求)"的方式来传递数据完成业务操作流程的业务处理方式
消息队列的特征
常见面试题:消息队列的特征(作用)
利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待\阻塞时间
削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定
消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接受这种延迟,就不要使用消息队列
常见消息队列软件
Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多
RabbitMQ:功能强\性能一般:适合发送业务需求复杂的消息队列,java业务中使用较多
RocketMQ:阿里的
ActiveMQ:前几年流行的,老项目可能用到
.....
消息队列的事务处理
当接收消息队列中信息的模块运行发生异常时,怎么完成事务的回滚?
当消息队列中(stock)发生异常时,在异常处理的代码中,我们可以向消息的发送者(order)发送消息,然后通知发送者(order)处理,消息的发送者(order)接收到消息后,一般要手写代码回滚,如果回滚代码过程中再发生异常,就又要思考回滚方式,如果一直用消息队列传递消息的话,可能发生异常的情况是无止境的
所以我们在处理消息队列异常时,经常会设置一个"死信队列",将无法处理的异常信息发送到这个队列中
死信队列没有任何处理者,通常情况下会有专人周期性的处理死信队列的消息
Kafka
什么是Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。
kafka软件结构
Kafka是一个结构相对简单的消息队列(MQ)软件
kafka软件结构图
Kafka Cluster(Kafka集群)
Producer:消息的发送方,也就是消息的来源,Kafka中的生产者
order就是消息的发送方,在Dubbo中order是消费者,这个身份变化了
Consumer:消息的接收方,也是消息的目标,Kafka中的消费者
stock就是消息的接收方,在Dubbo中stock是生产者,这个身份变化了
Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人
Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中
Kafka的特征与优势
Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大
Kafka将消息队列中的信息保存在硬盘中
Kafka对硬盘的读取规则进行优化后,效率能够接近内存
硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"
Kafka处理队列中数据的默认设置:
Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗
Kafka的安装和配置
必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka)
然后路径不要有空格和中文
我们要创建一个空目录用于保存Kafka运行过程中产生的数据
本次创建名称为data的空目录
下面进行Kafka启动前的配置
先到F:\kafka\config下配置有文件zookeeper.properties
找到dataDir属性修改如下
dataDir=F:/data
修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!
注意F盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称
还要修改server.properties配置文件
log.dirs=F:/data
修改注意事项和上面相同
启动kafka
要想启动Kafka必须先启动Zookeeper
Zookeeper介绍
zoo:动物园
keeper:园长
可以引申为管理动物的人
Linux服务器中安装的各种软件,很多都是有动物形象的
如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大
我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统
Linux系统中各个软件的配置文件集中到Zookeeper中
实现在Zookeeper中,可以修改服务器系统中的各个软件配置信息
长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取
Kafka就是需要将配置编写在Zookeeper中的软件之一
所以要先启动zookeeper才能启动kafka
Zookeeper启动
进入路径F:\kafka\bin\windows
输入cmd进入dos命令行
F:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka启动
总体方式一样,输入不同指令
F:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
附录
Mac系统启动Kafka服务命令(参考):
# 进入Kafka文件夹cd Documents/kafka_2.13-2.4.1/bin/# 动Zookeeper服务./zookeeper-server-start.sh -daemon ../config/zookeeper.properties # 启动Kafka服务./kafka-server-start.sh -daemon ../config/server.properties
Mac系统关闭Kafka服务命令(参考):
# 关闭Kafka服务./kafka-server-stop.sh # 启动Zookeeper服务./zookeeper-server-stop.sh
在启动kafka时有一个常见错误
wmic不是内部或外部命令
这样的提示,需要安装wmic命令,安装方式参考
https://zhidao.baidu.com/question/295061710.html
如果启动kafka无响应
在“环境变量”的“用户变量路径”中Path属性添加一行后
%SystemRoot%\System32\Wbem;%SystemRoot%\System32\;%SystemRoot%
Kafka使用演示
启动的zookeeper和kafka的窗口不要关闭
我们在csmall项目中编写一个kafka使用的演示
csmall-cart-webapi模块
添加依赖
<!-- SpringBoot整合Kafka的依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- google提供的可以将java对象和json格式字符串转换的工具 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
修改yml文件进行配置
spring:
kafka:
bootstrap-servers: localhost:9092
# consumer.group-id是Spring-Kafka框架要求的必须配置的内容,不配置启动会报错
# 作用是给话题分组,防止不同项目恰巧相同的话题名称混淆
# 本质上,在当前项目发送给kafka消息时,会使用这个分组作为话题名称的前缀
# 例如发送一个message的话题名称,实际上会发送的话题名称是csmall_message
consumer:
group-id: csmall
在SpringBoot启动类中添加启动Kafka的注解
@SpringBootApplication
@EnableDubbo
// 项目启动时启用对kafka的支持
@EnableKafka
// 我们为了测试kafka消息的收发效果
// 利用SpringBoot自带的定时任务工具,实现周期性向kafka发送消息的功能
// 明确我们SpringBoot自带定时任务和Kafka没有必然联系
@EnableScheduling
public class CsmallCartWebapiApplication {
public static void main(String[] args) {
SpringApplication.run(CsmallCartWebapiApplication.class, args);
}
}
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
cart-webapi包下创建kafka包
包中创建Producer类来发送消息
消息的发送
// SpringBoot启动时,将当前类对象实例化后保存到Spring容器,才能实现周期运行
@Component
public class Producer {
// 配置Spring-Kafka框架提供的能够连接kafka操作的对象
// 这个对象需要指定泛型<String,String>
// KafkaTemplate<[话题类型],[消息类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
// 定义话题常量
public static final String TOPIC_KEY="myCart";
int i=1;
// 发送消息的方法
// 实现每隔10秒(10000毫秒)运行一次,需要添加下面的注解
@Scheduled(fixedRate = 10000)
public void sendMessage(){
// 实例化一个要发送的对象
Cart cart=new Cart();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setUserId("UU100");
cart.setPrice(10+ RandomUtils.nextInt(90));
cart.setCount(1+RandomUtils.nextInt(10));
// 因为当前kafka连接只能发送字符串类型对象
// 所以我们需要将上面的cart对象转换为json格式字符串
// 例如 {"id":"1","userId":"UU100","price":"10",...}
Gson gson=new Gson();
// 使用gson转换cart对象称为json格式字符串
String json=gson.toJson(cart);
System.out.println("要发送的json格式字符串为:"+json);
// 执行发送
kafkaTemplate.send(TOPIC_KEY,json);
}
}
消息的接收
下面开始接收
kafka包中创建一个叫Consumer的类来接收消息
接收消息的类可以是本模块的类,也可以是其它模块的类,编写的代码是完全一致
// 当前Consumer是用于接收kafka消息的类
// 要求将这个类也保存到Spring容器中,因为SpringKafka框架使用Spring容器中的对象
@Component
public class Consumer {
// Spring-Kafka接收消息,使用了"监听机制"
// 框架设计了一条线程,实时关注Kafka话题接收的情况
// 我们可以指定一个话题名称(myCart),设置一旦这个话题中有消息,监听线程就通知下面方法运行
@KafkaListener(topics = Producer.TOPIC_KEY)
// 上面注解就实现了监听的机制
// 当前Kafka的myCart话题出现消息时,会自动运行下面方法
// 下面方法的参数和返回值是不能修改的(方法名随意)
public void received(ConsumerRecord<String,String> record){
// 方法的返回值必须是void ,参数必须是ConsumerRecord
// ConsumerRecord<[话题类型],[消息类型]>
// 从record对象中获取消息内容
String json=record.value();
// json可能是这样的字符串:{"id":"1","userId":"UU100","price":"10",...}
// 最终要活的Cart对象,需要将json字符串转换为java对象
// 仍然使用Gson工具类实现
Gson gson=new Gson();
Cart cart=gson.fromJson(json, Cart.class);
// 转换完成输出cart,测试验证消息的接收效果
System.out.println(cart);
}
}
Nacos\Seata\Zookeeper\Kafka启动
启动cart模块
观察是否10秒出现一次输出
有发送消息和接收消息的效果