Spring架构篇--2.7.3 远程通信基础--Netty原理--bind实现端口的绑定

news2025/1/17 5:55:46

前言:在对ServerBootstrap 进行属性赋值之后,通过bind 方法完成端口的绑定,并开始在NioEventLoop中进行轮询进行事件的处理;本文主要探究ServersocketChannel 在netty 中是如何完成注册,以及端口的绑定

1 Nio selector 多路复用模型:

为了更好的探究netty 的channel 的注册和端口的绑定,先来回顾下Nio selector 的事件注册以及端口绑定步骤以及netty 步骤的对应:
在这里插入图片描述
可以看到在 nio 中的步骤,在netty 也都是有的;然后对以上步骤的关键点在netty 中实现进行探究;

2 跟踪bind 方法: server.bind(this.port).sync()

2.1 AbstractBootstrap 类 server.bind(this.port):
AbstractBootstrap bind 完成端口的绑定和监听

public ChannelFuture bind(int inetPort) {
    return this.bind(new InetSocketAddress(inetPort));
}

使用new InetSocketAddress(inetPort) 构建要监听的端口,然后通过bind 方法进行绑定
AbstractBootstrap 的bind 方法:

public ChannelFuture bind(SocketAddress localAddress) {
	// 验证父类事件处理 的EventLoopGroup 和NioServerSocketChannel 工厂类是否赋值
    this.validate();
    // 端口的绑定和启动
    return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

关键点 :2.2 进入 AbstractBootstrap doBind:在这里开始真正进行channel 管道的初始化,注册占位事件,完成端口的绑定

private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化NioServerSocketChannel 对象并赋值 regFuture 
    final ChannelFuture regFuture = this.initAndRegister();
// 得到NioServerSocketChannel  channel 对象
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    } else if (regFuture.isDone()) {
// 通道初始化完成
        ChannelPromise promise = channel.newPromise();
// 地址绑定
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }

            }
        });
        return promise;
    }
}

关键点 :2.2.1 this.initAndRegister():NioServerSocketChannel 对象的初始化,以及channel 管道的占位事件注册:
AbstractBootstrap 类中 initAndRegister 方法

final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
//  调用NioServerSocketChannel 无参构造方法 初始化 NioServerSocketChannel 对象
        channel = this.channelFactory.newChannel();
// NioServerSocketChannel 对象初始化
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }

        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }
// 从父类group 中的EventLoopGroup 选出一个EventLoop 并注册到channel 管道上
    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
// 注册失败 关闭ServerSocketChannel  通道
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
// 返回注册的结果, 由于是异步的方法 返回regFuture 
    return regFuture;
}

关键点 2.2.1.1 this.channelFactory.newChannel() 的方法:NioServerSocketChannel 以及jdk 原生的ServerSocketChannel 对象创建:

(1) this.channelFactory.newChannel() 通过反射机制调用NioServerSocketChannel 的鬼样子方法中,进行jdk 原生的ServerSocketChannel 对象创建:

// 获取 WindowsSelectorProvider 的实例对象
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
    	// 返回ServerSocketChannelImpl 的实例对象
        return provider.openServerSocketChannel();
    } catch (IOException var2) {
        throw new ChannelException("Failed to open a server socket.", var2);
    }
}

this(newSocket(DEFAULT_SELECTOR_PROVIDER));
NioServerSocketChannel 的this 方法:

public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
// 父类方法对ServerSocketChannel  对象设置属性
    super((Channel)null, channel, 16);
// ServerSocketChannel   的config 对象进行初始化
//   private final ServerSocketChannelConfig config;
    this.config = new NioServerSocketChannelConfig(this, this.javaChannel().socket());
}

super((Channel)null, channel, 16) 调用 AbstractNioMessageChannel:这里可以看到把感兴趣的事件accept 16 作为参数进行了传递

// parent 为null ,ch 为ServerSocketChannel channel 对象,
// readInterestOp为int OP_ACCEPT = 1 << 4; 对应客户端连接服务器事件
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

关键点(2)继续进父类的调用:AbstractNioChannel: 对java 原生的ServerSocketChannel 的channel 管道的非阻塞属性进行了设置

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
    public void run() {
        AbstractNioChannel.this.clearReadPending0();
    }
};
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
//  将ServerSocketChannel 对象进行赋值
    this.ch = ch;
