Spring Integration + MQTT

news2024/11/23 18:46:21

1. 简介

Spring Integration:

Spring Integration是一个开源的Java库,用于构建基于消息的应用程序。它提供了一套丰富的组件和工具,使得开发者可以轻松地开发出可靠、灵活和可扩展的集成解决方案。以下是Spring Integration的一些主要用途:

  1. 企业服务总线(ESB): Spring Integration可以用来构建企业服务总线,它支持各种协议和消息格式,使得不同系统间的数据和事件可以轻松交换。

  2. 消息传递和解耦: 它支持在不同的应用程序组件之间进行异步消息传递,从而降低系统组件间的耦合度。

  3. 事件驱动架构: Spring Integration支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型。

  4. 数据转换和路由: 提供数据转换和路由的功能,可以将数据从一种格式转换为另一种格式,并根据内容将消息路由到不同的目的地。

  5. 错误处理: 它提供了一套完整的错误处理机制,包括重试、补偿和消息存储等策略。

  6. 文件和数据库集成: 可以轻松地与文件系统和数据库进行集成,支持文件传输、数据库操作等场景。

  7. 外部系统适配: 通过提供各种适配器,Spring Integration可以与外部系统(如JMS、AMQP、HTTP、FTP等)进行集成。

  8. 批处理和任务调度: 支持批处理操作和任务调度,可以用于处理大量数据或定时任务。

  9. 模块化和可扩展性: 它的模块化设计使得开发者可以根据需要添加或替换组件,从而构建高度可扩展的系统。

  10. 多环境支持: 支持多种部署环境,包括本地应用、云环境和微服务架构。

  11. 开发和配置的简化: 通过提供声明式的配置和简化的编程模型,Spring Integration降低了开发复杂性,并缩短了开发周期。

  12. 社区和生态系统: 作为Spring家族的一部分,Spring Integration受益于活跃的社区和广泛的生态系统,提供了大量的资源和支持。

Spring Integration + MQTT:

Spring Integration与MQTT的集成是一个非常强大的组合,它允许开发者在Spring应用程序中轻松地实现MQTT协议的消息发布和订阅功能。以下是Spring Integration与MQTT集成的一些主要用途和优势:

  1. 轻量级消息传递: MQTT是一种轻量级的消息传递协议,特别适合带宽有限和延迟敏感的环境,如物联网(IoT)应用。Spring Integration通过提供MQTT通道适配器,使得在Spring应用程序中集成MQTT变得简单直接 。

  2. 简化配置: 通过Spring Integration,开发者可以使用声明式配置来定义MQTT的入站(订阅)和出站(发布)消息通道,而不需要深入了解MQTT客户端库的复杂性 。

  3. 支持MQTT v5: 从Spring Integration 5.5.5版本开始,支持MQTT v5协议,包括对MQTT v5特有的消息属性的支持,如消息过期间隔、响应主题等 。

  4. 灵活的消息处理: Spring Integration提供了强大的消息处理能力,包括消息转换、路由、聚合、分割等,这些都可以通过声明式配置轻松实现 。

  5. 错误处理和重连机制: Spring Integration提供了错误处理机制,包括请求处理建议,例如重试或断路器。同时,支持MQTT的自动重连机制,确保了消息传递的可靠性 。

  6. 与Spring生态系统的集成: 作为Spring家族的一部分,Spring Integration可以很容易地与其他Spring项目(如Spring Boot、Spring Cloud等)集成,提供了与Spring Security、Spring Data等的无缝集成 。

  7. 提高开发效率: Spring Integration的声明式配置和编程模型简化了消息系统开发,降低了开发复杂性,并缩短了开发周期 。

  8. 动态主题管理: Spring Integration允许在运行时动态添加和删除MQTT订阅主题,提供了更高的灵活性 。

  9. 事件驱动架构: 支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型 。

2. 基本时序架构

        1. 监听到订阅topic有消息流程

        2. 生产者推送一条消息后,中间经过一系列流程后被消费者消费的完整流程

