Netty通信技术进阶二

news2024/12/24 10:03:06

Netty核心组件

    • 1. Bootstrap
    • 2 Channel
    • 3. EventLoopGroup 和 EventLoop
      • 3.1 eventLoopThreads 是多少?
    • 4. ChannelHandler & ChannelHandlerContext & ChannelPipeline
      • 4.1 复用Handler
      • 4.2 ChannelInboundHandlerAdapter or SimpleChannelInboundHandler
    • 5. ByteBuf
      • 5.1 ByteBuf的三个指针
      • 5.2 容量API
      • 5.3 读写指针相关的API
      • 5.4 丢弃、清理,释放API
      • 5.5 Wrap
      • 5.6 ByteBuf三类使用模式
      • 5.7 ByteBuf 的分配器BufAllocator
      • 5.8 ByteBuf 的释放
    • 6. Netty的异步编程模型(Future & Promise)
      • 6.1 Future
        • 6.1.1 ChannelFuture
      • 6.2 Promise
        • 6.2.1 ChannelPromise
    • 6. TCP 粘包,拆包(一次编码)
      • 6.1 Netty拆包器
        • 6.1.1 LineBasedFrameDecoder(分隔符解码器)\
        • 6.1.2 LengthFieldBasedFrameDecoder(基于长度的域解码器)
        • 6.1.3 自定义解码器
    • 7. 序列化(二次编码)
      • 7.1 ProtobufDecoder & ProtobufEecoder
    • 8. Keepalive 与 idle监测
      • 8.1 TCP Keepalvie
      • 8.1 应用层Keepalvie
    • 9. 高级特性,性能优化
      • 9.1 System 参数
      • 9.2. 应用诊断
        • 9.2.1 完善线程组的线程名
        • 9.2.2 添加Handler名称 & 日志
      • 9.3 线程模型优化
        • 9.3.1 CPU密集型:运算型
        • 9.3.2 CPU密集型:运算型
      • 9.4 零拷贝-Zero Copy

1. Bootstrap

Bootstrap是引导的意思,它的作用是配置整个Netty程序,将各个组件都串起来,最后绑定端口、启动Netty服务
Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器,区别在于

1、ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的
2、引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap则需要两个
在这里插入图片描述

2 Channel

Netty中的Channel是与网络套接字相关的,可以理解为是socket连接,在客户端与服务端连接的时候就会建立一个Channel,它负责基本的IO操作,比如:bind()、connect(),read(),write() 等
不同协议、不同的I/O类型的连接都有不同的 Channel 类型与之对应
主要作用:

  1. 通过Channel可获得当前网络连接的通道状态。
  2. 通过Channel可获得网络连接的配置参数(缓冲区大小等)。
  3. Channel提供异步的网络I/O操作,比如连接的建立、数据的读写、端口的绑定等。

在这里插入图片描述

3. EventLoopGroup 和 EventLoop

Netty是基于事件驱动的,比如:连接注册,连接激活;数据读取;异常事件等等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环/EventExecutor)
在Netty 中每个Channel 都会被分配到一个 EventLoop。一个 EventLoop 可以服务于多个 Channel。每个EventLoop 会占用一个 Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件。
EventLoopGroup 是用来生成 EventLoop 的,包含了一组EventLoop(可以初步理解成Netty线程池)
在这里插入图片描述

3.1 eventLoopThreads 是多少?

核心线程数默认:cpu核数*2, 核心线程数在创建时可通过构造函数指定
对于boss group,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动

4. ChannelHandler & ChannelHandlerContext & ChannelPipeline

4.1 复用Handler

每个客户端Channel创建后初始化时,均会向与该Channel绑定的Pipeline中添加handler,此种模式下,每个Channel享有的是各自独立的Handler
如果复用的handler对象不加@Sharable注解会报错, 另外存在线程安全问题, 内部全局变量线程安全问题要自己处理

4.2 ChannelInboundHandlerAdapter or SimpleChannelInboundHandler

