netty构建udp服务器以及发送报文到客户端客户端详细案例

news2024/10/2 3:19:41

目录

一、基于netty创建udp服务端以及对应通道设置关键

二、发送数据

三、netty中的ChannelOption常用参数说明

1、ChannelOption.SO_BACKLOG

2、ChannelOption.SO_REUSEADDR

3、ChannelOption.SO_KEEPALIVE

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

5、ChannelOption.SO_LINGER

6、ChannelOption.TCP_NODELAY


一、基于netty创建udp服务端以及对应通道设置关键

@Configuration
@RefreshScope
public class NettyUdpServer {

    @Value("${netty.server.udpPort}")
    private int port;

    private EventLoopGroup bossGroup;//主线程

    private Channel channel;//通道

    private ChannelFuture future; //回调

    @Autowired
    private DataCollector dataCollector;;

    public Channel start() throws InterruptedException {
        //判断是否支持Epoll模式,从而创建不同的线程组
        bossGroup = Epoll.isAvailable()? new EpollEventLoopGroup() : new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //linux平台下增加SO_REUSEPORT特性提高性能,支持多个进程或者线程绑定到同一个端口,提高服务器程序的吞吐性能
            if(Epoll.isAvailable()) {
                //设置反应器线程组
                b.group(bossGroup)
                 .handler(new EpollUdpServerInitializer(dataCollector))
                //设置nio类型的通道
                 .channel(EpollDatagramChannel.class)
                 .option(ChannelOption.SO_BROADCAST, true)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_RCVBUF, 1024 * 1024)
                 .option(EpollChannelOption.SO_REUSEPORT, true);
            }else{
                //设置反应器线程组
                b.group(bossGroup)
                 .handler(new UdpServerInitializer(dataCollector))
                 //设置nio类型的通道
                 .channel(NioDatagramChannel.class)
                 //设置通道的参数
                 .option(ChannelOption.SO_BROADCAST, true)
                 .option(ChannelOption.SO_REUSEADDR, true);
            }
            //Channel channel = server.bind(port).sync().channel();
            //开始绑定服务器,通过调用sync()同步方法阻塞直到绑定成功
            //ChannelFuture channelFuture = b.bind(port).sync();
            //等待通道关闭的异步任务结束
            //ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            //closeFuture.sync();

            ChannelFuture f = b.bind(port).sync();
            channel = f.channel();
            if(f.isSuccess()){
                //MasterSelector registry = new MasterSelector("","netty-services",  port);
                System.out.println("UDP服务器启动,监听在端口:" + port);
            }else {
                channel.closeFuture().sync();
            }

        } finally {
            //bossGroup.shutdownGracefully().sync();
        }
        System.out.println("Udp服务器启动,监听在端口:"+port);
        return channel;
    }
}

以上代码中Epoll.isAvailable()用户判断是window还是linux环境,linux环境默认采用Epoll相关通道,所以显式设置EpollDatagramChannel通道。在处理(handler)的设置中要根据不同的通道设置初始化的通道类型:

linux环境下EpollDatagramChannel通道设置 .handler(new EpollUdpServerInitializer(dataCollector))具体代码

public class EpollUdpServerInitializer extends ChannelInitializer<EpollDatagramChannel> {
    private final DataCollector dataCollector;

    public EpollUdpServerInitializer(DataCollector dataCollector) {
        this.dataCollector = dataCollector;
    }

    @Override
    protected void initChannel(EpollDatagramChannel epollDatagramChannel) throws Exception {
        epollDatagramChannel.pipeline()
                //添加netty空闲超时检查的支持
                .addLast(new UdpServerHandler(dataCollector));
    }

要使 通过服务器端通过EpollDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

同意要支持Nio类型通道为NioDatagramChannel.class时,通道初始化为:

public class UdpServerInitializer extends ChannelInitializer<NioDatagramChannel> {
    private final DataCollector dataCollector;

    public UdpServerInitializer(DataCollector dataCollector) {
        this.dataCollector = dataCollector;
    }

    @Override
    protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
        nioDatagramChannel.pipeline()
                //添加netty空闲超时检查的支持
                .addLast(new UdpServerHandler(dataCollector));
    }
}

 要使 通过服务器端通过NioDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

二、发送数据

关键代码,采用writeAndFlush发送数据,注意:要发送udp数据报,