3. 接收消息

通常涉及以下几个步骤:

1. 配置MQTT连接: 首先,需要配置与MQTT代理(如EMQX)的连接。这通常涉及到配置一个MqttPahoClientFactory Bean,它负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(mqttConnectOptions());
    return factory;
}

2. 创建入站通道适配器: 使用MqttPahoMessageDrivenChannelAdapter创建一个入站通道适配器。这个适配器负责从MQTT代理订阅主题,并在接收到消息时将消息发送到Spring Integration的通道。

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Resource(name = ChannelName.INBOUND)
    private MessageChannel inboundChannel;

    /**
     * Clients of inbound message channels.
     * @return
     */
    @Bean(name = "adapter")
    public MessageProducerSupport mqttInbound() {
        MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                options.getClientId() + "_consumer_" + System.currentTimeMillis(),
                mqttClientFactory, options.getInboundTopic().split(","));
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        // use byte types uniformly
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(1);
        adapter.setOutputChannel(inboundChannel);

        // 添加钩子函数,确保在程序关闭时正确断开连接
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                if (adapter != null) {
                    adapter.stop();
                    log.warn("[consumer] MQTT client stopped successfully.");
                }
            } catch (Exception e) {
                log.error("[consumer] MQTT client stopped with error: {}",e.getMessage(),e);
            }
        }));
        return adapter;
    }

3. 配置消息通道: 配置一个消息通道(如DirectChannel),用于传输从MQTT代理接收到的消息。

@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
    return new ExecutorChannel(threadPool);
}

4. 设置消息监听器: 使用@ServiceActivator注解定义一个服务激活器,它将作为消息监听器处理接收到的消息。这个消息监听器可以是一个方法,这个方法将对通道中的消息进行处理。

5. 处理消息: 实现业务逻辑来处理消息。这通常涉及到从消息中提取数据,并执行所需的操作,例如更新数据库、调用服务或触发事件。

@Bean
@ServiceActivator(inputChannel = ChannelName.INBOUND)
public MessageHandler defaultInboundHandler() {
    return message -> {
    // 处理消息
        // log.info("The default channel does not handle messages." +
        //         "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
        //         "\nPayload: " + message.getPayload());
    };
}

4. 发布信息

        发送MQTT消息通常是通过配置出站通道适配器(MqttOutboundChannelAdapter)来实现的。这个适配器负责将从Spring Integration通道中传来的消息发布到指定的MQTT主题上。

发送MQTT消息的步骤:

1. 配置MQTT客户端工厂(MqttPahoClientFactory: 这个工厂负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(mqttConnectOptions());
    return factory;
}

2. 配置MQTT出站通道适配器(MqttOutboundChannelAdapter: 这个适配器将消息通道中的消息发布到MQTT代理上。

@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
    public MqttOutboundChannelAdapter mqttOutboundAdapter() {
        MqttOutboundChannelAdapter adapter = new MqttOutboundChannelAdapter(
                "client_id", 
                mqttClientFactory, 
                "outputTopic");
        adapter.setQos(1); // 设置服务质量
        adapter.setAsync(true); // 异步发送消息
        return adapter;
    }
}

可以通过setDefaultTopic方法设置默认主题,这样在发送消息时如果没有指定主题,就会使用这个默认主题。

3. 发送消息到消息通道: 通过编程方式或通过其他Spring Integration组件,将消息发送到与MqttOutboundChannelAdapter绑定的消息通道。

@Autowired
private MessageChannel mqttOutboundChannel;

public void sendMqttMessage(String payload) {
    mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());
}

注:

1. 要确定消息发送到哪一个主题,可以在发送消息时通过消息头MqttHeaders.TOPIC指定。如果没有指定,就会使用在MqttPahoMessageHandler中配置的默认主题。

@Autowired
private MessageChannel mqttOutboundChannel;

