物联网中RocketMQ的使用
1. 背景
随着物联网行业的发展、智能设备数量越来越多,很多常见的智能设备都进入了千家万户;随着设备数量的增加,也对后台系统的性能提出新的挑战。
在日常中,存在一些特定的场景,属于高并发请求,瞬时的压力会达到20w/s
。为了保证用户的使用体验,提高服务器的处理能力刻不容缓。那应该如何来提高后台服务处理能力呢?
系统访问的两种常见方式:
- 同步方式,该方式是常用的一种方式,当请求时,就会阻塞当前线程等待返回结果。该方式开发很简单,但是在高并发的情况下,会导致系统负载剧增,处理能力下降,甚至会导致上游服务等待过长产生熔断。
- 异步方式,当我们请求时,不需要阻塞当前线程(主线程),当时结果处理完之后,再回调告诉主线程;该方式很适合与高并发的场景,但开发比较困难,会导致失败的情况。
2. 缓冲池
在讲解RocketMQ
之前,首先来介绍缓冲池的概念:
缓冲池是计算机里的概念,很像生活中的蓄水池:从上游来的水并不是直接到达用户家里,而是先存储到一个水池里面;其优点有:
- 多余的水可以在水池里面存起来,防止压力直接就传到下游;
- 控制水池的阀门可以调节出水大小,可以根据实际的使用控制水流。
3. RocketMq的组成
有了缓存池的概念,就很好理解RocketMQ
工作的基本原理,在RocketMQ
中主要有三部分组成:
- 生产者(
producer
),生产者用户生产消息,就像上游的水。 broker
,broker
就像蓄水池一样,会将生产的消息存储起来。- 消费者(
consumer
),将broker
队列里面的消息拉出来消费。
其工作的原理图如下:
通过生产者将消息传递到broker
存储起来,然后消费者去消费broker
里面的消息,其中broker
是路由,具体的存储是采用了队列数据结构。
4. RocketMQ在物联网中的应用
根据它的组成和原理,其非常合适用于异步解耦。比如在某个时间段,大量的请求来到云服务,如果云服务采用同步的方式去控制设备,势必导致服务延迟严重,出现大批量的失败。
因此为了达到异步解耦,流量削峰,当大量的请求来到云服务的时候,并不是直接去控制设备,而是先将控制消息存入RocketMQ
中,然后通过消费者去控制设备。因此具体的控制流程图如下:
如图所示,当控制参数来到服务时,现将控制的参数封装为message
,然后通过生产者将消息存放到RocketMQ
中broker
存储起来,在消费者端将启动多线程去消费消费消息—即去控制设备,当设备控制完成后,将控制的结果告诉上游服务。
通过以上的设计,不管多大的流量来,都先将参数存入rocketMQ
中,然后慢慢去消费,有效地保护了云端服务,提供了云端的性能,提高了用户的使用体验。
5. 使用RocketMQ搭建一个简单的生产者和消费者demo
-
前提—RocketMQ已经搭建好
-
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
- 配置文件
server.port: 8085
rocketmq:
name-server: 10.16.17.246:9876
producer:
group: rocket-demo
- 首先是生产者,这里开发了一个接口:
@RestController()
@RequestMapping("/producer")
public class ProducerController {
// 注入
@Resource
RocketMQTemplate rocketMQTemplate;
@PostMapping("/push")
public void producer(){
ProducerDto producerDto = new ProducerDto();
Random random = new Random();
producerDto.setApplianceId(String.valueOf(random.nextInt()));
producerDto.setReqId(UUID.randomUUID().toString());
// 将producerDto 封装为消息
Message<ProducerDto> message = withPayload(producerDto).setHeader(PROPERTY_KEYS, producerDto.getReqId()).build();
System.out.println("生产的消息为:" + message);
// 将消息塞入指定的TOPIC
rocketMQTemplate.syncSend(TOPIC_DEMO, message);
}
}
其中,ProducerDto
消息体有applianceId
和reqId
两个参数,broker中将消息路由到与topic绑定的队列中。
- 消费者
@RocketMQMessageListener(
consumeThreadNumber = 16,
topic = TOPIC_DEMO,
consumerGroup = "consumer_demo_group"
)
@Component
public class Consumer implements RocketMQListener<MessageExt> {
private final String CHARSET = Charset.defaultCharset().name();
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String str = new String(body, Charset.forName(this.CHARSET));
System.out.println("消费者消费的消息为: " + str);
}
}
在消费者类上加上@RocketMQMessageListener、@Component注解即可实现消费。
- 结果, 实现了消息的生产和消费
生产的消息为:GenericMessage [payload=ProducerDto(reqId=086a5666-922a-4345-b408-956ba6bc18d9, applianceId=2140322229), headers={KEYS=086a5666-922a-4345-b408-956ba6bc18d9, id=12816bba-1
259-469a-d0ff-e42311a27fe7, timestamp=1676713480448}]
消费者消费的消息为: {"reqId":"086a5666-922a-4345-b408-956ba6bc18d9","applianceId":"2140322229"}
6. 结语
以上是关于RocketMQ
的简单介绍,只是皮毛而已,里面还有很多的功能可以挖掘。