入站处理器,可以选择继承ChannelInboundHandlerAdapter,也可以选择继承SimpleChannelInboundHandler<T>
继承SimpleChannelInboundHandler需要重写channelRead0方法,且可以通过泛型指定msg类型, 内部会自动release释放占用的Bytebuffer资源
注意事项:服务端异步处理数据,服务端最好不要继承SimpleChannelInboundHandler, 因为内部会释放资源, 下一链路的handler获取不到数据

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    private final TypeParameterMatcher matcher;
    private final boolean autoRelease;
	...

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;

        try {
            if (this.acceptInboundMessage(msg)) {
                this.channelRead0(ctx, msg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (this.autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }

        }

    }

5. ByteBuf

Java NIO 提供了ByteBuffer 作为它的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。Netty使用ByteBuf来替代ByteBuffer,它是一个强大的实现,既解决了JDK API 的局限性, 又为网络应用程序的开发者提供了更好的API

5.1 ByteBuf的三个指针

readerIndex: 指示读取的起始位置, 每读取一个字节, readerIndex自增累加1。 如果readerIndex 与writerIndex 相等,ByteBuf 不可读
writerIndex: 指示写入的起始位置, 每写入一个字节, writeIndex自增累加1, 达到capacity最大容量时自动扩容, 直到maxCapacity无法写入
maxCapacity 指示ByteBuf 可以扩容的最大容量

5.2 容量API

capacity(): ByteBuf底层在内存中占用的内存大小
maxCapacity(): ByteBuf 底层最大能够占用多少字节的内存
readableBytes() 与 isReadable(): readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false
writableBytes()、 isWritable() 、maxWritableBytes(): 第一个表示可写字节数。第二个表示是否可写。第三个表示最大可写字节数

5.3 读写指针相关的API

readerIndex() 与 readerIndex(int readerIndex): 前者表示返回当前的读指针 readerIndex, 后者表示设置读指针
writeIndex() 与 writeIndex(int writerIndex): 前者表示返回当前的写指针 writerIndex, 后者表示设置写指针
markReaderIndex() 与markWriterIndex(): 表示把当前的读指针/写指针保存起来,可用resetMarkReaderIndex和resetMarkWriterIndex进行恢复
writeBytes(byte[] src): 表示把字节数组 src 里面的数据全部写到 ByteBuf,src字节数组大小的长度通常小于等于writableBytes()

 byte[] bytes = new byte[buf.writableBytes()];
 buf.writableBytes(bytes);

readBytes(byte[] dst): 把 ByteBuf 里面的数据全部读取到 dst,dst 字节数组的大小通常等于readableBytes()

ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);

writeByte(int value)、readByte(): writeByte() 表示往 ByteBuf 中写一个字节,而 readByte() 表示从 ByteBuf 中读取一个字节,类的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 等等

5.4 丢弃、清理,释放API

discardReadBytes(): 丢弃已读取的字节空间,可写空间变多(瘦身)
clear(): 重置readerIndex 、 writerIndex 为0,需要注意的是,重置并没有删除真正的内容, 再次写入会进行覆盖
release(): 真正去释放bytebuf中的数据
ReferenceCountUtil.release(buf): 工具方法,内部还是调用release()

5.5 Wrap

通过Wrap操作可以快速转换或得到一个ByteBuf对象,Unpooled 工具类中提供了很多重载的wrappedBuffer方法
Unpooled 中wrappedBuffer方法不会发生数据的拷贝,而copiedBuffer会发生数据的拷贝

5.6 ByteBuf三类使用模式

堆缓冲区(HeapByteBuf): 内存分配在jvm堆, 分配和回收速度比较快,可以被JVM自动回收, 缺点是在进行IO操作时, 会将堆内存对应的缓冲区复制到内核Channel中, 多一次数据复制的过程
直接缓冲区(DirectByteBuf): 内存分配的是堆外内存(系统内存),相比堆内存,它的分配和回收速度会慢一些,但是将它写入或从Socket Channel中读取时,由于减少了一次内存拷贝,速度比堆内存块
复合缓冲区(CompositeByteBuf): 顾名思义就是将两个不同的缓冲区从逻辑上合并,让使用更加方便
关于堆外内存的理解
在使用输入输出流进行读取的时候,就牵扯到了上下文的切换,应用读取数据的时候,就是讲数据从硬盘读取到内核,在切换到了用户态从数据从内核缓存读取到应用缓存

Netty默认使用的是DirectByteBuf,如果需要使用HeapByteBuf模式,则需要进行系统参数的设置
设置HeapByteBuf模式,但ByteBuf 的分配器ByteBufAllocator要设置为非池化,否则不能切换到堆缓冲器模式
System.setProperty(“io.netty.noUnsafe”, “true”)

5.7 ByteBuf 的分配器BufAllocator

ByteBufAllocator alloc = ChannelHandlerContext: alloc();