// 感兴趣的事件 进行赋值 16 final int OP_ACCEPT = 1 << 4; 对应客户端连接服务器事件
    this.readInterestOp = readInterestOp;

    try {
		// 设置管道非阻塞
        ch.configureBlocking(false);
    } catch (IOException var7) {
        try {
            ch.close();
        } catch (IOException var6) {
            logger.warn("Failed to close a partially initialized socket.", var6);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", var7);
    }
}

关键点(3) super(parent); 调用父类:AbstractChannel:这里对java 原生的ServerSocketChannel 的pipeline 进行双向链表的初始化,以便后续加入对accept事件handler 的加入;

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
private final Channel parent;
private final ChannelId id;
private final Channel.Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
private Throwable initialCloseCause;
private boolean strValActive;
private String strVal;

protected AbstractChannel(Channel parent) {
// parent 为null
    this.parent = parent;
// chanel 通道设置id
    this.id = this.newId();
// chanel 通过设置文件描述
    this.unsafe = this.newUnsafe();
// 当前通道初始 pipeline 
    this.pipeline = this.newChannelPipeline();
}

this.newChannelPipeline():

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline 类中对pipeline 完成初始化:

static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() {
    protected Map<Class<?>, String> initialValue() {
        return new WeakHashMap();
    }
};
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
private PendingHandlerCallback pendingHandlerCallbackHead;
private boolean registered;
// channel 为 NioServerSocketChannel对象
protected DefaultChannelPipeline(Channel channel) {
// chanel 赋值
    this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
// 管道初始化异常 异常原因 promise
    this.voidPromise = new VoidChannelPromise(channel, true);
// pipeline 双向链表设置
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}

(4)NioServerSocketChannel 的config 初始化:在完成了对NioServerSocketChannel 对象的创建并初始化pipeline和设置其为非阻塞流之后;回到 NioServerSocketChannel 类中:

this.config = new NioServerSocketChannelConfig(this, this.javaChannel().socket())

private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel, javaSocket);
}

super(channel, javaSocket) 方法调用到DefaultServerSocketChannelConfig 类:

protected final ServerSocket javaSocket;
private volatile int backlog;

public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel);
    this.backlog = NetUtil.SOMAXCONN;
	// ServerSocket  进行赋值
    this.javaSocket = (ServerSocket)ObjectUtil.checkNotNull(javaSocket, "javaSocket");
}

继续super(channel);调用到 DefaultChannelConfig

//  channel 为 ServerSocketChannel 对象
public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
// channel NioServerSocketChannel   allocator:通道所用的缓冲区
    this.allocator = ByteBufAllocator.DEFAULT;
    this.msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
// 通道属性的赋值
    this.connectTimeoutMillis = 30000;
    this.writeSpinCount = 16;
    this.autoRead = 1;
    this.autoClose = true;
    this.writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
    this.pinEventExecutor = true;
    this.setRecvByteBufAllocator(allocator, channel.metadata());
    this.channel = channel;
}

2.2.1.2 继续看 initAndRegister() 中的this.init(channel) 方法:

// channel为 NioServerSocketChannel  对象
void init(Channel channel) {
	// 将父类AbstractBootstrap 的options 属性赋值到 NioServerSocketChannel  对象
    setChannelOptions(channel, this.newOptionsArray(), logger);
    // 将父类AbstractBootstrap 的attrs属性赋值到 NioServerSocketChannel  对象
    setAttributes(channel, (Map.Entry[])this.attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 获取  ServerSocketChannel  初始化的pipeline DetaultChannelPipeline
  ChannelPipeline p = channel.pipeline();
// 获取 子事件轮询组
    final EventLoopGroup currentChildGroup = this.childGroup;
// 获取子事件的 业务逻辑处理任务
    final ChannelHandler currentChildHandler = this.childHandler;
// 获取的子事件中设置的配置项
    final Map.Entry[] currentChildOptions;
    synchronized(this.childOptions) {
        currentChildOptions = (Map.Entry[])this.childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
// 子事件设置的属性
    final Map.Entry<AttributeKey<?>, Object>[] currentChildAttrs = (Map.Entry[])this.childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) {
// 获取 ServerSocketChannel 的pipeline
            final ChannelPipeline pipeline = ch.pipeline();
// 添加 原有的handler
            ChannelHandler handler = ServerBootstrap.this.config.handler();
            if (handler != null) {
                pipeline.addLast(new ChannelHandler[]{handler});
            }
// 添加 Acceptor hadler 到 ServerSocketChannel 的pipeline 双向链表中
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
}

关键点:(1)下面代码向ServerSocketChannel 中的pipeline 在双向链表的尾部增加了一个处理客户端连接事件的handler,并且可以看到这个添加handler 的操作是在ServerSocketChannel 初始化完成之后 由eventLoop 中的线程执行了任务:handler 在增加之后双向链表的构造:header->acceptor->tail

pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)})

