Netty实战与调优

news2024/10/7 10:16:17

Netty实战与调优

聊天室业务介绍

代码参考

/**
 * 用户管理接口
 */
public interface UserService {

    /**
     * 登录
     * @param username 用户名
     * @param password 密码
     * @return 登录成功返回 true, 否则返回 false
     */
    boolean login(String username, String password);
}
/**
 * 会话管理接口
 */
public interface Session {

    /**
     * 绑定会话
     * @param channel 哪个 channel 要绑定会话
     * @param username 会话绑定用户
     */
    void bind(Channel channel, String username);

    /**
     * 解绑会话
     * @param channel 哪个 channel 要解绑会话
     */
    void unbind(Channel channel);

    /**
     * 获取属性
     * @param channel 哪个 channel
     * @param name 属性名
     * @return 属性值
     */
    Object getAttribute(Channel channel, String name);

    /**
     * 设置属性
     * @param channel 哪个 channel
     * @param name 属性名
     * @param value 属性值
     */
    void setAttribute(Channel channel, String name, Object value);

    /**
     * 根据用户名获取 channel
     * @param username 用户名
     * @return channel
     */
    Channel getChannel(String username);
}
/**
 * 聊天组会话管理接口
 */
public interface GroupSession {

    /**
     * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
     * @param name 组名
     * @param members 成员
     * @return 成功时返回组对象, 失败返回 null
     */
    Group createGroup(String name, Set<String> members);

    /**
     * 加入聊天组
     * @param name 组名
     * @param member 成员名
     * @return 如果组不存在返回 null, 否则返回组对象
     */
    Group joinMember(String name, String member);

    /**
     * 移除组成员
     * @param name 组名
     * @param member 成员名
     * @return 如果组不存在返回 null, 否则返回组对象
     */
    Group removeMember(String name, String member);

    /**
     * 移除聊天组
     * @param name 组名
     * @return 如果组不存在返回 null, 否则返回组对象
     */
    Group removeGroup(String name);

    /**
     * 获取组成员
     * @param name 组名
     * @return 成员集合, 没有成员会返回 empty set
     */
    Set<String> getMembers(String name);

    /**
     * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
     * @param name 组名
     * @return 成员 channel 集合
     */
    List<Channel> getMembersChannel(String name);
}

注册事件处理器