Netty 提供了两种 ByteBufAllocator 的实现,分别是:
PooledByteBufAllocator:实现了 ByteBuf 的对象的池化,提高性能减少并最大限度地减少内存碎片,池化思想通过预先申请一块专用内存地址作为内存池进行管理,从而不需要每次都进行分配和释放
UnpooledByteBufAllocator:没有实现对象的池化,每次会生成新的对象实例

Netty默认使用了PooledByteBufAllocator,但可以通过引导类设置非池化模式 源码:DefaultChannelConfig种的allocator属性

//引导类中设置非池化模式
bootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
//或者通过系统参数设置
System.setProperty("io.netty.allocator.type", "pooled");
System.setProperty("io.netty.allocator.type", "unpooled");

对于Pooled类型的ByteBuf,不管是PooledDirectByteBuf还是PooledHeapByteBuf都只能由Netty内部自己使用(构造是私有和受保护的),开发者可以使用Unpooled类型的ByteBuf。
Netty提供Unpooled工具类创建的ByteBuf都是unpooled类型,默认采用的Allocator是direct类型;当然用户可以自己选择创建UnpooledDirectByteBuf和UnpooledHeapByteBuf

在这里插入图片描述

5.8 ByteBuf 的释放

ByteBuf如果采用的是堆缓冲区模式的话,可以由GC回收,但是如果采用的是直接缓冲区,就不受GC的管理,就得手动释放,否则会发生内存泄露, Netty自身引入了引用计数,提供了ReferenceCounted接口,当对象的引用计数>0时要保证对象不被释放,当为0时需要被释放
关于ByteBuf的释放,分为手动释放与自动释放:
手动释放: 就是在使用完成后,调用ReferenceCountUtil.release(byteBuf); 进行释放,这种方式的弊端就是一旦忘记释放就可能会造成内存泄露
自动释放有三种方式: 入站的TailHandler(TailContext)、继承SimpleChannelInboundHandler、HeadHandler(HeadContext)的出站释放
入站消息:
对原消息不做处理,依次调用 ctx.fireChannelRead(msg)把原消息往下传,如果能到TailContext,那不用做什么释放,它会自动释放
将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那需要将原消息release掉
如果已经不再调用ctx.fireChannelRead(msg)传递任何消息,需要把原消息release掉
对于出站消息: 则无需用户关心,消息最终都会走到HeadContext,flush之后会自动释放

6. Netty的异步编程模型(Future & Promise)

future和promise,目的是将值(future)与其计算方式(promise)分离,从而允许更灵活地进行计算,特别是通过并行化。Future 表示目标计算的返回值,Promise 表示计算的方式,这个模型将返回结果和计算逻辑分离,目的是为了让计算逻辑不影响返回结果,从而抽象出一套异步编程模型。而计算逻辑与结果关联的纽带就是 callback
Netty中有非常多的异步调用,譬如:client/server的启动,连接,数据的读写等操作都是支持异步的
在这里插入图片描述

6.1 Future

ChannelFuture future = ctx.writeAndFlush(buffer);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        future.get();
    }
});

在这里插入图片描述

@Test
public void testFuture() throws InterruptedException, ExecutionException {
    EventLoopGroup group = new NioEventLoopGroup();

    Future<String> future = group.submit(() -> {
        log.info("异步线程执行任务开始, time={}", LocalDateTime.now().toString());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("异步线程执行任务结束, time={}", LocalDateTime.now().toString());
        return "hello netty future";
    });
    //设置监听
    future.addListener(future1 -> {
        log.info("收到异步线程执行任务结果通知----执行结果是;{},time={}", future1.get(), LocalDateTime.now().toString());
    });
    log.info("---主线程----");
    TimeUnit.SECONDS.sleep(10);
}

6.1.1 ChannelFuture

ChannelFuture:跟Channel的操作有关,Netty中的Handler处理都是异步IO,通过ChannelFuture添加事件监听,可获取Channel异步IO操作的结果;当然也可等待获取,但最好不要在handler中通过future的sync或await来获取异步操作的结果

6.2 Promise

Netty的Future,只是增加了监听器。整个异步的状态,是不能进行设置和修改的,于是Netty的 Promise接口扩展了
Netty的Future接口,可以设置异步执行的结果(callable机制)。在IO操作过程,如果顺利完成、或者发生异常,都可以设置Promise的
结果,并且通知Promise的Listener

