io之netty

news2025/1/22 13:13:52

写在前面

netty当前是网络io框架的事实标准,基于nio实现,框架的作者是韩国一位姓李的朋友,开始我们这位行李的韩国朋友开发一个io框架mina,但后来其离职,mina也就和其没有关系了,所以后来其改进了mina的不足和各种问题,重新开发了一个全新的框架,也就是现在的netty。本文就一起看下其简单的使用和底层原理。

1:实现一个http server

能够在浏览器访问并给出响应的的server我们就可以叫做http server,即只要简单的支持http协议即可,netty对此提供了支持。

首先定义启动类:

/**
 * 通过netty实现一个http 的server
 */
public class NettyHttpServer {
    public static void main(String[] args) throws InterruptedException {

        int port = 8808;
        // boss 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        // 工作线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);

        try {
            // 入口启动类
            ServerBootstrap b = new ServerBootstrap();
            // 设置各种配置
            b.option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
                    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                    .childOption(EpollChannelOption.SO_REUSEPORT, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpInitializer());

            Channel ch = b.bind(port).sync().channel();
            System.out.println("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }
}

这里需要一个初始化类来绑定ChannelHandler到channel对应的pipieline上,如下:

public class HttpInitializer extends ChannelInitializer<SocketChannel> {
	
	@Override
	public void initChannel(SocketChannel ch) {
		ChannelPipeline p = ch.pipeline();
		p.addLast(new HttpServerCodec());
		//p.addLast(new HttpServerExpectContinueHandler());
		p.addLast(new HttpObjectAggregator(1024 * 1024));
		p.addLast(new HttpHandler());
	}
}

这里增加了HttpServerCodec编解码器,来支持http协议,并定义http响应的业务处理类HttpHandler,如下:

public class HttpHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            //logger.info("channelRead流量接口请求开始,时间为{}", startTime);
            FullHttpRequest fullRequest = (FullHttpRequest) msg;
            String uri = fullRequest.uri();
            //logger.info("接收到的请求url为{}", uri);
            if (uri.contains("/test")) {
                handlerTest(fullRequest, ctx, "hello,kimmking");
            } else {
                handlerTest(fullRequest, ctx, "hello,others");
            }
    
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx, String body) {
        FullHttpResponse response = null;
        try {
            String value = body; // 对接上次作业的httpclient或者okhttp请求另一个url的响应数据

//            httpGet ...  http://localhost:8801
//            返回的响应,"hello,nio";
//            value = reponse....

            response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
            response.headers().set("Content-Type", "application/json");
            response.headers().setInt("Content-Length", response.content().readableBytes());

        } catch (Exception e) {
            System.out.println("处理出错:"+e.getMessage());
            response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
        } finally {
            if (fullRequest != null) {
                if (!HttpUtil.isKeepAlive(fullRequest)) {
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                } else {
                    response.headers().set(CONNECTION, KEEP_ALIVE);
                    ctx.write(response);
                }
                ctx.flush();
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

接着我们启动主类来进行测试,先curl:

C:\WINDOWS\system32>curl http://localhost:8808/
hello,others

然后sb压测:

C:\WINDOWS\system32>sb -u http://localhost:8808/ -c 40 -N 10
Starting at 2023/6/1 星期四 13:36:01
[Press C to stop the test]
96867   (RPS: 7115.8)
---------------Finished!----------------
Finished at 2023/6/1 星期四 13:36:15 (took 00:00:13.7040309)
Status 200:    96880

RPS: 8778.8 (requests/second)
Max: 220ms
Min: 0ms
Avg: 0.2ms

  50%   below 0ms
  60%   below 0ms
  70%   below 0ms
  80%   below 0ms
  90%   below 0ms
  95%   below 0ms
  98%   below 1ms
  99%   below 3ms
99.9%   below 16ms

性能还不错。

2:netty的模型

最简单的阻塞io模型,每个线程都要通过while(true)的方式来获取数据以及处理数据,其中的获取数据过程,即IO操作是阻塞的,这就会导致线程占用CPU资源没有真正的执行业务,导致CPU的使用率降低,如下图:

在这里插入图片描述

在这里插入图片描述

如果是我们能够将IO的操作抽取出来,就能解决IO阻塞业务线程导致性能降低的问题了,这个问题我们可以使用NIO来解决,通过其selector机制来查看所有的socket的数据状态,这样就能解决业务线程因为IO阻塞而导致的CPU利用率降低问题,如下图:
在这里插入图片描述

在继续往下分析之前,我们先看下事件处理机制,事件处理机制,根据不同的事件来生成不同的任务,并将任务放在任务队列中,然后任务分发器按照任务不同的类型放到不同的任务处理器的任务队列中,如下图:

在这里插入图片描述

结合事件处理机制以及NIO机制就产生了reactor模式,doug lea针对reactor模式提出了3三种reactor线程模型,分别来看下。

2.1:单线程reactor线程模型

单线程的reactor线程模型,只有一个reactor线程,负责网络连接状态的维护,数据的读取,以及有数据时业务代码的执行,执行结果的发送,如下图:

在这里插入图片描述

通过selector轮询网络数据状态

此时只有一个线程,所以阻塞的io操作和业务执行还是耦合在一起的,效果同阻塞io模型,所以性能比较低,此时可以将其进化为reactor多线程模型。

2.2:reactor多线程模型

reactor多线程模型,不同于reactor单线程模型,会有一个专门的线程池从已经准备就绪的channle中(注意是已经是有数据可读取了,所以此处io不会阻塞,而只是从channle中read数据)读取数据并执行业务代码,这样就解耦了阻塞式的io操作和业务操作,并且业务操作是以线程池的方式来执行的,所以效率会比较高,如下图:

在这里插入图片描述
红线以上区域就是reactor单线程模型的部分,红线下方就是扩展出来的线程池。也可参考下图:

在这里插入图片描述
这种方式还有一个性能瓶颈点就是,负责网络连接状态维护的reactor线程依然是单线程的,这个reactor线程要负责连接状态的维护,数据可读状态的检测,通道数据可读事件的分发(即实现分发器),负责处理的工作还是比较多的,特别是数据可读事件的分发,当并发量比较高时,可能成为性能瓶颈,因此我们可以将这些功能进行拆分,进化为reactor主从模型。

2.3:reactor主从模型

reactor的主从模型在reactor多线程模型的基础上,对rector线程的功能进行了拆分,其中网络连接状态的维护依然由mainReactor线程负责,但是channel数据的可读状态的检测,通道可读事件的分发,由一个单独的线程池subReactor负责,负责业务执行的线程池和reactor多线程模型相同,如下图:
在这里插入图片描述

2.4:netty对3中reactor线程模型的支持

netty使用了reactor线程模型,并支持了其具体的3种线程模型,编程方式类似,如下图:
在这里插入图片描述
当使用reactor的主从模式时netty启动过程如下:
在这里插入图片描述

这里有一个很重要的对象,NIOEventLoop是一个thread+selector的集合,负责不断的处理任务并从channel中读取数据执行业务代码。接着看下netty的整体信息:

在这里插入图片描述
信息如下:

1:很多的client连接到boss group,即mainReactor
2:workergourp,即subReactor,负责channel数据可读状态的检测,并分发数据处理任务到EventLoop的任务队列中
3:如果EventLoop判断当前任务对应的channel是自己负责的,则从channle中读取数据,并调用与该channel绑定的pipeline,并执行其中N多的ChannelHanlder(包含了我们的业务执行代码)。

netty的关键对象如下:

1:启动
    服务端ServerBootstrap,客户端Bootstrap
2:事件处理
    EventLoopGroup,封装一组EventLoop
    EventLoop,负责一组channel的数据的处理
3:通道
    SocketChannel,绑定在EventLoop上,代表一个网络连接
4:业务处理
    ChannelPipeline,绑定在SocketChannel上,负责管理一组业务处理的ChannelHandler
    ChannelHandler,多个,绑定在ChannelPipeline,负责具体的业务处理
5:绑定ChannelPipeline到SocketChannel
    ChannelInitailizer,完成绑定工作

如下图:

在这里插入图片描述

另外就是ChannelPipeline还分为是读入数据还是写出数据,分为InboundChannelHandler和OutBoundChannleHandler,在编程时需要注意:

在这里插入图片描述

写在后面

参考文章列表

Reactor之多工作线程与主从模式 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/618631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Maxcompute数据上云一致性比对

我写过很多如何去对数、如何批量对数的技术文档&#xff0c;最近项目遇到这个问题&#xff0c;我才发现在官方博客上还没有发布过这个课题的文章。这就像灯下黑&#xff0c;太长用到的知识点&#xff0c;反而没有意识到其重要性。 注&#xff1a;这里对数的场景就是指在阿里云…

docker 装机/卸载 Mysql

1、首先&#xff0c;需要安装Docker。可以使用以下命令安装&#xff1a; > yum install docker 2、安装完成后&#xff0c;启动Docker服务&#xff1a; > systemctl start docker3、CentOS7环境下的Docker使用 docker快速部署mysql数据库并初始化 docker快速部署mysq…

Power BI API调用注意事项 (By Power Automate)

注&#xff1a;本文最初发布于https://d-bi.gitee.io和medium, 2023年6月迁移至CSDN 前述 本站关于实现Power BI REST API的博文已有许多&#xff0c;包括&#xff1a; Power BI REST API有多强大&#xff1f;PBI开发者必读Power BI REST API实战教程&#xff1a;PowerQuery为…

基于SSM的便利店系统

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 基于SSH的便利店系统是…

Java中方法的重载与重写

文章目录 前言方法重载方法重写 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 方法的重载与重写容易混&#xff0c;所以单独拿出来比较 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 方法重载 在同一个类中&#xff0c;允…

springcloud-alibaba (06)RocketMQ控制台安装与启动

RocketMQ控制台 ✨让你的消息传输更高效✨ 如果你是一名开发者&#xff0c;或者是对消息传输有需求的企业用户&#xff0c;那么你肯定不陌生于 RocketMQ&#xff0c;它是一个高可用、高可靠、高性能、分布式消息中间件。但是有时候&#xff0c;在 Windows 上安装和启动 Rocke…

生产环境可用的 Seata-go 1.2.0 来啦!!!

文&#xff5c;刘月财&#xff08;GitHub ID&#xff1a;luky116) 360 服务端开发专家 Seata-go 项目负责人 本文 2752 字 阅读 7 分钟 发布概览 Seata-go 1.2.0 版本支持 XA 模式。XA 协议是由 X/Open 组织提出的分布式事务处理规范&#xff0c;其优点是对业务代码无侵入。当前…

小巧长续航的主动降噪耳机,更轻更好用,QCY ArcBuds上手

我平时听歌、玩游戏的时候喜欢戴上一副蓝牙耳机&#xff0c;这种耳机选择很多&#xff0c;这几年进步还很快&#xff0c;市面上有很多价格合理、音质出色的选择。我目前用的是一款QCY ArcBuds&#xff0c;这款耳机支持主动降噪&#xff0c;户外使用体验不错&#xff0c;而且它做…

Dockerfile实现LNMP

systemctl stop firewalld systemctl disable firewalld setenforce 0 docker network create --subnet172.18.0.0/16 --opt "com.docker.network.bridge.name""docker1" mynetwork #部署nginx&#xff08;容器IP 为 172.18.0.10&#xff09; mkdir /o…

一文讲解 基于C++手写Rpc项目

目录 github 预备知识 集群和分布式 单机聊天服务器 集群聊天服务器 分布式聊天服务器 从集群式 到 分布式聊天服务器 看来只有好处 ,但代价是什么? rpc 的 通信原理 remote procedure call 分布式通信 手写的rpc部分 protobuf>json 好处? 介绍protobuf protob…

RabbitMQ - 死信队列

RabbitMQ - 死信队列 死信的概念死信的来源死信实战死信之TTl死信之最大长度死信之消息被拒 死信的概念 先从概念解释上搞清楚这个定义&#xff0c;死信&#xff0c;顾名思义就是无法被消费的消息&#xff0c;字面意思可以这样理 解&#xff0c;一般来说&#xff0c;producer …

【进程间通信:管道】

目录 1 进程间通信介绍 1.1 进程间通信目的 1.2 进程间通信发展 1.3 进程间通信分类 2 管道 2.1 什么是管道 2.2 匿名管道 2.2.1 匿名管道的使用 2.2.2 使用匿名管道创建进程池 2.3 管道读写规则 2.4 匿名管道特点 2.5 命名管道 2.5.1 概念 2.5.2 使用 1 进程间通…

Learning C++ No.28 【C++11语法实战】

引言&#xff1a; 北京时间&#xff1a;2023/6/5/9:25&#xff0c;今天8点45分起床&#xff0c;一种怎么都睡不够的感觉&#xff0c;特别是周末&#xff0c;但是如果按照我以前的睡觉时间来看&#xff0c;妥妥的是多睡了好久好久&#xff0c;并且昨天也睡了一天&#xff0c;哈…

C#,码海拾贝(32)——计算“实对称三对角阵的全部特征值与特征向量的”之C#源代码

using System; namespace Zhou.CSharp.Algorithm { /// <summary> /// 矩阵类 /// 作者&#xff1a;周长发 /// 改进&#xff1a;深度混淆 /// https://blog.csdn.net/beijinghorn /// </summary> public partial class Matrix {…

第⑩讲:Ceph集群CephFS文件存储核心概念及部署使用

文章目录 1.CephFS文件存储核心概念1.1.CephFS文件存储简介1.2.CephFS文件存储架构1.3.CephFS文件系统的应用场景与特性 2.在Ceph集群中部署MDS组件3.在Ceph集群中创建一个CephFS文件存储系统3.1.为CephFS文件存储系统创建Pool资源池3.2.创建CephFS文件系统3.3.再次观察Ceph集群…

chatgpt赋能python:从后到前查找Python字符串

从后到前查找Python字符串 Python是一种流行的编程语言&#xff0c;广泛用于Web开发、数据科学和算法设计等领域。其中&#xff0c;字符串是Python编程中的重要概念之一&#xff0c;它不仅可以表示文本&#xff0c;还可以进行各种处理。本篇文章将介绍Python字符串从后到前的查…

chatgpt赋能python:Python如何运行最方便

Python 如何运行最方便 Python 是一种高级编程语言&#xff0c;被广泛使用于各类领域。由于其简单易学&#xff0c;可读性高&#xff0c;适用于不同平台的特性&#xff0c;Python 已成为计算领域、Web 开发、数据分析等领域的首选语言之一。如果您正在学习 Python 或需要对其进…

【SQL】Oracle数据库安装并实现远程访问

文章目录 前言1. 数据库搭建2. 内网穿透2.1 安装cpolar内网穿透2.2 创建隧道映射 3. 公网远程访问4. 配置固定TCP端口地址4.1 保留一个固定的公网TCP端口地址4.2 配置固定公网TCP端口地址4.3 测试使用固定TCP端口地址远程Oracle 前言 Oracle&#xff0c;是甲骨文公司的一款关系…

RTL8380MI/RTL8382MI管理型交换机系统软件操作指南六:RSTP/快速生成树协议

对RSTP/快速生成树协议进行详细的描述&#xff0c;主要包括以下内容&#xff1a;STP概述、RSTP介绍、全局配置、端口配置、RSTP信息、端口信息. 1.1 STP概述 STP&#xff08;Spanning Tree Protocol&#xff09;是生成树协议的英文缩写。STP协议中定义了根桥&#xff08;RootB…

报表生成器FastReport .Net用户指南:显示数据列、HTML标签

FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案&#xff0c;使用FastReport .NET可以创建独立于应用程序的.NET报表&#xff0c;同时FastReport .Net支持中文、英语等14种语言&#xff0c;可以让你的产品保证真正的国际性。 FastReport.NET官方版…