@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
        ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
        GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
        GroupChatMessageHandler GROUP_CHAT_HANDLER = new GroupChatMessageHandler();
        QuitHandler QUIT_HANDLER = new QuitHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //空闲检测 arg1:读空闲 arg2:写空闲 arg3:读写空闲  单位:s  0代表不关注
                    ch.pipeline().addLast(new IdleStateHandler(5,0,0));
                    //处理空闲事件
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                            if (evt instanceof IdleStateEvent){
                                IdleStateEvent event = (IdleStateEvent) evt;
                                if (event.state().equals(IdleState.READER_IDLE)){
                                    log.info("读空闲超过5s!");
                                }
                            }
                        }
                    });
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(LOGIN_HANDLER);
                    ch.pipeline().addLast(CHAT_HANDLER);
                    ch.pipeline().addLast(GROUP_CREATE_HANDLER);
                    ch.pipeline().addLast(GROUP_CHAT_HANDLER);
                    ch.pipeline().addLast(QUIT_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端

@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
        AtomicBoolean LOGIN = new AtomicBoolean(false);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
//                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    //添加心跳机制
                    ch.pipeline().addLast(new IdleStateHandler(0,3,0));
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                            if (evt instanceof IdleStateEvent){
                                IdleStateEvent event = (IdleStateEvent) evt;
                                //超过3s没有写入数据则触发WRITER_IDLE事件
                                if (IdleState.WRITER_IDLE.equals(event.state())){
//                                    log.info("发送心跳数据包...");
                                    ctx.writeAndFlush(new PingMessage());
                                }
                            }
                        }
                    });
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            new Thread(() -> {
                                Scanner scanner = new Scanner(System.in);
                                System.out.print("username:");
                                String username = scanner.nextLine();
                                System.out.print("password:");
                                String password = scanner.nextLine();
                                LoginRequestMessage request = new LoginRequestMessage(username, password);
                                ctx.channel().writeAndFlush(request);
                                //阻塞
                                try {
                                    WAIT_FOR_LOGIN.await();
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                                if (LOGIN.get()) {
                                    while (true) {
                                        System.out.println("==================================");
                                        System.out.println("send [username] [content]");
                                        System.out.println("gsend [group name] [content]");
                                        System.out.println("gcreate [group name] [m1,m2,m3...]");
                                        System.out.println("gmembers [group name]");
                                        System.out.println("gjoin [group name]");
                                        System.out.println("gquit [group name]");
                                        System.out.println("quit");
                                        System.out.println("==================================");
                                        String command = scanner.nextLine();
                                        if (command != null && command.length() > 0) {
                                            String[] commandList = command.split(" ");
                                            switch (commandList[0]) {
                                                case "send":
                                                    ChatRequestMessage message = new ChatRequestMessage(username, commandList[1], commandList[2]);
                                                    ctx.channel().writeAndFlush(message);
                                                    break;
                                                case "gsend":
                                                    GroupChatRequestMessage groupMessage = new GroupChatRequestMessage(username, commandList[1], commandList[2]);
                                                    ctx.channel().writeAndFlush(groupMessage);
                                                    break;
                                                case "gcreate":
                                                    String members = commandList[2];
                                                    if (members != null && members.length() > 0) {
                                                        String[] memberList = members.split(",");
                                                        Set<String> memberSet = Arrays.stream(memberList).collect(Collectors.toSet());
                                                        memberSet.add(username);
                                                        GroupCreateRequestMessage groupCreateRequestMessage = new GroupCreateRequestMessage(commandList[1], memberSet);
                                                        ctx.channel().writeAndFlush(groupCreateRequestMessage);
                                                    }
                                                    break;
                                                case "gmembers":
                                                    GroupMembersRequestMessage groupMembersRequestMessage = new GroupMembersRequestMessage(commandList[1]);
                                                    ctx.channel().writeAndFlush(groupMembersRequestMessage);
                                                    break;
                                                case "gjoin":
                                                    GroupJoinRequestMessage groupJoinRequestMessage = new GroupJoinRequestMessage(username, commandList[1]);
                                                    ctx.channel().writeAndFlush(groupJoinRequestMessage);
                                                    break;
                                                case "gquit":
                                                    GroupQuitRequestMessage groupQuitRequestMessage = new GroupQuitRequestMessage(username, commandList[1]);
                                                    ctx.channel().writeAndFlush(groupQuitRequestMessage);
                                                    break;
                                                case "quit":
                                                    ctx.channel().close();
                                                    return;
                                            }
                                        }
                                    }
                                } else {
                                    ctx.channel().close();
                                }
                            }).start();
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.info("msg:{}", msg);
                            if (msg instanceof LoginResponseMessage response) {
                                LOGIN.set(response.isSuccess());
                                WAIT_FOR_LOGIN.countDown();
                            }
                        }
                    });
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

登录

@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        boolean login = UserServiceFactory.getUserService().login(username, password);
        if (login) {
            SessionFactory.getSession().bind(ctx.channel(), username);
        }
        ctx.channel().writeAndFlush(new LoginResponseMessage(login, login ? "登录成功!" : "登录失败,用户名或密码有误!"));
    }
}

单聊

@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
        String to = chatRequestMessage.getTo();
        String from = chatRequestMessage.getFrom();
        String content = chatRequestMessage.getContent();
        Channel channel = SessionFactory.getSession().getChannel(to);
        if (channel!=null){
            //转发消息
            channel.writeAndFlush(new ChatResponseMessage(from,content));
        }
        else {
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"发送失败,用户不在线!"));
        }
    }
}

创建群聊

@ChannelHandler.Sharable
@Slf4j
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
        Set<String> members = groupCreateRequestMessage.getMembers();
        String groupName = groupCreateRequestMessage.getGroupName();
        GroupSession session = GroupSessionFactory.getGroupSession();
        Group group = session.createGroup(groupName, members);
        GroupCreateResponseMessage message = null;
        if (group != null) {
            message = new GroupCreateResponseMessage(false, "创建群聊失败!");
        } else {
            message = new GroupCreateResponseMessage(true, "创建群聊成功!");
            List<Channel> channels = session.getMembersChannel(groupName);
            channels.forEach(channel -> {
                GroupJoinResponseMessage joinResponseMessage = new GroupJoinResponseMessage(true, "您已被拉入群聊:" + groupName);
                channel.writeAndFlush(joinResponseMessage);
            });
        }
        channelHandlerContext.writeAndFlush(message);
    }
}