@Test
public void testPromise() throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();

    Promise<String> promise = new DefaultPromise<>(group.next()); //promise绑定到eventloop上
    group.submit(() -> {
        try {
            // int i = 1/0;
            TimeUnit.SECONDS.sleep(3);
            promise.setSuccess("hello netty promise");
            TimeUnit.SECONDS.sleep(3);
            promise.setSuccess("hello netty promise2");
            log.info("异步线程执行任务结束, time={}", LocalDateTime.now().toString());
            log.info("finish");
        } catch (Exception e) {
            log.error("exception", e);
            promise.setFailure(e);
        }
    });
    //设置监听回调
    promise.addListener(future -> {
        future.get();
        log.info("异步任务执行结果: {}", future.isSuccess());
    });

    log.info("主线程");
    TimeUnit.SECONDS.sleep(100);
}

6.2.1 ChannelPromise

ChannelPromise接口,则继承扩展了Promise和ChannelFuture。所以,ChannelPromise既绑定了Channel,又具备
了设置监听回调的功能,还可以设置IO操作的结果,是Netty实际编程使用的最多的接口
在这里插入图片描述

6. TCP 粘包,拆包(一次编码)

在这里插入图片描述
造成上述原因的根本原因: TCP 协议是面向连接的、可靠的、基于字节流的传输层通信协议,是一种流式协议,消息无边界
解决TCP粘包,半包问题的根本:找出消息的边界
在这里插入图片描述

6.1 Netty拆包器

Netty提供了针对封装成帧这种形式下不同方式的拆包器,所谓的拆包其实就是数据的解码,所谓解码就是将网络中的一些原始数据解码成上层应用的数据,那对应在发送数据的时候要按照同样的方式进行数据的编码操作然后发送到网络中
在这里插入图片描述

6.1.1 LineBasedFrameDecoder(分隔符解码器)\

// 解码
// 默认以换行符进行分割, 参数最大解析长度, 一般Integer.MAX_VALUE
pipeline.addLast(new LineBasedFrameDecoder(65536));
// 指定分隔符
ByteBuf buf = ch.alloc().buffer().writeBytes("$".getBytes(StandardCharsets.UTF_8));
pipeline.addLast(new DelimiterBasedFrameDecoder(65536,buf));

编码代码中自定义设定

6.1.2 LengthFieldBasedFrameDecoder(基于长度的域解码器)

通过指定数组的边界, 有header和body, 如前4个字节存储的是内容的长度, 后面的标识内容
lengthFieldOffset:length域的偏移,正常情况下读取数据从偏移为0处开始读取,如果有需要可以从其他偏移量处开始读取
lengthFieldLength:length域占用的字节数
lengthAdjustment:在length域和content域中间是否需要填充其他字节数
initialBytesToStrip:解码后跳过的字节数

// 编码
pipeline.addLast(new LengthFieldPrepender(4));
// 解码
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));

6.1.3 自定义解码器

Netty中提供了ByteToMessageDecoder的抽象实现,自定义解码器只需要继承该类,实现decode()即可。Netty也提供了一些常用的解码器实现,用于数据入站的解码操作,基本都是开箱即用的;当然数据出站也需要采用对应的编码器
在这里插入图片描述

7. 序列化(二次编码)

我们把解决半包粘包问题的常用三种解码器叫一次解码器,其作用是将原始数据流(可能会出现粘包和半包的数据流)转换为用户数据(ByteBuf中存储),但仍然是字节数据,所以我们需要二次解码器将字节数组转换为java对象,或者将将一种格式转化为另一种格式,方便上层应用程序使用
一次解码器继承自:ByteToMessageDecoder;二次解码器继承自:MessageToMessageDecoder;但他们的本质都是继承ChannelInboundHandlerAdapter

7.1 ProtobufDecoder & ProtobufEecoder

protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们
写.proto文件来实现序列化, github

/**
 * protostuff编解码工具类
 * @author: xxs
 * @create:2023-04-17 19:31
 */
@Slf4j
public class ProtostuffUtil {
	
	//存储因为无法直接序列化/反序列化 而需要被包装的类型Class
	private static final Set<Class<?>> WRAPPER_SET = new HashSet<Class<?>>();
	