public void sendMqttMessage(String topic, String payload) {
    mqttOutboundChannel.send(MessageBuilder.withPayload(payload)
                                         .setHeader(MqttHeaders.TOPIC, topic)
                                         .build());
}

2. 通过使用IMqttMessageGateway接口去发送消息到OUTBOUND通道,再由MqttPahoMessageHandler去处理消息

@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {

    /**
     * Publish a message to a specific topic.
     * @param topic target
     * @param payload   message
     */
    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);

    /**
     * Use a specific qos to push messages to a specific topic.
     * @param topic     target
     * @param payload   message
     * @param qos   qos
     */
    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2205841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剪辑视频怎么学?四大工具助你轻松入门!

无论是制作短视频、记录生活点滴,还是从事专业影视制作,掌握视频剪辑技巧都至关重要。那么,剪辑视频怎么学呢?本文将为大家推荐四款实用的视频剪辑工具,助你轻松入门! 福昕视频剪辑:简单易用&a…

基于SpringBoot+Vue的医院预约挂号管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

使用Postman搞定各种接口token实战!

现在许多项目都使用jwt来实现用户登录和数据权限,校验过用户的用户名和密码后,会向用户响应一段经过加密的token,在这段token中可能储存了数据权限等,在后期的访问中,需要携带这段token,后台解析这段token才…

1.1 前端技术的发展

大家好!今天,我将与大家分享一个非常有趣的话题——前端技术的发展。在这个数字化时代,前端技术已经成为我们日常生活中不可或缺的一部分。我们的学习目标是了解前端技术的发展,并探讨使用框架开发项目的优势。 首先,…

基于大模型LLama2+Langchain构建知识库问答系统

1 背景 知识库需求在各行各业中普遍存在,例如制造业中历史故障知识库、游戏社区平台的内容知识库、电商的商品推荐知识库和医疗健康领域的挂号推荐知识库系统等。传统知识库搜索系统基于关键字匹配,缺少对用户问题理解和答案二次处理能力。为保证推荐系…

Go-知识泛型

Go-知识泛型 1. 认识泛型1.1 不使用泛型1.2 使用泛型 2. 泛型的特点2.1 函数泛化2.2 类型泛化 3. 类型约束3.1 类型集合3.2 interface 类型集合3.2.1 内置interface类型集合3.2.2 自定义interface类型集合3.2.2.1 任意类型元素3.2.2.2 近似类型元素3.2.2.3 联合类型元素 3.2.3 …

腾讯云直播录制相关

直播录制的原理是什么? 对于一条直播流,一旦开启录制,音视频数据就会被旁路到录制系统。主播的手机推上来的每一帧数据,都会被录制系统追加写入到录制文件中。 一旦直播流中断,接入层会立刻通知录制服务器将正在写入的…

for深入学习作业

作业&#xff1a; 写一个程序判断1-100中9的个数 代码: #include<stdio.h> int main() {int i 9,sum0;for (i 9; i < 100; i) {if ((i % 10 9) || (i / 10 9)) {sum;}}printf("%d", sum);return 0; } //9 19 29 39 49 59 69 79 89 99 //91 92 93 94 …

LVGL设计汽车仪表盘(开源!!)

驾驶界面图 有图无真相&#xff0c;下面视频展示&#xff1a; 汽车仪表盘展示 资源已绑定&#xff0c;自行下载哦 关注我&#xff0c;后面出LVGL移植教程&#xff01;

在线拍卖|基于springBoot的在线拍卖系统设计与实现(附项目源码+论文+数据库)

私信或留言即免费送开题报告和任务书&#xff08;可指定任意题目&#xff09; 摘要 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单管理、留言板管理、系统管理&#xff0c;用户&am…

【动手学电机驱动】 TI InstaSPIN-FOC(1)电机驱动和控制测试平台