发送群聊消息

@ChannelHandler.Sharable
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
        String groupName = groupChatRequestMessage.getGroupName();
        String msg = groupChatRequestMessage.getContent();
        String from = groupChatRequestMessage.getFrom();
        List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        channels.forEach(channel -> {
            channel.writeAndFlush(new GroupChatResponseMessage(from, msg));
        });
    }
}

退出

@ChannelHandler.Sharable
@Slf4j
public class QuitHandler extends ChannelInboundHandlerAdapter {
    //处理channel正常关闭事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //移除channel
        SessionFactory.getSession().unbind(ctx.channel());
        log.info("channel:{}已断开!",ctx.channel());
    }

    //处理channel异常关闭事件
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.info("channel:{}异常断开!",ctx.channel());
    }
}

空闲检测

连接假死

原因

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

服务器端解决

  • 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
					//空闲检测 arg1:读空闲 arg2:写空闲 arg3:读写空闲  单位:s  0代表不关注
                    ch.pipeline().addLast(new IdleStateHandler(5,0,0));
                    //处理空闲事件
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                            if (evt instanceof IdleStateEvent){
                                IdleStateEvent event = (IdleStateEvent) evt;
                                if (event.state().equals(IdleState.READER_IDLE)){
                                    log.info("读空闲超过5s!");
                                    //关闭channel
                                }
                            }
                        }
                    });

客户端定时心跳

  • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
                    //添加心跳机制
                    ch.pipeline().addLast(new IdleStateHandler(0,3,0));
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                            if (evt instanceof IdleStateEvent){
                                IdleStateEvent event = (IdleStateEvent) evt;
                                //超过3s没有写入数据则触发WRITER_IDLE事件
                                if (IdleState.WRITER_IDLE.equals(event.state())){
                                    log.info("发送心跳数据包...");
                                    ctx.writeAndFlush(new PingMessage());
                                }
                            }
                        }
                    });

优化

拓展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

为了支持更多序列化算法,抽象一个 Serializer 接口

public interface Serializer {

    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);
    
}

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

  enum Algorithm implements Serializer {
        Java {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try {
                    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    Object object = in.readObject();
                    return (T) object;
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    new ObjectOutputStream(out).writeObject(object);
                    return out.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
                }
            }
        },

        // Json 实现(引入了 Gson 依赖)
        Json {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
            }
        };

        // 需要从协议的字节中得到是哪种序列化算法
        public static Algorithm getByInt(int type) {
            Algorithm[] array = Algorithm.values();
            if (type < 0 || type > array.length - 1) {
                throw new IllegalArgumentException("Out of index!");
            }
            return array[type];
        }
    }

增加配置类和配置文件

public abstract class Config {
    static Properties properties;
    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if(value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if(value == null) {
            return Serializer.Algorithm.Java;
        } else {
            return Serializer.Algorithm.valueOf(value);
        }
    }
}

配置文件

serializer.algorithm=Json

修改编解码器

public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

参数调优

CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal的参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • 注意:Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300);// SocketChannel0.3s内未建立连接就抛出异常

源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

					// Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();//获取超时时间
                    if (connectTimeoutMillis > 0) { 
                        //创建定时任务
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {  //如果超时后Promise依然存在,则向Promise标记异常
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);//在connectTimeoutMillis后开始执行任务
                    }

SO_BACKLOG

  • 该参数是ServerSocketChannel的参数

三次握手与连接队列
tcp三次握手

简单易懂的解释

1.第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列

2.当完成三次握手以后,连接会被放入全连接队列中

3.服务器处理Accept事件是在TCP三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理

详细的解释

  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

Linux的backlog 参数

在 Linux 2.2 之前,backlog 大小包括了两个队列的大小,在 Linux 2.2 之后,分别用下面两个参数来控制

  • 半连接队列 - sync queue
    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • 全连接队列 - accept queue
    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

SO_BACKLOG作用

在Netty中,SO_BACKLOG主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG设置的值时,便会抛出异常

设置方式

// 设置全连接队列,大小为2
new ServerBootstrap().option(ChannelOption.SO_BACKLOG, 2);

SO_BACKLOG默认值