	static {
		WRAPPER_SET.add(List.class);
		WRAPPER_SET.add(ArrayList.class);
		WRAPPER_SET.add(CopyOnWriteArrayList.class);
		WRAPPER_SET.add(LinkedList.class);
		WRAPPER_SET.add(Stack.class);
		WRAPPER_SET.add(Vector.class);
		WRAPPER_SET.add(Map.class);
		WRAPPER_SET.add(HashMap.class);
		WRAPPER_SET.add(TreeMap.class);
		WRAPPER_SET.add(LinkedHashMap.class);
		WRAPPER_SET.add(Hashtable.class);
		WRAPPER_SET.add(SortedMap.class);
		WRAPPER_SET.add(Object.class);
	}
	
	//注册需要使用包装类进行序列化的Class对象
	public static void registerWrapperClass(Class<?> clazz) {
		WRAPPER_SET.add(clazz);
	}
	
	/**
	 * 将对象序列化为字节数组
	 * @param t
	 * @param useWrapper 为true完全使用包装模式 为false则选择性的使用包装模式
	 * @param <T>
	 * @return
	 */
	public static <T> byte[] serialize(T t,boolean useWrapper) {
		Object serializerObj = t;
		if (useWrapper) {
			serializerObj = SerializeDeserializeWrapper.build(t);
		}
		return serialize(serializerObj);
	}
	
	/**
	 * 将对象序列化为字节数组
	 * @param t
	 * @param <T>
	 * @return
	 */
	public static <T> byte[] serialize(T t) {
		//获取序列化对象的class
		Class<T> clazz = (Class<T>) t.getClass();
		Object serializerObj = t;
		if (WRAPPER_SET.contains(clazz)) {
			serializerObj = SerializeDeserializeWrapper.build(t);//将原始序列化对象进行包装
		}
		return doSerialize(serializerObj);
	}
	
	
	/**
	 * 执行序列化
	 * @param t
	 * @param <T>
	 * @return
	 */
	public static <T> byte[] doSerialize(T t) {
		//获取序列化对象的class
		Class<T> clazz = (Class<T>) t.getClass();
		//获取Schema
		// RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz);//根据给定的class创建schema
		/**
		 * this is lazily created and cached by RuntimeSchema
		 * so its safe to call RuntimeSchema.getSchema() over and over The getSchema method is also thread-safe
		 */
		Schema<T> schema = RuntimeSchema.getSchema(clazz);//内部有缓存机制
		/**
		 * Re-use (manage) this buffer to avoid allocating on every serialization
		 */
		LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
		byte[] protostuff = null;
		try {
			protostuff = ProtostuffIOUtil.toByteArray(t, schema, buffer);
		} catch (Exception e){
			log.error("protostuff serialize error,{}",e.getMessage());
		}finally {
			buffer.clear();
		}
		return protostuff;
	}
	
	
	/**
	 * 反序列化
	 * @param data
	 * @param clazz
	 * @param <T>
	 * @return
	 */
	public static <T> T deserialize(byte[] data,Class<T> clazz) {
		//判断是否经过包装
		if (WRAPPER_SET.contains(clazz)) {
			SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
			ProtostuffIOUtil.mergeFrom(data,wrapper,RuntimeSchema.getSchema(SerializeDeserializeWrapper.class));
			return wrapper.getData();
		}else {
			Schema<T> schema = RuntimeSchema.getSchema(clazz);
			T newMessage = schema.newMessage();
			ProtostuffIOUtil.mergeFrom(data,newMessage,schema);
			return newMessage;
		}
	}
	
	
	private static class SerializeDeserializeWrapper<T> {
		//被包装的数据
		T data;
		
		public static <T> SerializeDeserializeWrapper<T> build(T data){
			SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
			wrapper.setData(data);
			return wrapper;
		}
		
		public T getData() {
			return data;
		}
		
		public void setData(T data) {
			this.data = data;
		}
	}
}
@Slf4j
public class ProtoStuffDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        try {
            int length = msg.readableBytes();
            byte[] bytes = new byte[length];
            msg.readBytes(bytes);
            UserInfo userInfo = ProtostuffUtil.deserialize(bytes, UserInfo.class);
            out.add(userInfo);
        } catch (Exception e) {
            log.error("protostuff decode error, msg={}", e.getMessage());
            throw new RuntimeException(e);
        }
    }
}
@Slf4j
public class ProtoStuffEncoder extends MessageToMessageEncoder<UserInfo> {

