背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ
就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。
在上一篇文章中,介绍了RocketMQ
的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小
当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。
如下代码所示,在消费者类上,会使用@RocketMQMessageListener
注解,并填写必要的属性:consumerThreadNumber
:消费线程、主题、消费组。
@RocketMQMessageListener(
// 指定消费线程大小
consumeThreadNumber = 16,
topic = TOPIC_DEMO,
consumerGroup = "consumer_demo_group"
)
@Component
public class Consumer implements RocketMQListener<MessageExt> {
private final String CHARSET = Charset.defaultCharset().name();
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String str = new String(body, Charset.forName(this.CHARSET));
System.out.println("消费者消费的消息为: " + str);
}
}
其中的topic
、consumerGroup
可以指定一次就不会变啦,但是consumerThreadNumber
会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml
"。
那应该如何实现呢?
其中 consumerThreadNumber = 16
,表明填入的是一个static
的变量,因此如果简单地使用@Value
来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static
变量,采用了显示set
方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;
@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}
那在配置文件中就可以配置RocketMQThreadSize
的大小啦。如下在application.yaml
就可以搞定。
biz: RocketMQThreadSize: 200
如何使得自定义的线程大小生效
如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ
提供一个接口可以实现消费者线程的自定义。
在消费者的类需要实现RocketMQPushConsumerLifecycleListener
接口即可,然后在类中实现prepareStart
方法即可。具体如下所示:
@RocketMQMessageListener(
topic = TOPIC_DEMO,
consumerGroup = "consumer_demo_group"
)
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
private final String CHARSET = Charset.defaultCharset().name();
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String str = new String(body, Charset.forName(this.CHARSET));
System.out.println("消费者消费的消息为: " + str);
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 指定消费线程大小
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
}
}
在prepareStart
方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
- 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
如何确定合适的线程大小
以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?
线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。
当线程数量较少时,CPU
性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。
在RocketMQ
线程调优有两个指标可以帮助大致确定消费线程大小:
- 消费者的
TPS
,表明消费者的能力; - 机器负载,分为
CPU
负载和IO
负载,和自身的核心数量有关。
RocketMQ
提供web
界面,可以监测TPS
的大小,这个数量当然是越大越好,但是也要考虑负载。
在服务器输入top
命令就可以看大,当前机器的负载:
分别为1、5、15分钟负载。
在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。
在测试的时候可以从小到大调节线程数大小,并且关注TPS
和负载。
结尾
以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