查询方法调用ServerSocketChannelImpl.bind

	@Override
    public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            if (localAddress != null)
                throw new AlreadyBoundException();
            if (isUnixSocket()) {
                localAddress = unixBind(local, backlog);
            } else {
                localAddress = netBind(local, backlog);
            }
        }
        return this;
    }

在方法NioServerSocketChannel.doBind方法中调用了bind方法绑定backlog,此处调用了config.getBacklog方法

	@Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

查询config实现类

private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig 

在父类DefaultServerSocketChannelConfig中可以看到backlog被赋值

public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements 		     ServerSocketChannelConfig {
    protected final ServerSocket javaSocket;
    private volatile int backlog = NetUtil.SOMAXCONN;//为backlog赋值
}

具体赋值操作

SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
    @Override
    public Integer run() {
        // Determine the default somaxconn (server socket backlog) value of the platform.
        // The known defaults:
        // - Windows NT Server 4.0+: 200
        // - Linux and Mac OS X: 128
        int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
        File file = new File("/proc/sys/net/core/somaxconn");
        BufferedReader in = null;
        try {
            // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
            // try / catch block.
            // See https://github.com/netty/netty/issues/4936
            if (file.exists()) {
                in = new BufferedReader(new FileReader(file));
                // 将somaxconn设置为Linux配置文件中设置的值
                somaxconn = Integer.parseInt(in.readLine());
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: {}", file, somaxconn);
                }
            } else {
                ...
            }
            ...
        }  
        // 返回backlog的值
        return somaxconn;
    }
}
  • backlog的值会根据操作系统的不同,来

    选择不同的默认值

    • Windows 200
    • Linux/Mac OS 128
  • 如果配置文件/proc/sys/net/core/somaxconn存在,会读取配置文件中的值,并将backlog的值设置为配置文件中指定的

    • 注意:windows系统中并不存在该文件

ulimit -n

  • 属于操作系统参数
  • 最大文件限制数
  • 在linux下一切皆文件,开启一个进程就是打开一个文件,控制ulimit大小作用等同于限制进程及其子进程的资源使用

使用ulimit -a 可以查看当前系统的所有限制值,使用ulimit -n 可以查看当前的最大打开文件数。

新装的 linux 默认只有1024,当作负载较大的服务器时,很容易遇到error: too many open files。因此,需要将其改大。

使用 ulimit -n 65535 可即时修改,但重启后就无效了。(注ulimit -SHn 65535 等效 ulimit -n 65535,-S指soft,-H指hard)

设置永久生效

可以修改配置文件/etc/profile

vi /etc/profile

加入一行

ulimit  -SHn  65536

保存退出。然后加载配置文件

source /etc/profile

再次查看ulimit

ulimit -n

第二种修改方法

在/etc/security/limits.conf最后增加如下两行记录

soft nofile 65535
hard nofile 65535

TCP_NODELAY

  • 属于SocketChannal参数
  • 因为Nagle算法,数据包会堆积到一定的数量后一起发送,这就可能导致数据的发送存在一定的延时
  • 该参数默认为false,如果不希望的发送被延时,则需要将该值设置为true

在有些网络通信的场景下,要求低延迟,这样就需要我们设置一些TCP的链接属性

客户端设置

bootstap.option(ChannelOption.TCP_NODELAY, true); 

服务器端是在worker的Channel端设置属性

boot.childOption(ChannelOption.TCP_NODELAY, true); 

设置这样做好的好处就是禁用nagle算法

  • Nagle算法试图减少TCP包的数量和结构性开销, 将多个较小的包组合成较大的包进行发送.但这不是重点, 关键是这个算法受TCP延迟确认影响, 会导致相继两次向连接发送请求包,

  • 读数据时会有一个最多达500毫秒的延时.

  • TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。(一个连接会设置MSS参数,因此,TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据)

  • Nagle算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于SocketChannal参数
  • SO_RCVBUF既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
  • 该参数用于指定接收方与发送方的滑动窗口大小
  • 一般不建议设置该参数,系统会自动设置大小

ALLOCATOR

  • 属于SocketChannal参数
  • 用来配置ByteBuf是池化还是非池化,是直接内存还是堆内存

使用

// 选择ALLOCATOR参数,设置SocketChannel中分配的ByteBuf类型
// 第二个参数需要传入一个ByteBufAllocator,用于指定生成的 ByteBuf 的类型
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator());