    @Override
    protected void encode(ChannelHandlerContext ctx, UserInfo msg, List<Object> out) throws Exception {
        try {
            byte[] bytes = ProtostuffUtil.serialize(msg);
            ByteBuf buffer = ctx.alloc().buffer(bytes.length);
            buffer.writeBytes(bytes);
            out.add(buffer);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

8. Keepalive 与 idle监测

在这里插入图片描述

8.1 TCP Keepalvie

在这里插入图片描述

sysctl -algrep tcp_ keepalive
net.ipv4.tcp_ keepalive_ time = 7200
net.ipv4.tcp_ keepalive_ intvl= 75
net.ipv4.tcp_ keepalive_ probes= 9
TCP层的KEEPALIVE时间太长, 默认>2小时,虽然可改,但属于系统参数,改动影响所有应用

// server开启tcp keepalive
 .option(ChannelOption.SO_BACKLOG,1024) // 设置连接队列大小
 .childOption(ChannelOption.SO_KEEPALIVE,true)// 开始tcp keepalive
 .childOption(ChannelOption.TCP_NODELAY,true)// Nagle算法是将小的数据包组装为更大的帧然后进行发送,会有延迟 false开启, trur关闭 
 .handler(new LoggingHandler(LogLevel.INFO))// 日志

8.1 应用层Keepalvie

Idle 监测,只是负责诊断,诊断后,做出不同的行为,决定Idle 监测的最终用途,一般用来配合keepalive ,减少keepalive 消息
空闲监测+ 判定为Idle 时才发keepalive,有其他数据传输的时候,不发送keepalive ,无数据传输超过一定时间,判定为Idle,再发keepalive

服务器添加read idle check,10s接收不到channel数据就断掉连接,保护自己,瘦身
客户端添加write idle check + keepalive,5s不发送数据就发送一个keepalive,避免连接被断,也避免频繁keepalive
服务端

pipeline.addLast(new ServerReadIdleHandler());
@Slf4j
public class ServerReadIdleHandler extends IdleStateHandler {

    public ServerReadIdleHandler() {
        super(10, 0, 0, TimeUnit.SECONDS);
    }


    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        log.info("server channel idle");
        if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
            ctx.close();
            log.info("server read idle , close channel...");
            return;
        }
        super.channelIdle(ctx, evt);
    }
}

客户端

pipeline.addLast(new ClientWriterIdleHandler());
@Slf4j
public class ClientWriterIdleHandler extends IdleStateHandler {

    public ClientWriterIdleHandler() {
        super(0, 5, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
            //发送keepalive消息
            UserInfo userInfo = new UserInfo();
            userInfo.setName("keeepalive");
            ctx.channel().writeAndFlush(userInfo);
        }
        super.channelIdle(ctx, evt);
    }
}

9. 高级特性,性能优化

9.1 System 参数

针对ServerScoketChannel,通过.childOption设置:
1、SO_KEEPALIVE,tcp层keepalvie,默认关闭,一般选择关闭tcp keepalive 而使用应用keepalive
2、TCP_NODELAY:设置是否启用nagle算法,该算法是tcp在发送数据时将小的、碎片化的数据拼接成一个大的报文一起发送,以此来提高效率,默认是false(启用),如果启用可能会导致有些数据有延时,如果业务不能忍受,小报文也需要立即发送则可以禁用该算法
3、SO_BACKLOG:最大等待连接数量,netty在linux下该值的获取是通过:io.netty.util.NetUtil完成的

9.2. 应用诊断

9.2.1 完善线程组的线程名

  EventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
  EventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));

9.2.2 添加Handler名称 & 日志

pipeline.addLast("protoStuffDecoder", new ProtoStuffDecoder());

9.3 线程模型优化

9.3.1 CPU密集型:运算型

保持当前线程模型:
1、Runtime.getRuntime().availableProcessors() * 2
2、io.netty.availableProcessors *2 【docker】
3、io.netty.eventLoopThreads
还需要具体调优,根据业务执行时间不断调整找到一个合适的值

9.3.2 CPU密集型:运算型

需要整改线程模型:需要独立的“业务线程池”来处理业务(执行handler)
1、在handler内部使用JDK Executors
2、向pipeline中添加handle时使用netty提供的UnorderedTheadPoolEventExector

EventExecutorGroup business = new UnorderedThreadPoolEventExecutor(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("business"));

// 执行业务
pipeline.addLast(business, "tcptesthandler", new TcpStickHalfHandler());

9.4 零拷贝-Zero Copy