到这里可以做下小结:

  • this.channelFactory.newChannel() 完成了对NioServerSocketChannel 的创建,此处可以对应NIo 中的 SocketChannel socketChannel = SocketChannel.open();
  • this.init(channel); 对创建的SocketChannel 进行属性的初始化,对于事件处理的Pipeline进行双向链表的初始化,此处可以对应NIo 中的 socketChannel.configureBlocking(false);
  • 并且在ServerSocketChannel 真正完成初始化之后会有nio 的Eventloop 线程将ServerBootstrapAcceptor的handler 增加到ServerSocketChannel pipeline 链表中;

(2)再回到 AbstractBootstrap 的 initAndRegister() 方法中:看下事件是如果被注册到ServerSocketChannel 上:

final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
//  调用NioServerSocketChannel 无参构造方法 初始化 NioServerSocketChannel 对象
        channel = this.channelFactory.newChannel();
// NioServerSocketChannel 对象初始化
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }

        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }
// 从父类group 中的EventLoopGroup 选出一个EventLoop 并注册到channel 管道上
    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
// 注册失败 关闭ServerSocketChannel  通道
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
// 返回注册的结果
    return regFuture;
}

关键点(3): ChannelFuture regFuture = this.config().group().register(channel):事件轮询器注册到ServerSocketChannel 对象上:
调用 MultithreadEventLoopGroup 中的register 方法

public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}

next() 方法: 使用在new NioEventLoopGroup 时设置的 选择器,从父类 EventExecutor[] children 数组中选择一个 EventExecutor ;然后继续看register 方法:
SingleThreadEventLoop 中的register:

public ChannelFuture register(Channel channel) {
// 创建DefaultChannelPromise 的promise 对象 赋值 当前的ServerSocketChannel channel 和 子事件轮询器的EventExecutor ,并返回包装类的ChannelPromise
    return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
//  NioServerSocketChannel 的channel 进行register
    promise.channel().unsafe().register(this, promise);
    return promise;
}

继续进入AbstractChannel 类中的register 方法:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
// 子事件轮训器进行赋值
        AbstractChannel.this.eventLoop = eventLoop;
      // eventLoop 是否有线程可以直接使用
  if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
// new 一个线程来执行register
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
            	// 注册失败,设置异常原因
                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var4);
            }
        }

    }
}

关键点(3.1 )调用AbstractUnsafe register0 方法:完成占位事件的注册

private voidregister0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }
		// 第一次注册
        boolean firstRegistration = this.neverRegistered;
        // 事件注册
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        // 注册成功
        AbstractChannel.this.registered = true;
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        // promise 设置注册成功标识
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
        this.closeForcibly();
        AbstractChannel.this.closeFuture.setClosed();
        this.safeSetFailure(promise, var3);
    }

}

关键点(3.2 )AbstractChannel.this.doRegister():占位事件0完成注册:
AbstractNioChannel 类中 doRegister 方法:

protected void doRegister() throws Exception {
    boolean selected = false;

    while(true) {
        try {
        	//  this.javaChannel() 为ServerSocketChannel对象
        	//  ServerSocketChannel对象 调用 register 方法
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException var3) {
            if (selected) {
                throw var3;
            }

            this.eventLoop().selectNow();
            selected = true;
        }
    }
}

ServerSocketChannel对象 调用 register 方法
AbstractSelectableChannel 类中 register 方法

// Selector  为 从父事件NioEventLoopGroup 中选择出来的某个NioEventLoop 对应的 Selector 选择器器
// ops 感兴趣的事件 0 ,
// att 属性NioServerSocketChannel 对象
public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (blocking)
            throw new IllegalBlockingModeException();
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

