1. 简介
Spring Integration:
Spring Integration是一个开源的Java库,用于构建基于消息的应用程序。它提供了一套丰富的组件和工具,使得开发者可以轻松地开发出可靠、灵活和可扩展的集成解决方案。以下是Spring Integration的一些主要用途:
-
企业服务总线(ESB): Spring Integration可以用来构建企业服务总线,它支持各种协议和消息格式,使得不同系统间的数据和事件可以轻松交换。
-
消息传递和解耦: 它支持在不同的应用程序组件之间进行异步消息传递,从而降低系统组件间的耦合度。
-
事件驱动架构: Spring Integration支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型。
-
数据转换和路由: 提供数据转换和路由的功能,可以将数据从一种格式转换为另一种格式,并根据内容将消息路由到不同的目的地。
-
错误处理: 它提供了一套完整的错误处理机制,包括重试、补偿和消息存储等策略。
-
文件和数据库集成: 可以轻松地与文件系统和数据库进行集成,支持文件传输、数据库操作等场景。
-
外部系统适配: 通过提供各种适配器,Spring Integration可以与外部系统(如JMS、AMQP、HTTP、FTP等)进行集成。
-
批处理和任务调度: 支持批处理操作和任务调度,可以用于处理大量数据或定时任务。
-
模块化和可扩展性: 它的模块化设计使得开发者可以根据需要添加或替换组件,从而构建高度可扩展的系统。
-
多环境支持: 支持多种部署环境,包括本地应用、云环境和微服务架构。
-
开发和配置的简化: 通过提供声明式的配置和简化的编程模型,Spring Integration降低了开发复杂性,并缩短了开发周期。
-
社区和生态系统: 作为Spring家族的一部分,Spring Integration受益于活跃的社区和广泛的生态系统,提供了大量的资源和支持。
Spring Integration + MQTT:
Spring Integration与MQTT的集成是一个非常强大的组合,它允许开发者在Spring应用程序中轻松地实现MQTT协议的消息发布和订阅功能。以下是Spring Integration与MQTT集成的一些主要用途和优势:
-
轻量级消息传递: MQTT是一种轻量级的消息传递协议,特别适合带宽有限和延迟敏感的环境,如物联网(IoT)应用。Spring Integration通过提供MQTT通道适配器,使得在Spring应用程序中集成MQTT变得简单直接 。
-
简化配置: 通过Spring Integration,开发者可以使用声明式配置来定义MQTT的入站(订阅)和出站(发布)消息通道,而不需要深入了解MQTT客户端库的复杂性 。
-
支持MQTT v5: 从Spring Integration 5.5.5版本开始,支持MQTT v5协议,包括对MQTT v5特有的消息属性的支持,如消息过期间隔、响应主题等 。
-
灵活的消息处理: Spring Integration提供了强大的消息处理能力,包括消息转换、路由、聚合、分割等,这些都可以通过声明式配置轻松实现 。
-
错误处理和重连机制: Spring Integration提供了错误处理机制,包括请求处理建议,例如重试或断路器。同时,支持MQTT的自动重连机制,确保了消息传递的可靠性 。
-
与Spring生态系统的集成: 作为Spring家族的一部分,Spring Integration可以很容易地与其他Spring项目(如Spring Boot、Spring Cloud等)集成,提供了与Spring Security、Spring Data等的无缝集成 。
-
提高开发效率: Spring Integration的声明式配置和编程模型简化了消息系统开发,降低了开发复杂性,并缩短了开发周期 。
-
动态主题管理: Spring Integration允许在运行时动态添加和删除MQTT订阅主题,提供了更高的灵活性 。
-
事件驱动架构: 支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型 。
2. 基本时序架构
1. 监听到订阅topic有消息流程
2. 生产者推送一条消息后,中间经过一系列流程后被消费者消费的完整流程
3. 接收消息
通常涉及以下几个步骤:
1. 配置MQTT连接: 首先,需要配置与MQTT代理(如EMQX)的连接。这通常涉及到配置一个MqttPahoClientFactory
Bean,它负责创建和管理MQTT客户端连接。
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
2. 创建入站通道适配器: 使用MqttPahoMessageDrivenChannelAdapter
创建一个入站通道适配器。这个适配器负责从MQTT代理订阅主题,并在接收到消息时将消息发送到Spring Integration的通道。
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
/**
* Clients of inbound message channels.
* @return
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
options.getClientId() + "_consumer_" + System.currentTimeMillis(),
mqttClientFactory, options.getInboundTopic().split(","));
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(1);
adapter.setOutputChannel(inboundChannel);
// 添加钩子函数,确保在程序关闭时正确断开连接
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
if (adapter != null) {
adapter.stop();
log.warn("[consumer] MQTT client stopped successfully.");
}
} catch (Exception e) {
log.error("[consumer] MQTT client stopped with error: {}",e.getMessage(),e);
}
}));
return adapter;
}
3. 配置消息通道: 配置一个消息通道(如DirectChannel
),用于传输从MQTT代理接收到的消息。
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new ExecutorChannel(threadPool);
}
4. 设置消息监听器: 使用@ServiceActivator
注解定义一个服务激活器,它将作为消息监听器处理接收到的消息。这个消息监听器可以是一个方法,这个方法将对通道中的消息进行处理。
5. 处理消息: 实现业务逻辑来处理消息。这通常涉及到从消息中提取数据,并执行所需的操作,例如更新数据库、调用服务或触发事件。
@Bean
@ServiceActivator(inputChannel = ChannelName.INBOUND)
public MessageHandler defaultInboundHandler() {
return message -> {
// 处理消息
// log.info("The default channel does not handle messages." +
// "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
// "\nPayload: " + message.getPayload());
};
}
4. 发布信息
发送MQTT消息通常是通过配置出站通道适配器(MqttOutboundChannelAdapter
)来实现的。这个适配器负责将从Spring Integration通道中传来的消息发布到指定的MQTT主题上。
发送MQTT消息的步骤:
1. 配置MQTT客户端工厂(MqttPahoClientFactory
): 这个工厂负责创建和管理MQTT客户端连接。
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
2. 配置MQTT出站通道适配器(MqttOutboundChannelAdapter
): 这个适配器将消息通道中的消息发布到MQTT代理上。
@Configuration
public class MqttOutboundConfiguration {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MqttOutboundChannelAdapter mqttOutboundAdapter() {
MqttOutboundChannelAdapter adapter = new MqttOutboundChannelAdapter(
"client_id",
mqttClientFactory,
"outputTopic");
adapter.setQos(1); // 设置服务质量
adapter.setAsync(true); // 异步发送消息
return adapter;
}
}
可以通过setDefaultTopic
方法设置默认主题,这样在发送消息时如果没有指定主题,就会使用这个默认主题。
3. 发送消息到消息通道: 通过编程方式或通过其他Spring Integration组件,将消息发送到与MqttOutboundChannelAdapter
绑定的消息通道。
@Autowired
private MessageChannel mqttOutboundChannel;
public void sendMqttMessage(String payload) {
mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());
}
注:
1. 要确定消息发送到哪一个主题,可以在发送消息时通过消息头MqttHeaders.TOPIC
指定。如果没有指定,就会使用在MqttPahoMessageHandler
中配置的默认主题。
@Autowired
private MessageChannel mqttOutboundChannel;
public void sendMqttMessage(String topic, String payload) {
mqttOutboundChannel.send(MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.build());
}
2. 通过使用IMqttMessageGateway接口去发送消息到OUTBOUND通道,再由MqttPahoMessageHandler
去处理消息
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {
/**
* Publish a message to a specific topic.
* @param topic target
* @param payload message
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
/**
* Use a specific qos to push messages to a specific topic.
* @param topic target
* @param payload message
* @param qos qos
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}