系统内核处理 IO 操作分为两个阶段:等待数据和拷贝数据:
1、等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中。
2、拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。
在这里插入图片描述
1、Netty接收和发送ByteBuffer采用DirectBuffer,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝
2、Netty提供CompositeByteBuf组合缓冲区类,可以将多个 ByteBuf合并为一个逻辑上的ByteBufer,避免了各个ByteBufer之间的拷贝,将几个小buffer合并成一个大buffer的繁琐操作
3、通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作
4、通过 FileRegion 包装的FileChannel.tranferTo (Java nio)实现文件传输, 可以直接将文件缓冲区的数据发送到目标
Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/432513.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系统之部署ZFile在线网盘服务

Linux系统之部署ZFile在线网盘服务 一、ZFile介绍1.ZFile简介2.ZFile特点 二、本地环境介绍1.本次实践说明2.本地环境规划 三、安装环境依赖1.安装java2.检查java版本 四、下载ZFile软件1.创建安装部署目录2.声明安装路径3.下载ZFile软件包4.解压ZFile软件包5.授权启动停止脚本…

FOC专题--环路PID算法拆分分析

foc中&#xff0c;其实foc算法并不是最难理解的&#xff0c;反而是在其中使用的PID算法&#xff0c;之前我只会套用别人的代码&#xff0c;但并不理解其中的各参数含义&#xff0c;导致在实际调整PI参数的时候&#xff0c;很难调到合适的值。 在实际理解什么是PID算法以及各参数…

【C++引用 】

目录 前言一、引用的概念二、使用引用时注意事项三、引用的使用场景及优势四、常引用、引用的权限五、引用和指针的区别 前言 相信大家应该在网上看过这样的段子。 大家都知道鲁迅原名周树人&#xff0c;浙江绍兴人。"鲁迅"是"周树人"的别名或者说是笔名…

TCP版本的 echo server 和 echo client

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言Tcp的api展示服务端客户端存在的问题解决问题服务端和客户端的大致流程 前言 上一篇文章我们介绍了UCP的客户端与服务器的一个简单实例,这篇我们简单的来介绍T…

111.【金橘社区1.0】

金橘社区1.0版本 (一)、SpringBoot整合SpringSecurity1.导入依赖2.数据库3.登入表单4. 添加配置类 SecurityConfig5.接口实现类 CkqnUserServiceImpl6.前端认证问题 (二)、SpringBoot整合Ajax1.登入表单2. JavaScript (三)、SpringBoot整合editor.md1.编写页面(1).前端页面(2).…

Windows安装RedisJSON(无需编译)

文章目录 Windows安装RedisJSON下载解压配置文件启动服务启动客户端 Windows安装RedisJSON 下载 打开网址 https://github.com/zkteco-home/RedisJson。 在网页的右上角&#xff0c;点击“Code”按钮&#xff0c;然后选择“Download ZIP”以下载最新版本的RedisJSON。 网盘 …

C6678-控制GPIO输入/输出

C6678-控制GPIO输入/输出 术语寄存器起始地址原理输入输出测试中断功能原理中断原理框图芯片中断控制器原理框图内核中断控制器原理框图中断路由架构一级中断表二级中断表CIC0二级中断CIC1二级中断CIC2二级中断CIC3 中断演示代码参考资料 术语 NMI&#xff1a; 不可屏蔽中断CI…

6、在vscode上利用cmake创建第一个简单C++程序

文章目录 &#xff08;1&#xff09;前期准备工作&#xff1a;即安装对应的环境1&#xff09;在vscode上安装插件&#xff1a;C/C、Cmake、CMake tools2&#xff09;安装Cmake环境&#xff08;这是在前面博客提到的已经安装好gcc等环境的前提下进行的&#xff09; &#xff08;…

Springboot基础学习之(二十二):异步任务和邮件任务

方向一&#xff1a;高效学习方法分享 我认为学习的最好的办法就是做笔记:本人特别喜欢在网上学习一些课堂外的知识&#xff0c;但是如果你没有及时的复习&#xff0c;要想找到自己想要的知识该怎么办呢&#xff1f;对&#xff0c;就是做笔记我在csdn这个app发的所有内容都是笔记…

Android UI布局优化之include、merge与ViewStub标签的巧用方法