调用AbstractSelector 的 register 方法完成将事件注册到selector多路复用器中:

protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    } else {
        SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
        var4.attach(var3);
        synchronized(this.publicKeys) {
            this.implRegister(var4);
        }

        var4.interestOps(var2);
        return var4;
    }
}

关键点(3.3) AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded() 在channel初始化完成之后回调到initAndRegister() 中的this.init(channel) 方法,完成accept handler 的增加:
在这里插入图片描述
可以看到具体进行accept 事件增加的处理是借由 EventLoop 中的execute 进行任务的提交;

关键点关键点(3.4) this.safeSetSuccess(promise) 为promise 设置成功结果后回调到AbstractBootstrap的doBind的operationComplete 方法进行端口号的绑定:
在这里插入图片描述

关键点 2.2.1.3 ServerSocketChannel 管道初始化完成之后 进行doBind0 端口的板绑定

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    } else if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 监听注册完成的事件
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                	// 设置注册成功标识
                    promise.registered();
                    // 绑定端口
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }

            }
        });
        return promise;
    }
}

(1) AbstractBootstrap 的doBind0 方法:使用nio 线程进行channel 的绑定:

// ChannelFuture  为 DefaultChannelPromise
// channel 为  NioServerSocketChannel
// localAddress为 0.0.0.0/0.0.0.0:8080
// promise为 AbstractBootstrap$PendingRegistrationPromise
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

AbstractChannel 的bind 方法:

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
	//  this.pipeline 取的 NioServerSocketChannel 的pipeline 
    return this.pipeline.bind(localAddress, promise);
}

DefaultChannelPipeline 类中bind 方法:

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return this.tail.bind(localAddress, promise);
}

(2) AbstractChannelHandlerContext 类中bind 方法, 找到第一个Outbound handler (head 节点)完成端口绑定:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (this.isNotValidPromise(promise, false)) {
        return promise;
    } else {
        final AbstractChannelHandlerContext next = this.findContextOutbound(512);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
        	// 端口绑定
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, (Object)null, false);
        }

        return promise;
    }
}

AbstractChannelHandlerContext 类 invokeBind 方法:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (this.invokeHandler()) {
        try {
            ((ChannelOutboundHandler)this.handler()).bind(this, localAddress, promise);
        } catch (Throwable var4) {
            notifyOutboundHandlerException(var4, promise);
        }
    } else {
        this.bind(localAddress, promise);
    }

}

关键点(3) 最终进入NioServerSocketChannel 中的doBind 进行端口的绑定 使用原生的java ServerSocketChannel 进行端口的绑定:

在这里插入图片描述
关键点(4) 在dobind 之后:判断channel 管道是否已经就绪了,如果就绪 就依次调用 NioServerSocketChannel 的Pipeline 中的 每个handler 中的channelActive 方法:
在这里插入图片描述
目前Pipeline 中有3个handler :header->acceptor->tail
(5) 最终进入head 头节点的 DefaultChannelPipeline 中的channelActive方法 :
在这里插入图片描述
(6) 进入AbstractChannelHandlerContext read 方法,从tail 向前找到Outbound 的handler 然后进行操作:
在这里插入图片描述

关键点(7):进入到AbstractNioChannel中doBeginRead() 然后关注accept 事件:
在这里插入图片描述
到这里我们看到了NioServerSocketChannel 对象的初始化,以及非阻塞属性的设置,已经最终对管道accept 事件 的注册;

3 对于bind 方法总结:

  • initAndRegister 分为init 和register ;
  • init 中可以理解为 在main 线程中创建了创 NioServerSocketChannel 对象并且, 在main 线程中 添加 NioServerSocketChannel 初始化 handler ;并随后在NioServerSocketChannel 管道初始化完成之后,使用nio 线程向NioServerSocketChannel 的Pipeline 增加accept 处理客户端连接的handler;
  • register 中会切换到nio线程完成对NioServerSocketChannel 初始事件的绑定;
  • 在dobind0 中 使用nio 线程完成NioServerSocketChannel 端口的绑定,并触发NioServerSocketChannel active 事件 进入到head 节点selectionKey.interestOps(SelectionKey.OP ACCEPT): 对于accept 事件进行绑定;
  • 这里nio 线程指的是我们声明的boss NioEventLoopGroup;需要注意的是 NioEventLoopGroup 中存在多个NioEventLoop,每个NioEventLoop 都有自己的selector,任务队列;所以每个NioEventLoop 都可以处理io 事件,普通任务,和定时任务;值得注意的是每个NioEventLoop 的处理任务的线程只有一个(虽然有线程池,但是也只有一个线程,典型的多生产者,一个消费者);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/619808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