ByteBufAllocator类型

  • 池化并使用直接内存

    // true表示使用直接内存
    new PooledByteBufAllocator(true);
    
  • 池化并使用堆内存

    // false表示使用堆内存
    new PooledByteBufAllocator(false);
    
  • 非池化并使用直接内存

    // ture表示使用直接内存
    new UnpooledByteBufAllocator(true);
    
  • 非池化并使用堆内存

    // false表示使用堆内存
    new UnpooledByteBufAllocator(false);
    

RCVBUF_ALLOCATOR

  • 属于SocketChannal参数
  • 控制Netty接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator决定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/417977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何快速上手Vue框架?

编译软件&#xff1a;IntelliJ IDEA 2019.2.4 x64 运行环境&#xff1a;Google浏览器 Vue框架版本&#xff1a;Vue.js v2.7.14 目录一. 框架是什么&#xff1f;二. 怎么写一个Vue程序&#xff08;以IDEA举例&#xff09;&#xff1f;三. 什么是声明式渲染?3.1 声明式3.2 渲染四…

docker安装oracle_11g -- 命还长时,自己搞的小玩具!!!

前言: 如果不是嫌命长, 建议不这么玩, 因为装到最后你会很崩溃, 感觉毫无意义, 就是个玩具, 哎~~~就是玩!!! 参考文档 1.https://blog.51cto.com/u_12946336/5722259 2.https://www.muzhuangnet.com/show/118178.html 3.https://blog.csdn.net/qq_42957435/article/details/1…

spring security+jwt实现认证和授权

最近正在研究前后端分离的开发模式&#xff0c;做做小项目练练手&#xff0c;正好用到了spring security的认证和授权&#xff0c;就总结一波。 首先&#xff0c;引入相关的依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId&g…

支付系统设计:收银台设计一

文章目录前言1. 收银台前端页面1. 1 收银台的业务场景1. 2 同应用不同支付场景下的收银台2. 商户平台配置管理2.1 配置流程2.2 支付工具列表配置2.3 支付配置2.3 支付银行配置3. 系统处理流程3.1 下单流程3.1 拉起收银台流程总结前言 收银台即用户日常付款前选择支付方式的页面…

革新设计,小巧强大,水库保卫无忧!

水库安全运行事关广大人民群众生命财产安全&#xff0c;为规范水库管理&#xff0c;落实水库预报、预警、预演、预案措施&#xff0c;提升水库信息化管理水平&#xff0c;保障水库安全运行。水库大坝是重要的国民基础设施&#xff0c;承担着防洪抗旱&#xff0c;节流发电的重要…

新规拉开中国生成式AI“百团大战”序幕?

AI将走向何方&#xff1f; ChatGPT在全球范围掀起的AI热潮正在引发越来越多的讨论&#xff0c;AI该如何管理&#xff1f;AI该如何发展&#xff1f;一系列问题都成为人们热议的焦点。此前&#xff0c;马斯克等海外名人就在网络上呼吁OpenAI暂停ChatGPT的模型训练和迭代&#xf…

SGAT丨单基因分析工具SingleGeneAnalysisTool

Single Gene Analysis Tool 简介&#xff1a;SGAT是一个免费开源的单基因分析工具&#xff0c;基于Linux系统实现自动化批量处理&#xff0c;能够快速准确的完成单基因和表型的关联分析&#xff0c;只需要输入基因型和表型原始数据&#xff0c;即可计算出显著关联的SNP位点&…

学习大数据需要什么语言基础

Python易学&#xff0c;人人都可以掌握&#xff0c;如果零基础入门数据开发行业的小伙伴&#xff0c;可以从Python语言入手。 Python语言简单易懂&#xff0c;适合零基础入门&#xff0c;在编程语言排名上升最快&#xff0c;能完成数据挖掘、机器学习、实时计算在内的各种大数…

测试名词介绍

测试名词介绍一&#xff1a;敏捷测试1. 定义&#xff1a;2. 敏捷测试的核心&#xff1a;3. 敏捷测试的8大原则和传统测试的区别二&#xff1a;测试名词介绍瀑布模型回归测试Alpha测试Beta测试性能测试白盒测试黑盒测试灰盒测试三&#xff1a;测试流程单元测试 (unit test)集成测…

Java RSA加解密算法学习

