基于Vertx实现可配置及可扩展的IOT服务

news2025/1/20 2:48:33

搭建框架的目标

        相信写过IOT服务的伙伴应该知道,面对各种千奇百怪的通信协议,特别是16进制报文的协议,有些协议看的确实有点让人头疼。但这些协议中也有很多共性,不必针对每过协议都把一些业务无关的代码再撸一遍。

        搭建这个项目主要是针对常见的TCP连接为基础的设备通信协议做一些抽象及规范化处理,减低一些开发的成本,目标是实现一个可配置的,便于扩展各种协议的框架。

Vertx简介

        Vert.x是Eclipse基金会下面的一个开源项目,Vert.x的基本定位是一个事件驱动的编程框架,通过Vert.x使用者可以用相对低的成本就享受到NIO带来的高性能。netty是Vert.x底层使用的通讯组件,Vert.x为了最大限度的降低使用门槛,刻意屏蔽掉了许多底层netty相关的细节,比如ByteBuf、引用计数等等。

        本文主要见解的是搭建一个可配置和可扩展的IOT服务,并不会详细展开讲解Vertx,Vertx相关内容可上官网查看《vertx官网》​​​​​​

IOT通信中的常见概念

1.logicAddress

        逻辑通信地址。在常见的设备协议中,都会有逻辑通信地址这个概念,用于标识当前的连接是具体的某个设备。有了这个逻辑地址之后就可以很方便的找到这个连接。

        在业务系统中建立档案的时候用这个地址,后续也可以通过这个通信地址将指定的命令下发给指定的设备。

2.messageType

        消息类型。在TCP通信中,设备上报的不止一类消息,但在常见的设备通信协议中都会针对不同的消息做不同的标识,借此来区分每条上传消息的含义。所以在设备上报的报文中我们根据通信协议的定义找到报文的标识位,然后再做对应的处理。

3.session

        会话。session主要用于管理连接。设备和服务端建立连接之后会产生一个socket,但很多时候这个socket缺少一些语义和描述,所以我们会对这个socket做一些包装,比如抽象一些方法,绑定设备的逻辑地址以便后续查找和调用。

代码架构流程

        整体的核心流程如下:

核心代码分析

yaml文件配置

        配置多个协议的协议名称和协议通信端口号,这里用多个端口区分不同的协议,避免协议内容相近的时候出现解析错误的情况。

protocols:
  - name: ZHONGXING #中兴
    port: 8898
  - name: HUAWEI #华为
    port: 9666

 ProtocolServerBootstrap

        这里主要是加载yaml配置,然后启动相应的TcpServer服务监听端口,并根据配置定义找打对应协议的编解码器,将消息转发到对应的编解码器中。

        这里用到了两个自定义注解:

        @CodecScan:标识编解码器要扫描哪些包。

        @Protocol:注解来标识编解码器对应的通信协议。

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
public class ProtocolServerBootstrap extends AbstractVerticle {

    private Class<?> starter;
    private static Map<String, ProtocolConfig> protocols = new ConcurrentHashMap<>();
    private static Map<String, AbstractProtocolCodec> codecMap = new ConcurrentHashMap<>();