两个月涨粉90万,B站内容风向又有新指示?

6月1日&#xff0c;B站公布了2023年第一季度财报。 财报中显示第一季度&#xff0c;B站日均活跃用户达9370万&#xff0c;同比增长18%。用户日均使用时长96分钟&#xff0c;日均视频播放量达41亿&#xff0c;其中&#xff0c;本季度B站Story-Mode竖屏视频日均播放量同比增长82…

网络安全怎么学?才不会成为脚本小子?

一&#xff0c;怎么入门&#xff1f; 1、Web 安全相关概念&#xff08;2 周&#xff09; 了解网络安全相关法律法规 熟悉基本概念&#xff08;SQL 注入、上传、XSS、CSRF、一句话木马等&#xff09;。 通过关键字&#xff08;SQL 注入、上传、XSS、CSRF、一句话木马等&#…

Python使用WMI模块获取Windows系统的硬件信息,并使用pyinstaller库编译打包成exe的可执行文件

引言 今天给大家分享一篇有关Python和Windows系统的文章。你有没有想过如何获取Windows系统的硬件信息&#xff1f;或者你是否曾经尝试过将Python脚本编译打包成可执行文件&#xff1f;如果你对这些问题感兴趣&#xff0c;那么这篇文章一定适合你。 背景 由于公司现阶段大多…

软件兼容性测试怎么进行?兼容性测试有什么作用?

随着软件的不断更新和升级&#xff0c;兼容性测试也逐渐成为了软件测试的一项重要内容。那么&#xff0c;软件兼容性测试到底是什么?怎么进行?又有什么作用呢? 一、什么是软件兼容性测试? 软件兼容性测试是指在不同的操作系统、硬件设备、浏览器等多个环境下测试软件的可…

VulnHub项目:Hogwarts:Bellatrix

靶机地址&#xff1a;Hogwarts: Bellatrix ~ VulnHub 哈利波特系列~有趣~~~ 渗透过程&#xff1a; 首先获取靶机ip地址 对其端口探测 访问web端口&#xff0c;发现了小天狼星的堂姐Bellatrix Lestrange贝拉特里克斯莱斯特兰奇那个疯狂的女人&#xff01;&#xff01;&#x…

天涯直播筹资300万,七天仅赚14.99万,重启计划岌岌可危

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 天涯前天涯社区总编直播卖货欲筹300万&#xff0c;重启天涯。直播七天七夜只赚了14.99万。对于普通人来说&#xff0c;这收入真的是望尘莫及了&#xff0c;但对于天涯来说杯水车薪。 也许天涯部分…

前端工程化初体验

最近在尝试着完整地体验一下前端工程化需要的那些流程&#xff0c;于是自己尝试一套属于自己的前端工程化流程。 前端工程化需要做什么&#xff1a; 1、创建项目需要有项目模板资源&#xff0c;所以这里我创建了一个前端脚手架CLI工具&#xff0c;mfex-project&#xff0c;主…

Win10系统蓝屏如何使用U盘重装系统?

Win10系统蓝屏如何使用U盘重装系统&#xff1f;很多用户遇到了电脑系统蓝屏问题之后&#xff0c;都不知道怎么去进行系统重装的方法。其实系统重装没有大家想象的那么困难&#xff0c;以下为大家带来详细的图文教程分享。 准备工作&#xff1a; 1、U盘一个&#xff08;尽量使用…

2023年5月CSDN客服月报|解决5个重大问题和18个次要问题,采纳1个用户建议,处理2个用户需求

听用户心声&#xff0c;解用户之需。hello&#xff0c;大家好&#xff0c;这里是《CSDN客诉报告》第20期&#xff0c;接下来就请大家一同回顾我们5月份解决的bug&#xff5e; 一、重大问题 1、【商城】已支付订单权益未正常下发 反馈量&#xff1a;23 2、【UC】收藏/评论/账…

