【自定义网络协议】Java基于Vert.x的自定义TCP协议实现

news2024/10/5 14:43:34

在现代的软件开发中,TCP协议广泛应用于需要高效、低延迟数据传输的场景。相较于HTTP协议,TCP提供了更底层的控制和更高的性能,适用于嵌入式设备、实时数据传输等应用。Vert.x是一个基于事件驱动、异步和多线程的高效开发框架,特别适合用于构建TCP服务。本文将介绍如何使用Vert.x在Java中实现自定义TCP协议。

一、环境准备

创建Maven项目:首先,创建一个Maven项目,并添加Vert.x核心依赖。

<dependencies>  
    <dependency>  
        <groupId>io.vertx</groupId>  
        <artifactId>vertx-core</artifactId>  
        <version>4.2.7</version>  
    </dependency>  
</dependencies>

二、定义协议格式

在自定义TCP协议时,需要定义协议的格式。这里采用一个简单的二进制格式:
协议结构实体类


/**
 * 协议消息结构
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {

    /**
     * 消息头
     */
    private Header header;

    /**
     * 消息体(请求或响应对象)
     */
    private T body;

    /**
     * 协议消息头
     */
    @Data
    public static class Header {

        /**
         * 魔数,保证安全性
         */
        private byte magic;

        /**
         * 版本号
         */
        private byte version;

        /**
         * 序列化器
         */
        private byte serializer;

        /**
         * 消息类型(请求 / 响应)
         */
        private byte type;

        /**
         * 状态
         */
        private byte status;

        /**
         * 请求 id
         */
        private long requestId;

        /**
         * 消息体长度
         */
        private int bodyLength;
    }

}

三、实现编码器和解码器

vert.x 的TCP 服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此,我们需要编码器和解码器,将 Java 的消息对象和 Buffer进行相互转换。


/**
 * 协议消息解码器
 */
public class ProtocolMessageDecoder {

    /**
     * 解码
     *
     * @param buffer
     * @return
     * @throws IOException
     */

    public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        byte magic = buffer.getByte(0);
        // 校验魔数
        if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
            throw new RuntimeException("消息 magic 非法");
        }
        header.setMagic(magic);
        header.setVersion(buffer.getByte(1));
        header.setSerializer(buffer.getByte(2));
        header.setType(buffer.getByte(3));
        header.setStatus(buffer.getByte(4));
        header.setRequestId(buffer.getLong(5));
        header.setBodyLength(buffer.getInt(13));
        // 解决粘包问题,只读指定长度的数据
        byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
        // 解析消息体
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if (serializerEnum == null) {
            throw new RuntimeException("序列化消息的协议不存在");
        }
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
                return new ProtocolMessage<>(header, request);
            case RESPONSE:
                RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
                return new ProtocolMessage<>(header, response);
            case HEART_BEAT:
            case OTHERS:
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }

}

编码器


public class ProtocolMessageEncoder {

    /**
     * 编码
     *
     * @param protocolMessage
     * @return
     * @throws IOException
     */
    public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
        if (protocolMessage == null || protocolMessage.getHeader() == null) {
            return Buffer.buffer();
        }
        ProtocolMessage.Header header = protocolMessage.getHeader();
        // 依次向缓冲区写入字节
        Buffer buffer = Buffer.buffer();
        buffer.appendByte(header.getMagic());
        buffer.appendByte(header.getVersion());
        buffer.appendByte(header.getSerializer());
        buffer.appendByte(header.getType());
        buffer.appendByte(header.getStatus());
        buffer.appendLong(header.getRequestId());
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if (serializerEnum == null) {
            throw new RuntimeException("序列化协议不存在");
        }
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        // 写入 body 长度和数据
        buffer.appendInt(bodyBytes.length);
        buffer.appendBytes(bodyBytes);
        return buffer;
    }
}

四、实现TCP服务器

创建一个服务处理类,处理请求和响应


public class TcpServerHandler implements Handler<NetSocket> {