一、前言 1.1 问题思考 为什么需要加密 / 解密&#xff1f;信息泄露可能造成什么影响&#xff1f; 二、 基础回顾 2.1 加密技术 加密技术是最常用的安全保密手段&#xff0c;利用技术手段把重要的数据变为乱码&#xff08;加密&#xff09;传送&#xff0c;到达目的地后再…

nginx的前端部署方式

1. 什么是nginx Nginx是一款高性能的http 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器。 由俄罗斯的程序设计师Igor Sysoev所开发&#xff0c;官方测试nginx能够支支撑5万并发链接&#xff0c; 并且cpu、内存等资源消耗却非常低&#xff0…

javascript 数组详解

1.数组是可变的 数组内元素可以是不同的类型&#xff1a; 字符串一旦创建就不可变&#xff0c;但数组是可变的&#xff0c;且操作起来十分随意&#xff0c;例如&#xff1a; 直接修改数组长度&#xff0c;若新赋予长度小于原数组长度&#xff0c;会直接舍弃多余元素: 若新赋予…

【AI绘画】Midjourney和Stable Diffusion教程

之前我向大家介绍了这两个AI绘画网站&#xff1a; Stable Diffusion介绍&#xff1a; https://mp.csdn.net/mp_blog/creation/editor/130059509 Midjourney介绍: https://mp.csdn.net/mp_blog/creation/editor/130003233 前言 这里是新星计划本周最后一篇&#xff0c;主要…

python 连接oracle

前提&#xff0c;navicate成功连接oracle 1、下载cx_oracle,根据python版本下载whl&#xff0c;或者通过 ​pip install cx_Oracle -i http://pypi.douban.com/simple/ 下载地址&#xff1a; cx-Oracle PyPIhttps://pypi.org/project/cx-Oracle/#files2、navicate下instant…

​Auction Design in the Auto-bidding World系列一:面向异质目标函数广告主的拍卖机制设计...

导读&#xff1a; 传统拍卖机制不存在了&#xff01;出价产品智能化成为行业发展趋势&#xff0c;自动出价&#xff08;Auto-bidding&#xff09;已成为互联网广告主营销的主流&#xff0c;经典效用最大化模型&#xff08;Utility Maximizer&#xff09;的假设已经不再能良好地…

使用 LXCFS 文件系统实现容器资源可见性

使用 LXCFS 文件系统实现容器资源可见性一、基本介绍二、LXCFS 安装与使用1.安装 LXCFS 文件系统2.基于 Docker 实现容器资源可见性3.基于 Kubernetes 实现容器资源可见性前言&#xff1a;Linux 利用 Cgroup 实现了对容器资源的限制&#xff0c;但是当在容器内运行 top 命令时就…

《金阁寺》金阁美之于幻想,我用摧毁它来成就其美

《金阁寺》金阁美之于幻想&#xff0c;我用摧毁它来成就其美 三岛由纪夫&#xff08;1925-1970&#xff09;,日本当代小说家、剧作家、记者、电影制作人和电影演员&#xff0c;右翼分子。主要作品有《金阁寺》《鹿鸣馆》《丰饶之海》等。曾3次获诺贝尔文学奖提名&#xff0c;属…

基于Sketch Up软件校园建模案例分享

Acknowledgements&#xff1a; 由衷感谢覃婉柔、赵泽昊同学在本次课程实习中做出的巨大贡献&#xff0c;感谢本团队成员一起努力奋斗的岁月。 一、建模地点&#xff08;中国地质大学&#xff08;武汉&#xff09;未来城校区图书馆周边&#xff09; 中国地质大学&#xff08;武汉…

关于ChatGPT的一些随笔

大家好&#xff0c;我是老三&#xff0c;最近几个月关于ChatGPT的信息可以说是铺天盖地。 “王炸&#xff0c;ChatGPT……” “xxx震撼发布……” “真的要失业了&#xff0c;xxx来袭……” “普通如何利用ChatGPT……” …… 不过老三前一阵比较忙&#xff0c;对ChatGPT…

【MySQL】锁详解——从结构分类到适用场景

我们要学习锁首先要了解下我们想了解的锁到底是什么&#x1f914; 而在MySQL中给某个数据加锁的本质其实就是在内存中创建一个锁结构与之关联&#xff0c;而这个锁结构就是我们常提到的MySQL的锁&#x1f512; 那么接下来的问题就是&#xff0c;这个锁结构长啥样呢&#xff1…