2023年湖北高考作文AI写

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 2023年湖北高考作文AI写 &#x1f9ca;摘要&#x1f9ca;原题&#x1f9ca;解析&#x1f9ca;AIGCInsCode AI 创作助手文心一言讯飞星火 SparkDeskChatGPT3.5 &#x1f9ca;摘要 本文…

CANoe学习笔记一

目录 摘要 1、CANoe工程的新建 2、通过Trace工具查看交互的报文内容 3、通过Logging保存日志文件 4、创建IG发送报文 5、通过Graphics界面抓取信号波形 6、加载cdd文件 7、过滤报文ID的接收 8、其他 摘要 CANoe是德国Vector公司为汽车总线的开发而设计的一款总线开发环…

nuxt3.0学习-四、nuxt3.0的middleware(中间键)、composables(可组合物)以及Plugins(插件目录)

Nuxt3.0中间键了解地址 Nuxt提供了一个可定制的路由中间件框架&#xff0c;您可以在整个应用程序中使用它&#xff0c;非常适合在导航到特定路由之前提取要运行的代码&#xff1b; 路由中间件有三种&#xff1a; 匿名&#xff08;或内联&#xff09;路由中间件&#xff0c;直…

最佳 AI 生产力工具:更聪明地工作,而不是更努力地工作

在20世纪50年代&#xff0c;AI 在内存耗尽之前几乎无法完成跳棋游戏。 快进七个激动人心的十年&#xff0c;可以理解自然语言的人工智能系统——大型语言模型 (LLM)——正在成为我们数字工具箱中的重要工具。 在今天的文章中&#xff0c;我们梳理了一些提高生产力的最佳人工智…

【SpringBoot】 设置随机数据 用于测试用例

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 设置随机数据——常用于测试用例 SpringBoot设…

Research Day 2023:Succinct ZKP最新进展

1. 引言 主要见Ying Tong在Research Day 2023上视频分享&#xff1a; Advances in the Efficiency of Succinct Proofs - Ying Tong ZKP技术可用于&#xff1a; 1&#xff09;Verifiable virtual machine&#xff1a;如各种zkEVM和zkVM。2&#xff09;verifiable cloud com…

领跑云原生安全赛道!安全狗云甲获信通院多项荣誉认可

6月6日&#xff0c;ICT 中国2023高层论坛-云原生产业发展论坛在北京成功举办。 作为国内云原生安全领导厂商&#xff0c;安全狗受邀出席此次活动。 据悉&#xff0c;此次论坛主要包含论坛开幕式、成果发布、产业发展等三大部分。云原生领域领军智库、院士专家、企业高层等多位…

企业培训直播场景下嘉宾连线到底是如何实现的?

企业培训直播场景下&#xff0c;进行音视频连线的嘉宾&#xff0c;都拥有面向学员教学的权限&#xff0c;支持多位老师/专家异地同堂授课&#xff0c;那么&#xff0c;这种嘉宾连线到底是如何实现的&#xff1f; 企业培训&#xff0c;如何做到不受时间和地点限制&#xff0c;实…

数据单一触发数据库锁

【引言】 作为一名数据库开发人员或者管理员&#xff0c;那么你一定知道数据库锁在维护数据一致性中的作用。但是&#xff0c;你有没有想过&#xff0c;什么情况下会触发数据库锁呢&#xff1f; 本文将讲述一种常见的情况——数据单一触发数据库锁&#xff0c;并且分享如何避…

KDZD绝缘油介质损耗电阻率测试仪特点

一、概述 测试仪依据GB/T5654-2007《液体绝缘材料相对电容率、介质损耗因数和直流电阻率的测量》设计制造。用于绝缘油等液体绝缘介质的介质损耗因数和直流电阻率的测量。 一体化结构。内部集成了介损油杯、温控仪、温度传感器、介损测试电桥、交流试验电源、标准电容器、高阻计…

我用ChatGPT写2023高考语文作文(一):全国甲卷

2023高考全国甲卷语文作文题目&#xff1a; 人们因技术发展得以更好地掌控时间&#xff0c;但也有人因此成了时间的仆人。 这句话引发了你怎样的联想与思考&#xff1f;请写一篇文章。 要求&#xff1a;选准角度&#xff0c;确定立意&#xff0c;明确文体&#xff0c;自拟标题&…