public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**设置最大消息大小*/
    private static final int MAX_MESSAGE_SIZE = 2048;
    /**线程池*/
    private ExecutorService executorService;

    private final DataCollector dataCollector;

    public UdpServerHandler(DataCollector dataCollector) {
        this.dataCollector = dataCollector;
        //根据当前系统可用的处理器数量创建一个固定长度的线程池
        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
        ByteBuf buffer = datagramPacket.content();
        //确保不会超出最大消息大小
        if(buffer.readableBytes() > MAX_MESSAGE_SIZE) {
            buffer.release();
            return;
        }
        UdpDatagram udpDatagram = parseUdpDatagram(buffer);
        UdpDatagram respUdpDatagram = dataCollector.processUdpDatagram(udpDatagram);
        if (null != respUdpDatagram) {
             handleReceivedData(ctx, respUdpDatagram, datagramPacket);
        }
    }

    /**
     * 处理接收到的数据
     * @param ctx
     * @param udpDatagram
     */
    public void handleReceivedData(ChannelHandlerContext ctx, UdpDatagram udpDatagram, DatagramPacket datagramPacket) throws ExecutionException, InterruptedException {
        Channel channel = ctx.channel();
        if (log.isInfoEnabled()) {
            log.info("received udp message: sessionId: {}, opCode: {}, short messageId: {}",
                    ctx.channel().id(), udpDatagram.getMessageTypeEnum(), udpDatagram.getShortMessageId());
        }
        byte[] payloadBytes = udpDatagram.getPayloadBytes();
        ByteBuf copiedBuffer = Unpooled.copiedBuffer(payloadBytes);
        ChannelFuture channelFuture = channel.writeAndFlush(new DatagramPacket(copiedBuffer.retain(), datagramPacket.sender()));
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    // 数据发送成功
                    log.info("数据发送成功:sender host: {}, sender port:{}, sender address:{}",datagramPacket.sender().getHostName(),datagramPacket.sender().getPort(), datagramPacket.sender().getAddress());
                } else {
                    // 数据发送失败
                    log.error("数据发送失败: {}",channelFuture.cause().getStackTrace());
                    channelFuture.cause().printStackTrace();
                }
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        dataCollector.tcpConnect(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (log.isWarnEnabled()) {
            log.warn("udp session throw an exception, sessionId:{} exception message: {}",
                    ctx.channel().id().asLongText(), cause.getMessage());
        }
    }

    //当客户端关闭链接时关闭通道
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        dataCollector.tcpChannelDisconnect(ctx.channel());

    }
}

处理类继承SimpleChannelInboundHandler类泛型类为DatagramPacket 

 writeAndFlush方法中发送的数据类型要是DatagramPacket

三、netty中的ChannelOption常用参数说明

1、ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数。函数listen(int socketfd, int backlog)用来初始化服务端可连接队列。

服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

2、ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。

比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。

比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

3、ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。

当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作发送缓冲区大小和接受缓冲区大小。

接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

5、ChannelOption.SO_LINGER

ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发送剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。

6、ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。

Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。

而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

ChannelOption属性
SO_BROADCAST对应套接字层的套接字:SO_BROADCAST,将消息发送到广播地址。
如果目标中指定的接口支持广播数据包,则启用此选项可让应用程序发送广播消息。
SO_KEEPALIVE对应套接字层的套接字:SO_KEEPALIVE,保持连接。
在空闲套接字上发送探测,以验证套接字是否仍处于活动状态。
SO_SNDBUF对应套接字层的套接字:SO_SNDBUF,设置发送缓冲区的大小。
SO_RCVBUF对应套接字层的套接字:SO_RCVBUF,获取接收缓冲区的大小。
SO_REUSEADDR对应套接字层的套接字:SO_REUSEADDR,本地地址复用。
启用此选项允许绑定已使用的本地地址。
SO_LINGER对应套接字层的套接字:SO_LINGER,延迟关闭连接。
启用此选项,在调用close时如果存在未发送的数据时,在close期间将阻止调用应用程序,直到数据被传输或连接超时。
SO_BACKLOG

对应TCP/IP协议中<font color=red>backlog</font>参数,<font color=red>backlog</font>即连接队列,设置TCP中的连接队列大小。如果队列满了,会发送一个ECONNREFUSED错误信息给C端,即“ Connection refused”。

SO_TIMEOUT等待客户连接的超时时间。
IP_TOS对应套接字层的套接字:IP_TOS,在IP标头中设置服务类型(TOS)和优先级。
IP_MULTICAST_ADDR对应IP层的套接字选项:IP_MULTICAST_IF,设置应发送多播数据报的传出接口。
IP_MULTICAST_IF对应IP层的套接字选项:IP_MULTICAST_IF2,设置应发送多播数据报的IPV6传出接口。
IP_MULTICAST_TTL对应IP层的套接字选项:IP_MULTICAST_TTL,在传出的 多播数据报的IP头中设置生存时间(TTL)。
IP_MULTICAST_LOOP_DISABLED取消 指定应将 传出的多播数据报的副本 回传到发送主机,只要它是多播组的成员即可。
TCP_NODELAY