    @Override
    public void handle(NetSocket netSocket) {
        // 处理连接
        netSocket.handler(buffer -> {
            // 接受请求,解码
            ProtocolMessage<RpcRequest> protocolMessage;
            try {
                protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
            } catch (IOException e) {
                throw new RuntimeException("协议消息解码错误");
            }
            RpcRequest rpcRequest = protocolMessage.getBody();

            // 处理请求
            // 构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();
            try {
                // 获取要调用的服务实现类,通过反射调用
                Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
                Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
                // 封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType());
                rpcResponse.setMessage("ok");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }

            // 发送响应,编码
            ProtocolMessage.Header header = protocolMessage.getHeader();
            header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
            ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
            try {
                Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
                netSocket.write(encode);
            } catch (IOException e) {
                throw new RuntimeException("协议消息编码错误");
            }
        });
    }
}


实现TCP服务器



public class VertxTcpServer implements HttpServer {

    private byte[] handleRequest(byte[] requestData)  {
        // 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
        return "Hello, client!".getBytes();
    }

    @Override
    public void doStart(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(new TcpServerHandler());

        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                System.out.println("TCP server started on port " + port);
            } else {
                System.err.println("Failed to start TCP server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().doStart(8888);
    }
}

五、编写一个TCP客户端用于测试服务器。


public class VertxTcpClient {

    private final static String HOST = "127.0.0.1";
    private final static Integer PORT = 8888;
    private final static byte SETSERIALIZER = 0x1;
    private final static byte VERSION = 0x1;
    private final static byte MAGIC = 0x1;
    private final static byte TYPE = 0x1;