前言 在开发中UI布局是我们都会遇到的问题&#xff0c;随着UI越来越多&#xff0c;布局的重复性、复杂度也会随之增长。 相信大家经常听到include、merge、ViewStub这样的标签&#xff0c;官方也提到这三种布局可用于布局的优化。今天就介绍下这三种布局的使用&#xff0c;记…

SRv6项目实践(一):环境与工具介绍

在一切开始之前&#xff0c;首先介绍一下我们要做什么&#xff0c;做这个要有什么基础&#xff0c;以及实现的环境 1&#xff0c;实验目标与实验基础 我们要在图下图所示的拓扑中&#xff0c;完成在如以下拓扑所示的网络中&#xff0c;配合ONOS实现基本的L2L3转发以及SRv6&am…

港联证券|券商再迎利好!这一比例大幅下调,释放300亿资金

券商利好接二连三。 4月10日&#xff0c;即日起下调证券公司转融通保证金份额。资信优质的公司&#xff0c;保证金份额由20%下调至5%&#xff1b;资信杰出的公司&#xff0c;由20%下调至10%&#xff1b;其余公司由25%下调至15%。中证金融预计&#xff0c;保证金份额下调后&…

Linux系统之tomcat的安装方法

Linux系统之tomcat的安装方法一、tomcat介绍1.tomcat简介2.tomcat官网二、本次环境规划三、安装jdk1.下载jdk包2.安装jdk3.检查jdk版本四、安装tomcat1.下载tomcat2.解压tomcat软件包3.设置环境变量4.查看tomcat版本五、启动tomcat1.启动tomcat服务2.检查tomcat服务状态3.访问t…

原来情感可以这样影响用户体验设计

&#x1f525;情绪的基本情况 Emotion&#xff1a;即刻的生理反应&#xff0c; Feeling&#xff1a;物理的或者心理上的&#xff0c;是emotion经过思考后的 Mood&#xff1a;持续时间更长&#xff0c;是一种状态&#xff0c;受到很多因素影响&#xff08;天气、睡眠&#x…

92-TCP三次握手及TCP四次挥手

TCP三次握手及TCP四次挥手1.tcp三次握手(1)tcp的特点(2)tcp三次握手发生在什么阶段(3)tcp协议报头(4)tcp三次握手的流程2.tcp四次挥手(1)tcp四次挥手发生在什么阶段(2)tcp四次挥手的流程(3)能不能将服务器发端发送的ACK和FIN放在一起发送呢1.tcp三次握手 (1)tcp的特点 TCP 协…

十六、市场活动:查看市场活动明细(二)

功能需求 点击市场活动名称链接,跳转到明细页面,查看市场活动明细 -市场活动的基本信息 -市场活动下所有的备注信息 功能分析 流程图 代码实现 一、ActivityRemarkMapper 1.ActivityRemarkMapper接口 /*** 根据市场活动id查询备注*/List<ActivityRemark> selectActivi…

贪吃蛇小项目

1.总体程序 #include <curses.h> #include <stdlib.h> #include <pthread.h> #include <math.h> #include <time.h> struct Snake //贪吃蛇身子节点 {char node; //节点序号int row; //行坐标int column; //列坐标stru…

探索实践低光照场景下YOLOv5s模型上限,融合CBAM注意力机制开发构建基于改进YOLOv5s的低光照条件下目标检测识别分析系统

在现实生活场景里面&#xff0c;很多场景下光线光照条件都是比较差的&#xff0c;比如夜晚、室内等&#xff0c;这时候以往的目标检测模型是否还能够胜任我们所需的目标检测任务呢&#xff1f;这里主要的想法就是基于地光线条件下的数据集来开发构建目标检测系统&#xff0c;探…

Flowable6.x导出/查看/跟踪流程图(续)

书接上回 项目源码仓库 无论是待办、已办&#xff0c;亦或是流转中、已结束的流程实例&#xff0c;通过使用JS绘制SVG格式的交互式流程图&#xff0c;与以上篇博文中三种方式相比&#xff0c;在效果上都具有明显优势。 运行效果如下图所示&#xff1a; 整合、改造Flowable中…

110.【23种设计模式--创建者模式】

Java 23种设计模式 (一)、设计模式相关内容介绍1.软件设计模式概述(1).软件设计模式的产生背景(2).软件设计模式的概念(3).学习设计模式的重要性(4).设计模式分类 2.UML图(1).类图概述(2).类图的作用(3).类图表示法 3.软件设计原则(1).开闭原则 (重写不修改)(2).里氏代换原则 (…