对应TCP层的套接字选项:TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 决定何时发送数据。Nagle算法代表通过减少必须发送包的个数来增加网络软件系统的效率。即尽可能发送大块数据避免网络中充斥着大量的小数据块。如果要追求高实时性,需要设置关闭Nagle算法;如果需要追求减少网络交互次数,则设置开启Nagle算法。

  

 ChannelOption通用配置

参数说明
ALLOCATORByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT。
RCVBUF_ALLOCATOR

用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。

MESSAGE_SIZE_ESTIMATOR

消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。

CONNECT_TIMEOUT_MILLIS连接超时毫秒数,默认值30000毫秒即30秒。
WRITE_SPIN_COUNT一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。
WRITE_BUFFER_WATER_MARK
ALLOW_HALF_CLOSURE一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
AUTO_READ自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1549654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CUDA安装 Windows版

目录 一、说明 二、安装工具下载 三、CUDA安装 四、cuDNN配置 五、验证安装是否成功 一、说明 windows10 版本安装 CUDA &#xff0c;首先需要下载两个安装包 CUDA toolkitcuDNN 官方教程 CUDA&#xff1a;https://docs.nvidia.com/cuda/cuda-installation-guide-micro…

2.2 添加商户缓存

实战篇Redis 2.2 添加商户缓存 在我们查询商户信息时&#xff0c;我们是直接操作从数据库中去进行查询的&#xff0c;大致逻辑是这样&#xff0c;直接查询数据库那肯定慢咯&#xff0c;所以我们需要增加缓存 GetMapping("/{id}") public Result queryShopById(Pat…

政安晨:【深度学习神经网络基础】(一)—— 逐本溯源

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: 政安晨的机器学习笔记 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; 与计算机一样的古老历史 神经网络的出现可追溯到20世纪40年…

Android源码阅读WorkMangaer - 6

前言 由于笔者目前水平限制&#xff0c;表达能力有限&#xff0c;尽请见谅。 WorkManager 是 Android Jetpack 库的一部分&#xff0c;提供了一种向后兼容的方式来安排可延迟的异步任务&#xff0c;这些任务即使在应用退出或设备重启后也应该继续执行&#xff0c;它是 Androi…

记录 AI绘图 Stable Diffusion的本地安装使用,可搭建画图服务端

开头 最近刷短视频看到了很多关于AI绘图&#xff0c;Midjourney&#xff0c;gittimg.ai&#xff0c;Stable Diffusion等一些绘图AI工具&#xff0c;感受到了AI绘画的魅力。通过chatGPT生成关键词再加上绘图工具&#xff0c;真是完美&#xff0c;文末教大家如何用gpt提词 Midj…

Anaconda的GEE环境中安装torch库

打开Anaconda&#xff0c;点击运行&#xff0c;打开terminal 输入pip install torch 而且由于anaconda中自己配置好了镜像源&#xff0c;在pip时自动使用清华镜像源

2024年4月份 风车IM即时通讯系统APP源码 版完整苹果安卓教程

关于风车IM&#xff0c;你在互联网上能随便下载到了基本都是残缺品&#xff0c; 经过我们不懈努力最终提供性价比最高&#xff0c;最完美的版本&#xff0c; 懂货的朋友可以直接下载该版本使用&#xff0c;经过严格测试&#xff0c;该版本基本完美无缺。 下载地址&#xff1a;…

【正点原子FreeRTOS学习笔记】————(4)FreeRTOS中断管理

这里写目录标题 一、什么是中断&#xff1f;&#xff08;了解&#xff09;二、中断优先级分组设置&#xff08;熟悉&#xff09;三、中断相关寄存器&#xff08;熟悉&#xff09;四、FreeRTOS中断管理实验&#xff08;掌握&#xff09; 一、什么是中断&#xff1f;&#xff08;…

华为数通 HCIP-Datacom H12-831 题库补充(3/27)

2024年 HCIP-Datacom&#xff08;H12-831&#xff09;最新题库&#xff0c;完整题库请扫描上方二维码&#xff0c;持续更新。 如图所示&#xff0c;关于R4路由器通过IS-IS计算出来的IPv6路由&#xff0c;哪一选项的描述是错误的&#xff1f; A&#xff1a;R4通过IS—IS只学习到…