    public void start() {

        try {

            // 发送 TCP 请求
            Vertx vertx = Vertx.vertx();
            NetClient netClient = vertx.createNetClient();
            netClient.connect(PORT, HOST,
                    result -> {
                        if (result.succeeded()) {
                            System.out.println("Connected to TCP server");
                            io.vertx.core.net.NetSocket socket = result.result();
                            // 发送数据
                            // 构造消息
                            ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
                            ProtocolMessage.Header header = new ProtocolMessage.Header();
                            header.setMagic(MAGIC);
                            header.setVersion(VERSION);
                            header.setSerializer((SETSERIALIZER);
                            header.setType(TYPE);
                            header.setRequestId(IdUtil.getSnowflakeNextId());
                            protocolMessage.setHeader(header);
                            protocolMessage.setBody(null);
                            // 编码请求
                            try {
                                Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                                socket.write(encodeBuffer);
                            } catch (IOException e) {
                                throw new RuntimeException("协议消息编码错误");
                            }

                            // 接收响应
                            socket.handler(buffer -> {
                                try {
                                    ProtocolMessage<Response> ResponseProtocolMessage = (ProtocolMessage<Response>) ProtocolMessageDecoder.decode(buffer);
                                } catch (IOException e) {
                                    throw new RuntimeException("协议消息解码错误");
                                }
                            });
                        } else {
                            System.err.println("Failed to connect to TCP server");
                        }
                    });

            // 记得关闭连接
            netClient.close();
           
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new VertxTcpClient().start();
    }
}

六、总结
本文介绍了如何使用Vert.x在Java中实现自定义TCP协议。通过定义协议格式、实现TCP服务器和客户端,我们可以构建高效、低延迟的数据传输系统。Vert.x的异步、事件驱动特性使得它在处理大量并发连接时表现优异,非常适合用于实时通信、物联网等场景。希望本文对你有所帮助,祝你编程愉快!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2190031.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电源入口防护电路

电源入口防护电路 原则:先防护&#xff0c;后防反&#xff0c;最后滤波基本防护器件防反措施 原则:先防护&#xff0c;后防反&#xff0c;最后滤波 在设计电路保护方案时&#xff0c;遵循“先防护&#xff0c;后防反&#xff0c;最后滤波”的原则是非常重要的。这种顺序确保了…

Python入门--函数

目录 1. 函数介绍 2. 函数的定义 3. 函数的参数 4. 函数的返回值 5. 函数说明文档 6. 函数的嵌套调用 7. 函数的作用域 (1). 局部变量 (2). 全局变量 (3). global关键字 1. 函数介绍 函数&#xff1a;是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能…

【无题】夜入伊人笑愉,泪湿心夜难眠。

在这句诗中&#xff0c;意境描绘了一种深沉的情感体验&#xff0c;充满了温柔与哀愁。诗人通过“夜入伊人笑愉”开启了一段梦境之旅&#xff0c;其中“夜入”象征着进入梦境的状态。在这个梦幻的世界里&#xff0c;诗人与心爱的人欢笑嬉戏&#xff0c;那份快乐和亲昵如同真实的…

java高并发场景RabbitMQ的使用

场景是面试时被问到&#xff0c;一次性请求100多万个前端请求&#xff0c;请问你如果进行后端处理。因为之前的电商也没有一次性这么大的业务量&#xff0c;所以只是前端nginx做了对应的负载均衡技术。所以回答的不是那么流畅。面试官的回答你可以用RabbitMQ做分流,削峰,异步处…

什么是 ARP 欺骗和缓存中毒攻击?

如果您熟悉蒙面歌王&#xff0c;您就会明白蒙面歌王的概念&#xff1a;有人伪装成别人。然后&#xff0c;当面具掉下来时&#xff0c;您会大吃一惊&#xff0c;知道了这位名人是谁。类似的事情也发生在 ARP 欺骗攻击中&#xff0c;只是令人惊讶的是&#xff0c;威胁行为者利用他…

中小型网络系统综合实验

一、实验要求 1.pc端自动获取ip地址&#xff0c;可以互通访问&#xff0c;可以访问域名解析服务器 2.设置vlan&#xff0c;三层交换机实现不同vlan之间的交流 3.设置静态路由&#xff0c;配置nat地址转换&#xff0c;实现全网可达 二、实验思路 1.首先给LSW2配置vlan 10 &a…

双十一不能错过的好物推荐!强推五款超好用的品牌好物

双十一快到了&#xff0c;这个时候的优惠力度都是最大的&#xff0c;还不知道买啥的小伙伴们赶紧来看这篇好物推荐&#xff01;以下五款产品是我花了几天时间精心挑选出来的&#xff0c;看完之后保证你想加入购物车&#xff01; 品牌好物推荐一、希亦CG超声波清洗机 如果你带眼…

用Manim实现高尔顿板(Galton Board)

高尔顿板的介绍 高尔顿板&#xff08;Galton Board&#xff09;&#xff0c;有时也称为贝尔图&#xff08;Bean Machine&#xff09;&#xff0c;是由英国统计学家弗朗西斯高尔顿&#xff08;Francis Galton&#xff09;于19世纪末发明的一种物理装置&#xff0c;用于演示随机分…

【智能算法应用】蒲公英优化算法求解二维路径规划问题

摘要 在二维路径规划问题中&#xff0c;通常需要在不规则的障碍物环境中找到一条从起点到终点的最优路径。本文应用蒲公英优化算法&#xff08;DOA&#xff09;进行路径规划&#xff0c;其能够有效避开障碍物并找到最短路径。通过实验验证&#xff0c;DOA具有收敛速度快、全局…

2024年【金属非金属矿山(露天矿山)安全管理人员】模拟试题及金属非金属矿山(露天矿山)安全管理人员模拟考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 金属非金属矿山&#xff08;露天矿山&#xff09;安全管理人员模拟试题参考答案及金属非金属矿山&#xff08;露天矿山&#xff09;安全管理人员考试试题解析是安全生产模拟考试一点通题库老师及金属非金属矿山&#…

SAP学习笔记 - Basis01 - 创建Client ,拷贝Client

最近工作当中用到了Client间数据移送的内容&#xff0c;想把自己的虚机给弄两个Client。 最后也没完全弄成&#xff0c;先把过程整理一下&#xff0c;以后有空接着弄。 目录 1&#xff0c;SALE - 新建逻辑系统 2&#xff0c;SCC4 - 分配Client到集团 3&#xff0c;RZ10 - 取…

算法【Java】—— 二叉树的深搜

深搜 深搜简单来说就是一直递归到底&#xff0c;然后返回&#xff0c;以二叉树为例&#xff0c;就是从根节点出发一直搜索到叶子节点&#xff0c;然后想上返回。 这里简单说明一下&#xff1a;深搜的英文缩写是 dfs&#xff0c;下面定义深搜函数名我直接命名为 dfs 实战演练 …

AVL树的创建与检测

个人主页&#xff1a;敲上瘾-CSDN博客 个人专栏&#xff1a;游戏、数据结构、c语言基础、c学习、算法 目录 一、什么是AVL树&#xff1f; 二、平衡因子 1、什么是平衡因子&#xff1f; 2、平衡因子如何更新&#xff1f; 三、单旋 1、左单旋 ​编辑 2、右单旋 四、双旋…

OSPF的不规则区域

1.远离骨干非骨干区域 2.不连续骨干 解决方案 tunnel ---点到点GRE 在合法与非ABR间建立隧道&#xff0c;然后将其宣告于OSPF协议中&#xff1b; 缺点&#xff1a;1、周期和触发信息对中间穿越区域造成资源占用&#xff08;当同一条路由来自不同区域&#xff0c;路由器会先…

JS基础练习|动态创建多个input并且用数组记录其中的数据

效果图 、 在点击添加输入框的时候&#xff0c;创建新的元素&#xff0c;并且为其绑定响应的事件。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-…

稀缺森林火险等级预测算法,基于xgboost方法的火险等级预测,共划分5级,依据当前地区月份,降水量,风力等参数进行预测,并提供15000字的报告

森林火险等级预测算法&#xff0c;基于xgboost方法的火险等级预测&#xff0c;共划分5级&#xff0c;依据当前地区月份&#xff0c;降水量&#xff0c;风力等参数进行预测&#xff0c;并提供15000字的报告 森林火险等级预测算法介绍 项目名称 基于XGBoost的森林火险等级预测算…

双向无头非循环链表的简单实现及介绍

前言 欢迎大家阅读小奥奇的新作&#xff0c;听说上一篇我们留下了一点点 “ 简单的题目 ” &#xff0c;我们在本篇要干什么呢&#xff0c;请看本篇任务&#xff01; 本篇任务概述&#xff1a; 1、解决 “ 简单的遗留题目 ” 2、 LInkedList&#xff08;双向&#xff09;的使用…

2.1MyBatis——ORM对象关系映射

2.1MyBatis——ORM对象关系映射 1. 验证映射配置2.ResultType和ResultMap2.1ResultMap是最终的ORM依据2.2ResultType和ResultMap的使用区别 3.具体的转换逻辑3.1 TypeHandle类型转换 5.总结 概括的说&#xff0c;MyBatis中&#xff0c;对于映射关系的声明是由开发者在xml文件手…

“2024年最流行的10个前端框架”

大多数时候&#xff0c;前端开发人员需要使用一组组合语言来构建他们的前端 Web 应用程序。 HTML 负责网页中的基本布局&#xff0c;CSS 管理视觉格式和结构&#xff0c;JavaScript 用于维护交互性和功能。在这篇文章中&#xff0c;我们将了解最好的前端框架&#xff0c;这些框…

FL Studio 24.1.2.4381中文版免费下载及FL Studio 24最新使用学习教程

家好呀&#xff0c;作为一个资深的音乐爱好者和制作人&#xff0c;今天我要安利一个我最近超级痴迷的数字音频工作站软件——FL Studio24.1.2.4381中文版。这款产品可是让我的音乐创作之路如虎添翼&#xff0c;快来跟我一起看看它的炫酷功能吧&#xff01; 最近接到很多小伙伴的…