    public ProtocolServerBootstrap(Class<?> starter) {
        this.starter = starter;
    }

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        loadProfile();
        loadProtocolCodec();
    }

    public void loadProfile() {
        InputStream inputStream = null;
        try {
            inputStream = this.getClass().getClassLoader().getResourceAsStream("protocol.yml");
            Yaml yaml = new Yaml();
            Map<String, List<Object>> map = yaml.load(inputStream);
            List<Object> protocolConfigs = map.get("protocols");
            String host = NetUtil.getLocalhost().getHostAddress();
            protocolConfigs.stream()
                    .map(item -> JSONUtil.toBean(JSONUtil.toJsonStr(item), ProtocolConfig.class))
                    .forEach(config -> {
                        protocols.put(config.getName(), new ProtocolConfig().setName(config.getName()).setHost(host).setPort(config.getPort()));
                    });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("配置文件解析失败:" + e);
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void loadProtocolCodec() {
        try {
            CodecScan codecScan = starter.getAnnotation(CodecScan.class);
            if(codecScan == null){
                this.protocols.clear();
                return;
            }
            String[] packages = codecScan.value();
            for (String p : packages) {
                Reflections reflection = new Reflections(p);
                Set<Class<?>> classes = reflection.getTypesAnnotatedWith(Protocol.class);
                for (Class<?> aClass : classes) {
                    Protocol annotation = aClass.getAnnotation(Protocol.class);
                    codecMap.put(annotation.value(), (AbstractProtocolCodec) aClass.newInstance());
                    log.info("加载编解码器:" + aClass.getName());
                }
            }
        } catch (Exception e) {
            log.error("加载编解码器失败:" + e);
        }
    }

    @Override
    public void start() {
        protocols.forEach((name, protocol) -> {
            AbstractProtocolCodec codec = codecMap.get(name);
            vertx.deployVerticle(codec);
            SocketAddress address = new SocketAddressImpl(protocol.getPort(), protocol.getHost());
            NetServer server = vertx.createNetServer();
            server.connectHandler(codec)
                    .listen(address)
                    .onComplete(res -> {
                        if (res.succeeded()) {
                            log.info("{}服务启动成功,绑定/{}", protocol.getName(), address);
                        } else {
                            if (res.cause() != null) {
                                log.error("服务启动失败,cause:" + res.cause());
                            }
                        }
                    });
        });
    }

    public AbstractProtocolCodec getProtocolCodec(String protocolName) {
        return codecMap.get(protocolName);
    }
}

AbstractProtocolCodec

         抽象编解码器类,主要包含监听服务端收到的消息,会话管理,管理处理器等。

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
public abstract class AbstractProtocolCodec extends AbstractVerticle implements Handler<NetSocket> {

    private Map<String, BaseSession> logicAddressSessionMap = new ConcurrentHashMap<>();
    private Map<NetSocket, BaseSession> socketSessionMap = new ConcurrentHashMap<>();

    private Map<String, AbstractProtocolHandler> handlerMap = new ConcurrentHashMap<>();

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        vertx.eventBus().registerDefaultCodec(BaseMessage.class, new GenericMessageCodec<BaseMessage>() {
        });
        vertx.eventBus().registerDefaultCodec(BaseSession.class, new GenericMessageCodec<BaseSession>() {
        });
        registerHandlers();
    }

    @Override
    public void handle(NetSocket socket) {
        log.info("收到新的连接:" + socket);
        activeSocket(socket);
        socket.closeHandler(handler -> {
            log.info("连接已断开:" + socket);
            afterCloseSocket(socket);
            removeSession(socket);
        });
        socket.handler(data -> {
            try {
                BaseMessage message = new BaseMessage().setSocket(socket).setBuffer(data);
                if(!socketSessionMap.containsKey(socket)){
                    String logicAddress = getLogicAddress(message);
                    registerSession(logicAddress, socket);
                }
                decode(message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("解码处理失败,throw:" + e);
            }
        });
    }

    private BaseSession registerSession(String logicAddress, NetSocket socket) {
        BaseSession session = new BaseSession().setLogicAddress(logicAddress).setSocket(socket);
        logicAddressSessionMap.put(logicAddress, session);
        socketSessionMap.put(socket, session);
        return session;
    }

    private void removeSession(NetSocket socket) {
        BaseSession session = socketSessionMap.remove(socket);
        if(session != null){
            logicAddressSessionMap.remove(session.getLogicAddress());
        }
    }

    public BaseSession getSessionByLogicAddress(String logicAddress) {
        return logicAddressSessionMap.get(logicAddress);
    }

    protected abstract List<AbstractProtocolHandler> getHandlers();

    private void registerHandlers() {
        List<AbstractProtocolHandler> handlers = getHandlers();
        handlers.forEach(handler -> {
            handlerMap.put(handler.getMessageType(), handler);
            vertx.deployVerticle(handler);
        });
    }

    public AbstractProtocolHandler getHandlerByMessageType(String messageType) {
        return handlerMap.get(messageType);
    }

    protected abstract void decode(BaseMessage message);


    protected abstract String getLogicAddress(BaseMessage message);


    protected void activeSocket(NetSocket socket) {

    }

    protected void afterCloseSocket(NetSocket socket) {

    }
}

AbstractProtocolHandler

        处理器抽象类

/**
 * @author yan
 * @date 2023/9/14
 */
public abstract class AbstractProtocolHandler<T, R> extends AbstractVerticle implements Handler<Message<T>>, InvokeHandler<R> {

    @Override
    public void start() throws Exception {
        vertx.eventBus().consumer(getTopic(), this::handle);
    }

    protected abstract String getTopic();

    protected abstract String getMessageType();

    @Override
    public void write(BaseSession session, Buffer buffer) {
        session.getSocket().write(buffer);
    }
}

InvokeHandler

/**
 * @author yan
 * @date 2023/9/14
 */
public interface InvokeHandler<T> {

    /**
     * 根据传入参数获取buffer
     * @param req
     * @return
     */
    Buffer getBuffer(T req);

    /**
     * 下发消息
     * @param session
     * @param buffer
     */
    void write(BaseSession session, Buffer buffer);
}

InvokeAdapter

/**
 * 服务调用适配器
 *
 * @author yan
 * @date 2023/9/14
 */
public class InvokeAdapter {

    private ProtocolServerBootstrap bootstrap;

    public InvokeAdapter(ProtocolServerBootstrap bootstrap){
        this.bootstrap = bootstrap;
    }

    public void send(String protocolName, String logicAddress, String messageType, Object param) {
        AbstractProtocolCodec codec = bootstrap.getProtocolCodec(protocolName);
        BaseSession session = codec.getSessionByLogicAddress(logicAddress);
        if (session == null || session.getSocket() == null) {
            throw new RuntimeException("session is not exist or closed");
        }
        AbstractProtocolHandler handler = codec.getHandlerByMessageType(messageType);
        Buffer buffer = handler.getBuffer(param);
        handler.write(session, buffer);
    }

}

华为协议实现

HuaweiCodec

        华为协议编解码器

/**
 * @author yan
 * @date 2023/9/12
 */
@Slf4j
@Protocol("HUAWEI")
public class HuaweiCodec extends AbstractProtocolCodec {

    @Override
    protected List<AbstractProtocolHandler> getHandlers() {
        return Arrays.asList(new HuaweiParamReadHandler(), new HuaweiParamWriteHandler(), new HuaweiParamWriteBatchHandler());
    }

    @Override
    protected void decode(BaseMessage message) {
        String dataStr = ByteUtils.hexToHexString(message.getBuffer().getBytes());
        String messageType = dataStr.substring(14, 16);
        vertx.eventBus().publish(HuaweiMessageTypeConstants.getMessageTopic(messageType), message);
    }

    @Override
    protected String getLogicAddress(BaseMessage message) {
        // 这里根据消息解析出对应的通信地址
        return "001";
    }
}

Handler处理器

/**
 * @author yan
 * @date 2023/9/13
 */
@Slf4j
public class HuaweiParamReadHandler extends AbstractProtocolHandler<BaseMessage, Object> {

    @Override
    protected String getTopic() {
        return HuaweiMessageTypeConstants.READ;
    }

    @Override
    public void handle(Message<BaseMessage> message) {
        BaseMessage baseMessage = message.body();
        log.info("收到读参数命令返回:" + ByteUtils.hexToHexString(baseMessage.getBuffer().getBytes()));
        baseMessage.getSocket().write(baseMessage.getBuffer());
    }

    @Override
    public String getMessageType() {
        return HuaweiMessageTypeConstants.READ;
    }

    @Override
    public Buffer getBuffer(Object req) {
        log.info("发送read消息:" + req);
        return Buffer.buffer(new byte[]{0x11, 0x11, 0x11});
    }
}

测试调用

编写主类

/**
 * @author yan
 * @date 2023/9/11
 */
@CodecScan("com.cdw.pv.iot.modules")
public class PvApplication {

    public static void main(String[] args) {
        ProtocolServerBootstrap bootstrap = new ProtocolServerBootstrap(PvApplication.class);
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(bootstrap);
        // 开启一个http服务,模拟外部调用
        startHttpServer(vertx, bootstrap);
    }

    private static void startHttpServer(Vertx vertx, ProtocolServerBootstrap bootstrap) {
        InvokeAdapter adapter = new InvokeAdapter(bootstrap);
        HttpServer httpServer = vertx.createHttpServer();
        httpServer.requestHandler(handler -> {
            System.out.println("request请求:" + handler);
            adapter.send("HUAWEI", "001", HuaweiMessageTypeConstants.READ, "123");
            handler.response().end(Buffer.buffer("success"));
        }).listen(8899).onComplete(handler -> {
            if (handler.succeeded()) {
                System.out.println("http服务器启动成功");
            }
        });
    }
}

发生指令

      使用TCP连接根据发送指令  

服务调用

        模拟发送请求

        收到消息

总结

        以上就是通用IOT服务的整体架构了。

        这个只是一个基本版本的,后续可以根据实际情况做调整。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1114229.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Javascript】‘var‘ is used instead of ‘let‘ or ‘const‘

解决&#xff1a; 设置完之后,var 就不会再出现黄色波浪线警告

普通二维码跳转微信小程序实战

简介 服务端springboot项目,前端基于uniapp的微信小程序,要求扫描二维码之后进入到小程序指定页面,下面记录一下实现过程以及过程中遇到的问题. 实现过程 下面是成功跳转的配置截图: 首先说下二维码规则,这个地方需要填写扫描二维码之后打开的地址,这个地址在我的项目里…

Keil实现Flash升级跳转(STM32/GD32/HC32)

编写BOOT程序&#xff0c;和APP程序。 BOOT程序检查OTA参数&#xff0c;执行OTA升级&#xff0c;然后跳转到APP代码。 记录一下跳转APP需要修改得东西&#xff1a; 1、BOOT程序 修改跳转地址 先检查APP地址是否有效 然后关闭外设 反初始化 设置MSP指针&#xff0c;进行跳转 …

多商户自营连锁小程序商城的作用是什么

近几年&#xff0c;线上线下经营压力很大&#xff0c;不少商家都希望通过数字化转型实现破局或增长&#xff0c;单店管理力度相对容易些&#xff0c;但如果是连锁门店&#xff0c;近几年则相对风险大些&#xff0c;但从2020年至今依然有不少品牌选择扩店&#xff0c;增加连锁规…

vue 组件封装 综合案例

vue 组件封装 综合案例 **创建 工程&#xff1a; H:\java_work\java_springboot\vue_study ctrl按住不放 右键 悬着 powershell H:\java_work\java_springboot\js_study\Vue2_3入门到实战-配套资料\01-随堂代码素材\day05\准备代码\11-综合案例-商品列表 vue --version vue c…

深入探索Sharding JDBC:分库分表的利器

随着互联网应用的不断发展和用户量的不断增加&#xff0c;传统的数据库在应对高并发和大数据量的场景下面临着巨大的挑战。为了解决这一问题&#xff0c;分库分表成为了一个非常流行的方案。分库分表主流的技术包括MyCat和Sharding JDBC。我们来通过一张图来了解这两者有什么区…

38 WEB漏洞-反序列化之PHPJAVA全解(下)

目录 Java中的API实现序列化和反序列化演示案例WebGoat_Javaweb靶场反序列化测试2020-网鼎杯-朱雀组-Web-think java真题复现 文章参考&#xff1a; https://www.cnblogs.com/zhengna/p/15737517.html https://blog.csdn.net/MCTSOG/article/details/123819548 ysoserial生成攻…

CC攻击和其防御策略

CC攻击简介 CC攻击&#xff0c;又称为Challenge Collapsar攻击&#xff0c;是一种常见的DDoS&#xff08;分布式拒绝服务&#xff09;攻击方式&#xff0c;旨在使Web服务在第七层协议层面遭受攻击。攻击者并不需要大量的肉鸡来实施CC攻击&#xff0c;相反&#xff0c;他们使用…

Flyway Desktop updated

Flyway Desktop updated 为比较工件序列化和反序列化添加了额外的调试日志记录。 Flyway Desktop现在将记住以前用于创建项目和匹配克隆的位置。 新的脱机许可工作流现在已在Microsoft Windows上启用。 现在&#xff0c;在配置目标数据库列表时&#xff0c;环境ID是可见的。 现…

HTML图像标签

html文件&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>图像标签学习</title> </head> <body> <img src"../resources/image/01.jpg" alt"小狗图…

【机器学习】集成模型/集成学习:多个模型相结合实现更好的预测

1. 概述 1.1 什么是集成模型/集成学习 "模型集成"和"集成学习"是相同的概念。它们都指的是将多个机器学习模型组合在一起&#xff0c;以提高预测的准确性和稳定性的技术。通过结合多个模型的预测结果&#xff0c;集成学习可以减少单个模型的偏差和方差&am…

2023年全球及中国分离纯化装备市场发展概况分析:未来市场将持续增长[图]

随着创新药物的研发加速&#xff0c;纯化环节在药物生产过程中的重要性也将日益凸显&#xff0c;纯化设备市场未来有望不断增长。2022年&#xff0c;全球小分子药物分离纯化装备市场规模达到45美元&#xff0c;期间复合年增长率为21.9%&#xff1b;预计未来全球小分子药物分离纯…

使用标准模板 MRI 主题根据 EEG 数据计算前向算子

# https://mne.tools/stable/auto_tutorials/forward/35_eeg_no_mri.html# 本教程说明如何使用标准模板 MRI 主题根据 EEG 数据计算前向算子 # # 成人模板 MRI (fsaverage) # 首先我们展示如何fsaverage用作替代subject# Authors: Alexandre Gramfort <alexandre.gramfortin…

vscode摸鱼插件开发

不知道大家在写代码的时候&#xff0c;摸不摸鱼&#xff0c;是不是时不时得打开一下微博&#xff0c;看看今天发生了什么大事&#xff0c;又有谁塌房&#xff0c;而你没有及时赶上。 为此&#xff0c;我决定开发一个vscode插件&#xff0c;来查看微博热搜 插件名称&#xff1…

每日一练 | 华为认证真题练习Day121

1、如下图所示的交换网络&#xff0c;所有交换机都运行了STP协议。当拓扑稳定后&#xff0c;在下列那台交换机上修改配置BPDU的发送周期&#xff0c;可以影响SWD配置BPDU的发送周期 A. SWD B. SWC C. SWB D. SWA 2、如下图所示的网络&#xff0c;交换机的MAC地址已标出。在S…

(数据结构)单链表的相关操作

//1.预编译部分#define _CRT_SECURE_NO_WARNINGS#include<stdio.h>#include<stdlib.h>//2.单链表的结构体typedef struct LNode{int data;struct LNode* next; //因为next指针指向的为结构体类型&#xff0c;所以类型为struct LNode*}LNode, * LinkList; …

如何保障Facebook账号登录稳定?跨境人必看

作为全球最大的社交媒体平台&#xff0c;Facebook已成为众多跨境人们拓展海外市场的重要渠道。然而&#xff0c;Facebook对跨境业务卖家的监管越来越严格&#xff0c;封号政策也日趋严厉。对于想要在Facebook上开展业务的跨境人们而言&#xff0c;大家是否被Facebook封号问题困…

如何对需求变更进行精准的风险评估?

1、明确需求变更背景和目的 首先需要了解需求变更的背景&#xff0c;即需求产生的原因和环境&#xff0c;如需求变更是由业务变化、用户需求变化、技术限制、资源限制等因素引起的。其次需要明确需求变更目的是为了解决问题还是满足需求。 明确需求变更背景和目的&#xff0c;有…

X32位汇编和X64位区别无参函数分析(一)

前言 一、X32汇编函数无参无返回分析 二、X64汇编函数无参无返回分析 总结 前言 提示&#xff1a;以下是个人学习总结&#xff1a;如有错误请大神指出来&#xff0c;只供学习参考&#xff0c;本内容使用使用VS2017开发工具&#xff1a;语言是C&#xff0c;需要一些常见的汇编指…

数据库管理-第112期 Oracle Exadata 03-网络与ILOM(20231020)

数据库管理-第112期 Oracle Exadata 03-网络与ILOM&#xff08;202301020&#xff09; 在Exadata中&#xff0c;除了对外网络以外&#xff0c;其余网络都是服务于一体机内部各组件的网络&#xff0c;本期对这些网络的具体情况和硬件管理相关做一个讲解。 1 网络分类 1.1 生产…