【企业动态】吉利雷达汽车来访东胜物联,考察交流,洽谈车联网生态合作

近日&#xff0c;我们非常高兴接待吉利雷达汽车一行莅临东胜物联位于湖州市的生产工厂&#xff0c;进行参观考察&#xff0c;并就未来的合作展开深入商讨与交流。 雷达新能源汽车隶属于吉利控股集团&#xff0c;是一家专注于户外生态的中高端新能源智能汽车企业。雷达通过共享吉…

【启发式算法】同核分子优化算法 Homonuclear Molecules Optimization HMO算法【Matlab代码#70】

文章目录 【获取资源请见文章第4节&#xff1a;资源获取】1. 算法简介2. 部分代码展示3. 仿真结果展示4. 资源获取 【获取资源请见文章第4节&#xff1a;资源获取】 1. 算法简介 同核分子优化算法&#xff08;Homonuclear Molecules Optimization&#xff0c;HMO&#xff09;是…

网页版短信系统后台开发要点|短信平台软件开发搭建

在开发网页版短信系统的后台时&#xff0c;有一些关键要点需要注意&#xff0c;以确保系统的稳定性、安全性和高效性。以下是一些开发网页版短信系统后台时的重要要点&#xff1a; 用户管理&#xff1a;实现用户权限管理功能&#xff0c;包括用户注册、登录、角色分配等&#x…

Redis为什么快

引言 Redis是一个高性能的开源内存数据库,以其快速的读写速度和丰富的数据结构支持而闻名。作为一个轻量级、灵活的键值存储系统,Redis在各种应用场景下都展现出了惊人的性能优势。无论是作为缓存工具、会话管理组件、消息传递媒介,还是在实时数据处理任务和复杂的分布式系…

YOLOv9改进策略:卷积魔改 | SPD-Conv,低分辨率图像和小物体涨点明显

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文改进内容&#xff1a;SPD-Conv由一个空间到深度(SPD)层和一个无卷积步长(Conv)层组成,特别是在处理低分辨率图像和小物体等更困难的任务时。 &#x1f4a1;&#x1f4a1;&#x1f4a1;SPD-Conv在多个数据集验证能够暴力涨点&#x…

python3字典的排序

创建一个字典 dict1{a:2,b:3,c:8,d:4} 1、分别取键、值 取字典的所有键&#xff0c;所有的值&#xff0c;利用dict1.keys()&#xff0c;dict1.vaules()&#xff0c; 由于键&#xff0c;值有很多个&#xff0c;所以要加s&#xff0c;另外注意这里要加括号&#xff0c;这样的小…

java spirng和 mybatis 常用的注解有哪些

当在Java Spring和MyBatis中进行开发时&#xff0c;常用的注解对于简化配置和提高开发效率非常重要。以下是更多常用的注解以及它们的详细说明和用途&#xff1a; 在Spring中常用的注解&#xff1a; Component&#xff1a; 用途&#xff1a;表明一个类会作为组件被Spring容器管…

YOLOv9改进策略:block优化 | Transformer架构ConvNeXt 网络在检测中大放异彩

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文改进内容&#xff1a;Transformer架构 ConvNeXt 网络在图像分类和识别、分割领域大放异彩&#xff0c;同时对比 Swin-T 模型&#xff0c;在多种任务中其模型的大小和准确率均有一些提升&#xff0c;模型的 FLOPs 较大的减小且 Acc …

创建AI智能体

前言 灵境矩阵是百度推出的基于文心大模型的智能体&#xff08;Agent&#xff09;平台&#xff0c;支持广大开发者根据自身行业领域、应用场景&#xff0c;选取不同类型的开发方式&#xff0c;打造大模型时代的产品能力。开发者可以通过 prompt 编排的方式低成本开发智能体&am…

保姆级指导0基础如何快速搭建“对话机器人”类ChatGPT

参考了CDSN上的文章&#xff0c;但发现不work&#xff0c; 不是这里有问题&#xff0c;就是那里有问题&#xff0c;查阅了大量的资料&#xff0c;做了无数次试验&#xff0c;终于整理出来了一个完整的教程&#xff0c;保可用&#xff0c;保真~~~~~如果各位遇到什么问题&#xf…

哈希的一些题目

题目1&#xff1a;星空之夜 1402. 星空之夜 - AcWing题库 刚开始看到这个题目感觉一懵&#xff0c;因为这个哈希是关于形状的哈希&#xff0c;不知道要怎么表示。 但是这道题的数据范围比较小&#xff0c;暴力也可以过。 暴力的方法是&#xff1a;搜索每一个连通块并保存下来…