【动手学电机驱动】 TI InstaSPIN-FOC&#xff08;1&#xff09;电机驱动和控制测试平台 1. 本系列的资源需求1.1 电机驱动控制概况1.2 InstaSPIN-FOC 电机控制方案1.3 资源需求 2. 软件安装2.1 安装 CCS2.2 安装 MotorWare2.3 安装 ControlSUITE&#xff08;可选&#xff09; …

中科星图GVE(案例)——AI实现地块提取

目录 简介 函数 gve.Services.AI.plotExtraction(image) 代码 结果 知识星球 机器学习 简介 AI可以通过图像处理和机器学习算法实现地块提取。首先&#xff0c;AI可以对高分辨率遥感图像进行预处理&#xff0c;包括图像校正和去噪等处理。然后&#xff0c;AI可以使用图…

如何挑选Axure元件库? Axure原型赏析

在挑选Axure元件库时&#xff0c;可以从以下几个方面进行考虑和赏析&#xff0c;以确保选择到最适合项目需求的元件库&#xff1a; 一、元件库的全面性和丰富度 组件全面&#xff1a;优秀的Axure元件库应包含丰富的元件类型&#xff0c;以高效应对各种复杂业务流程的原型设计…

探索OAuth 2.0授权模式:全面解析与场景应用选择

文章目录 1. 什么是OAuth 2.0授权模式&#xff1f;2. 授权模式详解2.1 客户端凭证模式&#xff08;Client Credentials Grant&#xff09;2.2 授权码模式&#xff08;Authorization Code Grant&#xff09;2.3 简化模式/隐藏式&#xff08;Implicit Grant&#xff09;2.4 密码模…

解决:Ubuntu连接不上网络

今天莫名奇妙&#xff0c;我的ubuntu20.04断网了。检查了一下&#xff0c;使用的也是桥接模式&#xff0c;啥也没有变化。 然后我上Ubuntu16.04版本看了&#xff0c;那里又可以成功上网&#xff0c;所以&#xff0c;不是电脑的问题。 看了网上两个教程&#xff0c;解决了。 …

深入理解 Maven Profiles

前言 在现代软件开发中&#xff0c;项目通常需要部署到多种环境中&#xff0c;比如开发&#xff08;development&#xff09;、测试&#xff08;test&#xff09;和生产&#xff08;production&#xff09;。每种环境可能具有不同的配置需求。为了满足这种多样性&#xff0c;A…

还在找地图切片工具?这五款免费软件值得一试

地图切片&#xff08;Map Tiling&#xff09;是指将大型地图或影像数据按照一定的规则切割成多个较小的图块&#xff08;称为瓦片&#xff09;&#xff0c;并根据缩放级别和用户请求逐步加载这些瓦片&#xff0c;从而提升地图在网络或应用中的显示速度和效率。地图切片技术广泛…

ABAP SE37创建FUNCTION报错:函数的主程序不是以function-pool开头

问题&#xff1a;SE37在新建函数时&#xff0c;检查语法没有问题&#xff0c;但激活报错&#xff1a;函数"***"的主程序不是以function-pool开头; 原因&#xff1a;新建函数的函数组没有激活&#xff0c;可以通过se80或在SE37跳转进行激活 按一下路径 右键激活即可

变倍镜头参数详解

变倍镜头是一种重要的光学镜头&#xff0c;其参数对于了解镜头的性能和适用场景至关重要。以下是对变倍镜头参数的详细解释&#xff1a; 变焦倍数&#xff1a; 定义&#xff1a;变焦倍数是变倍镜头的一个关键参数&#xff0c;表示镜头最长焦距与最短焦距的比值。作用&#xff1…

Linux_kernel内核定时器14

一、内核定时器 1、内核定时器 使用方法&#xff1a; 2、系统时钟中断处理函数 1&#xff09;更新时间 2&#xff09;检查当前时间片是否耗尽 Linux操作系统是基于时间片轮询的&#xff0c;属于抢占式的内核 3&#xff09;jiffies 3、基本概念 1&#xff09;HZ HZ决定了1秒钟产…