netty websocket使用

news2024/11/27 12:35:29

一.maven依赖

<!-- springboot的依赖,如果系统不使用web相关的功能,也可以不使用 -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
	<version>${springboot.version}</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>${fastjson2.version}</version>
</dependency>

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>${netty.version}</version>
</dependency>
<!-- 握手前校验使用,也可以使用其他的校验方式 -->
<dependency>
    <groupId>com.auth0</groupId>
    <artifactId>java-jwt</artifactId>
    <version>${jwt.version}</version>
</dependency>

二.包结构

demo包结构

三.demo代码

1.基础架构层代码

(1)netty功能基础实现和接口
socket基础接口
package com.zzc.netty.infrastructure.netty;

import com.zzc.netty.infrastructure.netty.config.SocketConfig;

/**
 * Lifecycle contract of a socket endpoint that is driven by a configuration object.
 *
 * @param <C> concrete configuration type accepted by this socket
 */
public interface Socket<C extends SocketConfig> {

    /**
     * Starts the socket using the configuration and channel handler supplied earlier.
     *
     * @return {@code true} if the socket was started
     */
    boolean start();

    /**
     * Stores the given configuration and channel handler, then starts the socket.
     *
     * @param serverConfig            configuration to start with
     * @param webSocketChannelHandler callback that receives channel events
     * @return {@code true} if the socket was started
     */
    boolean start(C serverConfig, WebSocketChannelHandler webSocketChannelHandler);

    /**
     * @return {@code true} if a previous start attempt reported success
     */
    boolean isStarted();

    /**
     * Stops the socket and releases whatever the implementation allocated on start.
     */
    void close();

    /**
     * @return the configuration currently assigned to this socket
     */
    C getConfig();

    /**
     * Assigns the configuration used by subsequent {@link #start()} calls.
     *
     * @param config configuration to use
     * @return a socket reference for call chaining
     */
    Socket setConfig(C config);

    /**
     * @return {@code true} if this endpoint acts as a server
     */
    boolean isServer();
}
socket基础实现和初始化
package com.zzc.netty.infrastructure.netty;

import com.zzc.netty.infrastructure.netty.config.SocketConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Skeleton {@link Socket} implementation: keeps the configuration, the channel callback and the
 * started flag, and leaves the actual transport work to {@link #doStart} / {@link #doClose}.
 *
 * @param <C> concrete configuration type
 */
@Slf4j
public abstract class BaseSocket<C extends SocketConfig> implements Socket<C> {

    private WebSocketChannelHandler webSocketChannelHandler;

    private C config;

    private final boolean server;

    private boolean started = false;

    /**
     * @param server {@code true} when the subclass is a server endpoint
     */
    public BaseSocket(boolean server) {
        this.server = server;
    }

    /**
     * Starts the socket with the previously assigned configuration and handler.
     *
     * @return the result reported by {@link #doStart}
     * @throws RuntimeException if configuration or handler is missing, or if {@link #doStart}
     *                          throws (the original exception is kept as the cause)
     */
    @Override
    public boolean start() {
        C serverConfig = getConfig();
        if (serverConfig == null) {
            throw new RuntimeException("serverConfig is null.");
        }
        if (getWebSocketChannelHandler() == null) {
            throw new RuntimeException("OnChannelHandler is null.");
        }
        boolean result;
        try {
            result = doStart(serverConfig);
        } catch (Exception e) {
            // Keep the cause: without it the real start-up failure is impossible to diagnose.
            throw new RuntimeException("start server error", e);
        }
        setStarted(result);
        return result;
    }

    /**
     * Stores handler and configuration, then delegates to {@link #start()}.
     */
    @Override
    public boolean start(C serverConfig, WebSocketChannelHandler webSocketChannelHandler) {
        this.webSocketChannelHandler = webSocketChannelHandler;
        setConfig(serverConfig);
        return start();
    }

    @Override
    public boolean isStarted() {
        return started;
    }

    /**
     * Stops the socket if it is running. Failures of {@link #doClose} are logged, not rethrown;
     * the started flag is cleared either way so {@link #isStarted()} reflects the shutdown.
     */
    @Override
    public void close() {
        if (!isStarted()) {
            return;
        }
        log.info("stop server");
        try {
            doClose();
        } catch (Exception e) {
            log.error("stop server error.", e);
        } finally {
            setStarted(false);
        }
    }

    @Override
    public C getConfig() {
        return config;
    }

    /**
     * @return the callback that receives channel events, or {@code null} if none was supplied yet
     */
    protected WebSocketChannelHandler getWebSocketChannelHandler() {
        return this.webSocketChannelHandler;
    }

    /**
     * @throws RuntimeException if {@code config} is {@code null}
     */
    @Override
    public Socket<C> setConfig(C config) {
        if (config == null) {
            throw new RuntimeException("conf is null");
        }
        this.config = config;
        return this;
    }

    @Override
    public boolean isServer() {
        return server;
    }

    protected void setStarted(boolean started) {
        this.started = started;
    }

    /**
     * Appends Netty's idle detection plus the trigger that forwards idle events to the
     * {@link WebSocketChannelHandler}. All three timeouts are passed in milliseconds.
     */
    protected void addNettyIdleHandler(ChannelPipeline pipeline) {
        long readIdleTimeout = getReadIdleTimeout();
        long writeIdleTimeout = getWriteIdleTimeout();
        long allIdleTimeout = getAllIdleTimeout();
        pipeline.addLast(new IdleStateHandler(readIdleTimeout, writeIdleTimeout, allIdleTimeout, TimeUnit.MILLISECONDS));
        pipeline.addLast(new SocketIdleStateTrigger());
    }

    /**
     * @return reader-idle timeout in milliseconds; equal to the configured close timeout
     */
    protected long getReadIdleTimeout() {
        return getConfig().getCloseTimeout();
    }

    /**
     * @return writer-idle timeout in milliseconds: a quarter of the close timeout, raised to at
     *         least 15 seconds but never above half of the close timeout
     */
    protected long getWriteIdleTimeout() {
        long closeTimeout = getConfig().getCloseTimeout();
        return Math.min(Math.max(closeTimeout / 4, (15 * 1000)), (closeTimeout / 2));
    }

    /**
     * @return all-idle timeout in milliseconds: the close timeout plus 500 ms
     */
    protected long getAllIdleTimeout() {
        return getConfig().getCloseTimeout() + 500;
    }

    /**
     * Performs the transport-specific start.
     *
     * @return {@code true} if the endpoint is up
     */
    protected abstract boolean doStart(C serverConfig);

    /**
     * Performs the transport-specific shutdown.
     */
    protected abstract void doClose();

    /**
     * Hook for subclasses to append protocol handlers; does nothing by default.
     */
    protected void addNettyOtherHandler(ChannelPipeline pipeline) {
        //TODO
    }

    /**
     * Forwards every {@link IdleStateEvent} to the outer socket's {@link WebSocketChannelHandler};
     * all other user events continue down the pipeline unchanged.
     */
    @ChannelHandler.Sharable
    class SocketIdleStateTrigger extends ChannelInboundHandlerAdapter {

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleState state = ((IdleStateEvent) evt).state();
                // The handler decides what each idle state means (e.g. disconnect on ALL_IDLE).
                getWebSocketChannelHandler().onIdleStateEvent(ctx.channel(), state);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}
消息或指令接收接口
package com.zzc.netty.infrastructure.netty;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.timeout.IdleState;

import java.util.Map;

/**
 * Callback interface through which the Netty pipeline reports connection events and inbound
 * messages (commands) to the application layer.
 */
public interface WebSocketChannelHandler {

    /**
     * Called for the initial HTTP request, before the WebSocket handshake is performed.
     *
     * @param channel channel the request arrived on
     * @param params  parameters parsed from the query string of the request URI
     * @return {@code true} to let the handshake proceed, {@code false} to reject the connection
     */
    boolean beforeHandshake(Channel channel, Map<String, Object> params);

    /**
     * Called once Netty signals that the WebSocket handshake completed.
     *
     * @param channel channel that finished the handshake
     * @param params  the parameters that were validated in {@link #beforeHandshake}
     * @return presumably whether the connection was accepted by the application - confirm with
     *         the implementation
     */
    boolean afterHandshake(Channel channel, Map<String, Object> params);

    /**
     * Called when the channel becomes active.
     */
    void channelActive(Channel channel);

    /**
     * Called when the channel becomes inactive.
     */
    void channelInactive(Channel channel);

    /**
     * Called for every inbound message that reaches the end of the pipeline.
     *
     * @param msg the message object as delivered by the pipeline
     */
    void channelRead(Channel channel, Object msg);

    /**
     * Called when an exception was raised while processing the channel.
     */
    void onException(Channel channel, Throwable throwable);

    /**
     * Called when Netty's idle detection fires for the channel.
     *
     * @param state which kind of idleness (reader, writer or both) was detected
     */
    void onIdleStateEvent(Channel channel, IdleState state);


}

(2)netty配置(config)
package com.zzc.netty.infrastructure.netty.config;
/**
 * Configuration contract for a Netty socket: bind address plus connect / write / close timeouts.
 * The timeouts are plain {@code long} values; the close timeout is consumed as milliseconds by
 * the idle handling in this project, the others presumably use the same unit.
 */
public interface SocketConfig {

    // ---- address ----

    /** @return the configured IP address, possibly {@code null} */
    String getIp();

    /** @param ip IP address to use */
    void setIp(String ip);

    /** @return the configured port */
    int getPort();

    /** @param port port to use */
    void setPort(int port);

    // ---- timeouts ----

    /** @return the connect timeout */
    long getConnectTimeout();

    /** @param connectTimeout connect timeout to use */
    void setConnectTimeout(long connectTimeout);

    /** @return the write timeout */
    long getWriteTimeout();

    /** @param writeTimeout write timeout to use */
    void setWriteTimeout(long writeTimeout);

    /** @return the close timeout */
    long getCloseTimeout();

    /** @param closeTimeout close timeout to use */
    void setCloseTimeout(long closeTimeout);

}

package com.zzc.netty.infrastructure.netty.config;


/**
 * Basic {@link SocketConfig} implementation holding the bind address and the three timeouts.
 * Protocols that need additional settings extend this class.
 */
public class BaseSocketConfig implements SocketConfig {

    // Defaults are true constants now (previously mutable per-instance fields).
    // Values are milliseconds - the close timeout is handed to the idle handler in that unit.

    /** Default connect timeout: 30 s. */
    static final long TIMEOUT_CONNECT = 30 * 1000;

    /** Default write timeout: 30 s. */
    static final long TIMEOUT_WRITE = 30 * 1000;

    /** Default close timeout: 120 s. */
    static final long TIMEOUT_CLOSE = 120 * 1000;

    private String ip;

    private int port = 9696;

    private long connectTimeout = TIMEOUT_CONNECT;

    private long writeTimeout = TIMEOUT_WRITE;

    private long closeTimeout = TIMEOUT_CLOSE;

    /** @return the configured IP address; {@code null} unless one was set */
    @Override
    public String getIp() {
        return ip;
    }

    @Override
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @return the configured port; 9696 unless changed */
    @Override
    public int getPort() {
        return port;
    }

    @Override
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the connect timeout; {@link #TIMEOUT_CONNECT} unless changed */
    @Override
    public long getConnectTimeout() {
        return connectTimeout;
    }

    @Override
    public void setConnectTimeout(long connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    /** @return the write timeout; {@link #TIMEOUT_WRITE} unless changed */
    @Override
    public long getWriteTimeout() {
        return writeTimeout;
    }

    @Override
    public void setWriteTimeout(long writeTimeout) {
        this.writeTimeout = writeTimeout;
    }

    /** @return the close timeout; {@link #TIMEOUT_CLOSE} unless changed */
    @Override
    public long getCloseTimeout() {
        return closeTimeout;
    }

    @Override
    public void setCloseTimeout(long closeTimeout) {
        this.closeTimeout = closeTimeout;
    }
}
(3)链接接口和封装(conn)
channel基础操作接口封装
package com.zzc.netty.infrastructure.netty.conn;

/**
 * Abstraction over a single connection (channel): state queries, release and write operations.
 * NOTE(review): the implementing base class is not part of this listing, so the method notes
 * below describe the apparent intent of each operation - confirm against the implementation.
 */
public interface Conn {

    //boolean isServer();

    /** @return whether the underlying connection is open */
    boolean isOpen();

    /** Enables or disables writing on this connection. */
    void setAllowWrite(boolean allowWrite);

    /** @return whether writing is currently allowed */
    boolean isAllowWrite();

    /** @return the identifier of this connection */
    String getConnId();

    /** Releases the connection after a delay (presumably an implementation-defined default). */
    void delayRelease();

    /**
     * Releases the connection after the given delay.
     *
     * @param delayMilliSecond delay before the release, in milliseconds going by the name
     */
    void delayRelease(int delayMilliSecond);

    /** Releases the connection. */
    void release();

    /**
     * Releases the connection gracefully, passing a status code.
     *
     * @param statusCode status to report; the accepted types are implementation-defined
     */
    void releaseGracefully(Object statusCode);

    /** Releases the connection gracefully without an explicit status code. */
    void releaseGracefully();

    /**
     * Writes a message and reports the outcome.
     *
     * @return {@code true} if the write succeeded
     */
    boolean writeObj(Object msg);


    /** Writes a message without reporting the outcome to the caller. */
    void writeObjAsyn(Object msg);


    /**
     * Writes a message and reports the outcome to the given listener.
     *
     * @param listener callback notified about the result of the write
     */
    void writeObjAsyn(Object msg, ConnFutureListener listener);


    /** Sends a ping to the peer. */
    void writePing();

    /** Sends a pong to the peer. */
    void writePong();

}

/**
 * Listener for the outcome of an operation performed on a connection's channel.
 */
public interface ConnFutureListener {

    /**
     * Called when the operation succeeded.
     *
     * @param conn the connection the operation was performed on
     */
    void onSuccess(Conn conn);

    /** Called when the operation was cancelled. */
    void onCancel();

    /** Called when the operation failed. */
    void onFailed();

}

websocket能力接口的封装
package com.zzc.netty.infrastructure.netty.conn;

import com.zzc.netty.domain.protocol.Response;

/**
 * A {@link Conn} that can additionally send protocol-level {@link Response} objects.
 */
public interface WebSocketConn extends Conn {

    /**
     * Sends a response and reports the outcome.
     *
     * @param response response to send
     * @return {@code true} if the write succeeded
     */
    boolean writeResp(Response response);

    /**
     * Sends a response without reporting the outcome to the caller.
     *
     * @param response response to send
     */
    void writeRespAsyn(Response response);

}

package com.zzc.netty.infrastructure.netty.conn;

import com.zzc.netty.domain.protocol.Response;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * {@link WebSocketConn} backed by a Netty channel; responses go out as text frames whose payload
 * is the response's {@code toString()} value.
 */
@Slf4j
public class WebSocketConnImpl extends BaseConn implements WebSocketConn {

    public WebSocketConnImpl(Channel channel) {
        super(channel);
    }

    /** Sends the response as a text frame and returns the write result. */
    @Override
    public boolean writeResp(Response response) {
        return writeObj(asTextFrame(response));
    }

    /** Sends the response as a text frame without reporting the result. */
    @Override
    public void writeRespAsyn(Response response) {
        writeObjAsyn(asTextFrame(response));
    }

    /** Wraps the textual form of a response into a WebSocket text frame. */
    private static TextWebSocketFrame asTextFrame(Response response) {
        return new TextWebSocketFrame(response.toString());
    }
}

(4)handler实现(handler)
安全校验handler,在握手之前实现
package com.zzc.netty.infrastructure.netty.handler;

import com.zzc.netty.infracore.common.utils.HttpxUtils;
import com.zzc.netty.infrastructure.netty.WebSocketChannelHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * Gatekeeper placed in front of the WebSocket handshake: validates the HTTP upgrade request via
 * {@link WebSocketChannelHandler#beforeHandshake} and only lets accepted requests continue.
 */
@Slf4j
@ChannelHandler.Sharable
public class SecurityServerHandler extends ChannelInboundHandlerAdapter {

    /** Channel attribute under which the validated query parameters are stored. */
    public static final AttributeKey<Map<String, Object>> SECURITY_CHECK_ATTRIBUTE_KEY =
            AttributeKey.valueOf("SECURITY_CHECK_ATTRIBUTE_KEY");

    private final WebSocketChannelHandler webSocketChannelHandler;

    public SecurityServerHandler(WebSocketChannelHandler webSocketChannelHandler) {
        this.webSocketChannelHandler = webSocketChannelHandler;
    }

    /**
     * Validates the upgrade request and, when accepted, rewrites its URI to {@code /ws}, stores
     * the parsed parameters on the channel, removes this handler and forwards the request.
     *
     * <p>Background (from the original author's testing): with query parameters on the ws URI
     * the Netty handshake dropped the connection, so the URI is rewritten here before the
     * handshake handler sees it. An alternative would be to customise the handshake itself.
     * NOTE(review): newer Netty versions offer a {@code checkStartsWith} option on
     * {@code WebSocketServerProtocolHandler} that may make the rewrite unnecessary - confirm.
     *
     * <p>A request that is rejected, or for which validation throws, is released here because
     * nothing downstream will ever see (and release) it.
     *
     * @param ctx handler context
     * @param msg inbound message; anything other than a {@link FullHttpRequest} is passed on
     * @throws Exception if parsing, validation or a downstream handler fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof FullHttpRequest)) {
            // Not ours to judge: hand it on so that ownership (and release) moves downstream
            // instead of the message being silently dropped.
            super.channelRead(ctx, msg);
            return;
        }

        FullHttpRequest request = (FullHttpRequest) msg;
        boolean handedOff = false;
        try {
            // Parse the query parameters of the request URI.
            Map<String, Object> params = HttpxUtils.urlQueryParams(request.uri());
            // Signature / token check before the handshake.
            boolean valid = webSocketChannelHandler.beforeHandshake(ctx.channel(), params);
            log.info("channelRead valid:{}", valid);
            if (!valid) {
                ctx.close();
                return;
            }

            // Strip the query string, otherwise the subsequent handshake fails.
            request.setUri("/ws");
            ctx.channel().attr(SECURITY_CHECK_ATTRIBUTE_KEY).set(params);
            ctx.fireUserEventTriggered(params);
            // One check per connection is enough; later frames skip this handler.
            ctx.pipeline().remove(this);

            handedOff = true;
            super.channelRead(ctx, msg);
        } finally {
            if (!handedOff) {
                request.release();
            }
        }
    }

}
websocket消息处理
package com.zzc.netty.infrastructure.netty.handler;

import com.zzc.netty.infrastructure.netty.WebSocketChannelHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * Last inbound handler of the WebSocket pipeline: translates Netty channel events into calls on
 * the application's {@link WebSocketChannelHandler}.
 *
 * <p>Messages are released by {@link SimpleChannelInboundHandler} once {@link #channelRead0}
 * returns, so the application must not keep a reference to the frame beyond that call.
 */
@Slf4j
@ChannelHandler.Sharable
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<Object> {

    private final WebSocketChannelHandler webSocketChannelHandler;

    public WebSocketFrameHandler(WebSocketChannelHandler webSocketChannelHandler) {
        this.webSocketChannelHandler = webSocketChannelHandler;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        webSocketChannelHandler.channelActive(ctx.channel());
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        webSocketChannelHandler.channelInactive(ctx.channel());
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        webSocketChannelHandler.onException(ctx.channel(), cause);
        super.exceptionCaught(ctx, cause);
    }

    /**
     * On handshake completion, hands the parameters stored by {@link SecurityServerHandler} to
     * the application. If the application does not accept the connection the channel is closed,
     * because an unaccepted connection is not registered and could not be served anyway.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            Map<String, Object> params = ctx.channel().attr(SecurityServerHandler.SECURITY_CHECK_ATTRIBUTE_KEY).get();
            log.info("userEventTriggered params:{}", params);
            boolean accepted = webSocketChannelHandler.afterHandshake(ctx.channel(), params);
            if (!accepted) {
                log.warn("afterHandshake rejected the connection, closing channel:{}", ctx.channel().id());
                ctx.close();
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        webSocketChannelHandler.channelRead(ctx.channel(), msg);
    }

}
(5)服务实现(server)
server配置
package com.zzc.netty.infrastructure.netty.server;


import com.zzc.netty.infrastructure.netty.config.BaseSocketConfig;

/**
 * Server-side configuration: the inherited address and timeout settings plus the sizes of the
 * boss and worker thread groups. (Ideally this would be an interface with a separate
 * implementation, which would be easier to extend.)
 */
public class ServerSocketConfig extends BaseSocketConfig {

    private int bossThreads;

    private int workThreads;

    /**
     * Uses 2 boss threads and twice the number of available processors as worker threads.
     */
    public ServerSocketConfig() {
        this(2, Runtime.getRuntime().availableProcessors() * 2);
    }

    /**
     * @param bossThreads number of boss (accept) threads
     * @param workThreads number of worker (I/O) threads
     */
    public ServerSocketConfig(int bossThreads, int workThreads) {
        super();
        this.bossThreads = bossThreads;
        this.workThreads = workThreads;
    }

    /** @return number of boss threads */
    public int getBossThreads() {
        return this.bossThreads;
    }

    /** @return number of worker threads */
    public int getWorkThreads() {
        return this.workThreads;
    }

}

socket功能实现
package com.zzc.netty.infrastructure.netty.server;


import com.zzc.netty.infrastructure.netty.Socket;

/**
 * A {@link Socket} whose configuration type is fixed to {@link ServerSocketConfig}; adds no
 * operations of its own.
 */
public interface ServerSocket extends Socket<ServerSocketConfig> {

}

package com.zzc.netty.infrastructure.netty.server;

import com.zzc.netty.infracore.common.utils.HttpxUtils;
import com.zzc.netty.infracore.common.utils.ThreadPoolUtils;
import com.zzc.netty.infrastructure.netty.BaseSocket;
import com.zzc.netty.infrastructure.netty.handler.SecurityServerHandler;
import com.zzc.netty.infrastructure.netty.handler.WebSocketFrameHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;


@Slf4j
public class ServerSocketImpl extends BaseSocket<ServerSocketConfig> implements ServerSocket {

    private final static String THREAD_PREFIX_BOSS = "websocketBossServer";

    private final static String THREAD_PREFIX_WORK = "websocketWorkServer";

    ServerBootstrap bootstrap = null;

    EventLoopGroup bossGroup = null;

    EventLoopGroup workGroup = null;

    public ServerSocketImpl() {
        super(true);
    }

    @Override
    protected boolean doStart(ServerSocketConfig serverConfig) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean listenResuset = false;
        try {
            bootstrap = new ServerBootstrap();
            bossGroup = new NioEventLoopGroup(serverConfig.getBossThreads(), ThreadPoolUtils.newThreadFactory(THREAD_PREFIX_BOSS));
            workGroup = new NioEventLoopGroup(serverConfig.getWorkThreads(), ThreadPoolUtils.newThreadFactory(THREAD_PREFIX_WORK));
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 1024, 65535))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            addNettyIdleHandler(pipeline);
                            addNettyOtherHandler(pipeline);
                        }
                    });
            listenResuset = bootstrap.bind(serverConfig.getPort()).await(serverConfig.getConnectTimeout(), TimeUnit.MICROSECONDS);
        } catch (Exception e) {
            log.error("listen server timeout.", e);
            return false;
        } finally {
            if (listenResuset) {
                log.info("listen server, result:{}, spendTime:{}", listenResuset, (System.currentTimeMillis() - currentTimeMillis));
            } else {
                log.error("listen server error, result:{}, spendTime:{}", listenResuset, (System.currentTimeMillis() - currentTimeMillis));
            }
        }
        return listenResuset;
    }

    @Override
    protected void doClose() {
        if (workGroup != null) {
            workGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }

    @Override
    protected void addNettyOtherHandler(ChannelPipeline pipeline) {
        pipeline.addLast("http-codec", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        pipeline.addLast("security-handler", new SecurityServerHandler(getWebSocketChannelHandler()));
        pipeline.addLast("websocket-compression", new WebSocketServerCompressionHandler());//websocket数据压缩
        pipeline.addLast("handler", new WebSocketServerProtocolHandler("/ws", null, true, 1024 * 1024, true));
        pipeline.addLast("websocket-handler", new WebSocketFrameHandler(getWebSocketChannelHandler()));

    }
}

2.应用层实现(命令模式)

(1)service实现
消息(指令)接收者抽象实现(Receiver接收者角色)
package com.zzc.netty.application.service;

import com.zzc.netty.domain.Constant;
import com.zzc.netty.infracore.common.utils.JWTUtils;
import com.zzc.netty.infracore.common.utils.StrUtils;
import com.zzc.netty.infrastructure.netty.WebSocketChannelHandler;
import com.zzc.netty.infrastructure.netty.conn.WebSocketConn;
import com.zzc.netty.infrastructure.netty.conn.WebSocketConnImpl;
import com.zzc.netty.domain.enums.DisconReason;
import io.netty.channel.Channel;
import io.netty.handler.timeout.IdleState;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base {@link WebSocketChannelHandler}: validates handshake parameters, keeps a registry of live
 * connections keyed by channel id and turns channel events into the three business callbacks
 * {@link #connected}, {@link #disconnected} and {@link #receiver}.
 */
@Slf4j
public abstract class AbstractWebSocketChannelHandler implements WebSocketChannelHandler {

    /** Live connections, keyed by the long text form of the channel id. */
    private static final Map<String, WebSocketConn> children = new ConcurrentHashMap<>();

    /**
     * Requires appId, userId, username, platform, token and ts to be present and the token to
     * pass JWT verification.
     *
     * @return {@code true} if the handshake may proceed
     */
    @Override
    public boolean beforeHandshake(Channel channel, Map<String, Object> params) {
        if (params == null) {
            return false;
        }
        String appId = (String) params.get(Constant.KEY_APP_ID);
        String userId = (String) params.get(Constant.KEY_USERID);
        String username = (String) params.get(Constant.KEY_USERNAME);
        String platform = (String) params.get(Constant.KEY_PLATFORM);
        String token = (String) params.get(Constant.KEY_TOKEN);
        String ts = (String) params.get(Constant.KEY_TS);
        if (StrUtils.isBlank(appId, userId, username, platform, token, ts)) {
            // The token is deliberately left out of the log line.
            log.info("beforeHandshake param exist null. appId:{}, userId:{}, username:{}, platform:{}, ts:{}", appId, userId, username, platform, ts);
            return false;
        }
        return JWTUtils.verify(token);//TODO
    }

    /**
     * Wraps the channel into a connection, lets the business layer accept it and registers it
     * only when accepted.
     *
     * @return the result of {@link #connected}
     */
    @Override
    public boolean afterHandshake(Channel channel, Map<String, Object> params) {
        log.info("afterHandshake params:{}", params);
        String connId = getConnId(channel);
        WebSocketConn conn = new WebSocketConnImpl(channel);
        boolean connected = connected(conn, params);
        if (connected) {
            addConn(connId, conn);
        }
        return connected;
    }

    @Override
    public void channelActive(Channel channel) {
        log.info("channelActive connId:{}", getConnId(channel));
    }

    /**
     * Notifies the business layer and releases the connection, if it is still registered.
     */
    @Override
    public void channelInactive(Channel channel) {
        String connId = getConnId(channel);
        WebSocketConn conn = getConn(connId);
        if (conn == null) {
            return;
        }
        log.info("remove conn. connId:{}", connId);
        try {
            disconnected(conn, DisconReason.NORMAL);//TODO
        } finally {
            ensureRelease(conn, channel);
        }
    }

    /**
     * Dispatches an inbound message to {@link #receiver}; messages on unregistered channels are
     * logged and ignored.
     */
    @Override
    public void channelRead(Channel channel, Object msg) {
        String connId = getConnId(channel);
        WebSocketConn conn = getConn(connId);
        if (conn == null) {
            log.error("channelRead error, local cache is null. connId:{}, msg:{}", connId, msg);
            return;
        }
        receiver(conn, msg);
    }

    /**
     * Treats any channel exception as fatal for the connection: notifies the business layer
     * (with a possibly {@code null} connection) and releases it.
     */
    @Override
    public void onException(Channel channel, Throwable throwable) {
        String connId = getConnId(channel);
        WebSocketConn conn = getConn(connId);
        // Pass the throwable as last argument so the stack trace is not lost.
        log.error("onException connId:{}", connId, throwable);
        try {
            disconnected(conn, DisconReason.EXCEPTION);
        } finally {
            ensureRelease(conn, channel);
        }
    }

    /**
     * Disconnects channels that neither sent nor received anything for too long; reader-only and
     * writer-only idleness is ignored.
     */
    @Override
    public void onIdleStateEvent(Channel channel, IdleState state) {
        if (state != IdleState.ALL_IDLE) {
            return;
        }
        String connId = getConnId(channel);
        WebSocketConn conn = getConn(connId);
        log.info("onIdleStateEvent connId:{}", connId);
        try {
            disconnected(conn, DisconReason.IDLE);
        } finally {
            ensureRelease(conn, channel);
        }
    }

    /** @return the registry key for the channel, or {@code null} for a {@code null} channel */
    private String getConnId(Channel channel) {
        if (channel != null) {
            return channel.id().asLongText();
        }
        return null;
    }

    /** Null-safe lookup: {@link ConcurrentHashMap} itself rejects {@code null} keys. */
    private WebSocketConn getConn(String connId) {
        return connId == null ? null : children.get(connId);
    }

    private void addConn(String connId, WebSocketConn conn) {
        children.put(connId, conn);
    }

    /** Null-safe removal: {@link ConcurrentHashMap} itself rejects {@code null} keys. */
    private void removeConn(String connId) {
        if (connId != null) {
            children.remove(connId);
        }
    }

    /**
     * Releases the connection and drops it from the registry; without a registered connection
     * the raw channel is closed instead.
     */
    private void ensureRelease(WebSocketConn conn, Channel channel) {
        if (conn == null) {
            if (channel != null) {
                channel.close();
            }
            log.warn("ensureRelease conn is null.");
            return;
        }
        try {
            conn.release();
        } finally {
            // Remove under the key used on registration (the channel id); fall back to the
            // connection's own id only when no channel is available.
            removeConn(channel != null ? getConnId(channel) : conn.getConnId());
        }
    }

    /**
     * Business hook after a successful handshake, e.g. storing the user session in redis.
     *
     * @param conn   the new connection
     * @param params validated handshake parameters
     * @return {@code true} to accept and register the connection
     */
    protected abstract boolean connected(WebSocketConn conn, Map<String, Object> params);


    /**
     * Business hook when a connection goes away.
     *
     * @param conn   the affected connection; may be {@code null} when the channel was never
     *               registered
     * @param reason why the connection is being dropped
     */
    protected abstract void disconnected(WebSocketConn conn, DisconReason reason);


    /**
     * Business hook for a message received on a healthy, registered connection.
     *
     * @param conn connection the message arrived on
     * @param msg  the received message
     */
    protected abstract void receiver(WebSocketConn conn, Object msg);
}

定义指令调用者(Invoker调用者角色)
package com.zzc.netty.application.service;

import com.alibaba.fastjson2.JSON;
import com.zzc.netty.application.CommandFactory;
import com.zzc.netty.domain.command.CommandContext;
import com.zzc.netty.application.CommandHandler;
import com.zzc.netty.infracore.api.CommonCode;
import com.zzc.netty.domain.protocol.Response;
import com.zzc.netty.infracore.common.utils.ThreadPoolUtils;
import com.zzc.netty.infrastructure.netty.conn.WebSocketConn;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Invoker of the command pattern: parses an incoming JSON command and runs the matching
 * {@link CommandHandler} on a dedicated thread pool.
 */
@Slf4j
@Service
public class CommandInvoker {

    private static final ThreadPoolExecutor executor = ThreadPoolUtils.newThreadPoolExecutorDirectAndAsy(
            "command-handler",
            2 * Runtime.getRuntime().availableProcessors(),
            4 * Runtime.getRuntime().availableProcessors(),
            120,
            TimeUnit.SECONDS,
            30);

    /**
     * Schedules the command for asynchronous execution.
     *
     * @param conn    connection the command arrived on; also receives the error response
     * @param jsonStr JSON text of the command
     */
    public void action(WebSocketConn conn, String jsonStr) {
        // execute() rather than submit(): nobody reads the Future, and with submit() anything
        // escaping the task would be captured in it and never be seen.
        executor.execute(() -> dispatch(conn, jsonStr));
    }

    /**
     * Parses the command, resolves its handler and executes it. Any failure is logged and
     * answered with a {@code SYSTEM_ERROR} response.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the payload type is only known to the handler
    private void dispatch(WebSocketConn conn, String jsonStr) {
        try {
            CommandContext ctx = JSON.parseObject(jsonStr, CommandContext.class);
            if (ctx == null) {
                throw new IllegalArgumentException("empty command payload");
            }
            ctx.setResultCode(CommonCode.SUCCESS);
            log.info("action ctx:{}, resultCode:{}", JSON.toJSONString(ctx), JSON.toJSONString(ctx.getResultCode()));

            ctx.setWebSocketConn(conn);
            String command = ctx.getCommand();
            CommandHandler handler = CommandFactory.getHandler(command);
            if (handler == null) {
                throw new IllegalArgumentException("no handler for command: " + command);
            }
            handler.execute(ctx);
        } catch (Exception e) {
            // Log first: the failure stays on record even if the reply cannot be written.
            log.error("receiver cmd error.", e);
            conn.writeRespAsyn(Response.error(CommonCode.SYSTEM_ERROR));
        }
    }

}

命令抽象实现(命令角色)
package com.zzc.netty.application;

import com.zzc.netty.domain.command.CommandContext;
import com.zzc.netty.infracore.api.CommonCode;
import com.zzc.netty.infracore.exception.DomainException;
import lombok.extern.slf4j.Slf4j;

/**
 * Command role of the command pattern. A command runs through the stages
 * ack, before, process and after, and finally answers the client.
 *
 * @param <T> payload type carried by the {@link CommandContext}
 */
@Slf4j
public abstract class CommandHandler<T> {

    /** Names of the processing stages, in execution order. */
    enum State {

        ACK,

        BEFORE,

        PROCESS,

        AFTER,

        FINISH
    }

    /** Acknowledges the command; returning {@code false} aborts without an answer. */
    protected abstract boolean ack(CommandContext<T> ctx);

    /** Pre-processing; returning {@code false} skips the remaining stages. */
    protected abstract boolean beforeHandler(CommandContext<T> ctx);

    /** Main processing; returning {@code false} skips the remaining stages. */
    protected abstract boolean handler(CommandContext<T> ctx);

    /** Post-processing. */
    protected abstract boolean afterHandler(CommandContext<T> ctx);

    /** Sends the answer for the command. */
    protected abstract boolean answer(CommandContext<T> ctx);

    /**
     * Runs the command: a failed ack ends the command silently; otherwise the stages run in
     * order until one reports failure, and the answer is sent in every case.
     *
     * @param ctx context of the command being executed
     */
    public void execute(CommandContext<T> ctx) {
        if (!ack(ctx)) {
            return;
        }
        runStages(ctx);
        answer(ctx);
    }

    /** Runs before, process and after in order, stopping at the first stage that fails. */
    private boolean runStages(CommandContext<T> ctx) {
        return beforeHandler(ctx) && handler(ctx) && afterHandler(ctx);
    }

}

消息或者指令实现(Receiver接收者角色和高层模块应用)
package com.zzc.netty.application.service;

import com.zzc.netty.application.dto.CloseWsCommand;
import com.zzc.netty.domain.Constant;
import com.zzc.netty.domain.protocol.ProtocolFactory;
import com.zzc.netty.domain.command.CommandEnums;
import com.zzc.netty.domain.session.UserSession;
import com.zzc.netty.domain.session.UserSessionService;
import com.zzc.netty.infracore.api.CommonCode;
import com.zzc.netty.domain.protocol.Response;
import com.zzc.netty.infrastructure.netty.conn.WebSocketConn;
import com.zzc.netty.domain.enums.DisconReason;
import com.zzc.netty.infrastructure.netty.server.ServerSocketConfig;
import com.zzc.netty.infrastructure.netty.server.ServerSocketImpl;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;

@Slf4j
@Service
public class SignalServiceImpl extends AbstractWebSocketChannelHandler {

    @Autowired
    private UserSessionService userSessionService;

    @Autowired
    private CommandInvoker commandInvoker;

    @PostConstruct
    public void init() {
        ServerSocketImpl serverSocket = new ServerSocketImpl();
        serverSocket.start(new ServerSocketConfig(), this);
    }

    @Override
    protected boolean connected(WebSocketConn conn, Map<String, Object> params) {
        String sessionId = conn.getConnId();
        if (!conn.isOpen()) {
            throw new RuntimeException("connected conn is close. connId:" + sessionId);
        }
        if (params == null || params.isEmpty()) {
            conn.writeRespAsyn(Response.error(CommonCode.PARAM_NULL));
            return false;
        }

        String appId = (String) params.get(Constant.KEY_APP_ID);
        String userId = (String) params.get(Constant.KEY_USERID);
        String platform = (String) params.get(Constant.KEY_PLATFORM);
        String ts = (String) params.get(Constant.KEY_TS);
        String username = (String) params.get(Constant.KEY_USERNAME);

        //TODO 判断是否重连
        UserSession userSession = UserSession.Builder()
                .appId(appId)
                .sessionId(sessionId)
                .userId(userId)
                .platform(platform)
                .username(username)
                .ts(ts)
                .build();

        userSessionService.addUserSession(userSession);
        return conn.writeResp(ProtocolFactory.createResp(CommandEnums.OPENWS.getCommand(), sessionId, ts));
    }

    @Override
    protected void disconnected(WebSocketConn conn, DisconReason reason) {
        if (conn == null) {
            log.error("disconnected error, conn is null.");
            return;
        }
        log.info("disconnected connId:{}", conn.getConnId());
        UserSession userSession = userSessionService.removeUserSession(conn.getConnId());
        if (userSession != null) {
            CloseWsCommand closeWs = new CloseWsCommand();
            closeWs.setReason(reason.getReason());
            conn.writeRespAsyn(ProtocolFactory.createResp(closeWs, CommandEnums.CLOSEWS.getCommand(), userSession.getSessionId(), userSession.getTs()));
        } else {
            log.warn("usersession is null. connId:{}", conn.getConnId());
        }
    }

    @Override
    protected void receiver(WebSocketConn conn, Object msg) {
        if (msg instanceof TextWebSocketFrame) {
            String jsonData = ((TextWebSocketFrame) msg).text();
            log.info("receiver rev msg:{}", jsonData);
            commandInvoker.action(conn, jsonData);
        } else {
            log.warn("receiver other data. class:{}", msg.getClass());
        }
    }

}

指令工厂
package com.zzc.netty.application;

import com.zzc.netty.adapter.handler.AddRoomHandler;
import com.zzc.netty.domain.command.CommandEnums;
import com.zzc.netty.infracore.common.utils.SpringBeansUtil;

import java.util.HashMap;
import java.util.Map;

/**
 * Static registry that resolves a command name to the handler responsible for it.
 */
public class CommandFactory {

    /** Command name to handler; populated once while the class is initialised. */
    private static final Map<String, CommandHandler> commandHandlers = buildRegistry();

    /**
     * Creates the command-to-handler table.
     *
     * <p>Any command implemented by extending CommandHandler can be registered here. Handlers
     * could also be fetched as beans from the Spring IoC container, but not injected with
     * {@code @Autowired}, because they all extend CommandHandler.
     */
    private static Map<String, CommandHandler> buildRegistry() {
        Map<String, CommandHandler> registry = new HashMap<>();
        String addRoomCommand = CommandEnums.ADDROOM.getCommand();
        registry.put(addRoomCommand, new AddRoomHandler());
        return registry;
    }

    /**
     * Looks up the handler registered for a command.
     *
     * @param command the command name
     * @return the registered handler, or null when none is registered for {@code command}
     */
    public static CommandHandler getHandler(String command) {
        CommandHandler registered = commandHandlers.get(command);
        return registered;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2060666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Matlab基本知识

🌈个人主页:羽晨同学 💫个人格言:“成为自己未来的主人~” %% Matlab基本的小常识 % (1)在每一行的语句后面加上分号(一定要是英文的) a3; a5; % (2)多行注释:选中要注释的若干语句,快捷键Ctrl+R % a3; %…

【Vue3】集成 Element Plus

【Vue3】集成 Element Plus 背景简介开发环境开发步骤及源码总结 背景 随着年龄的增长,很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来,技术出身的人总是很难放下一些执念,遂将这些知识整理成文,以纪念曾经努力学习奋斗的…

openai whisper使用

whisper使用 介绍 Whisper是一种通用的语音识别模型。它是在大量不同音频数据集上训练的,也是一个多任务模型,可以执行多语言语音识别、语音翻译和语言识别。 GitHub:https://github.com/openai/whisper 论文链接:https://arx…

AI-Talk开发板更新CH32固件

一、说明 CSK6011A有33个GPIO,但把WIFI、LCD、TP、CAMERA这些外设全部加上后,CSK6011A的IO不够用,还差6个,所以增加了一颗IO扩展MCU。CSK6-MIX开发板使用的IO扩展MCU为CH32V003F4P6,并且SDK也包含了此MCU的固件。AI-Ta…

机械学习—零基础学习日志(如何理解概率论3)

随机变量的函数分布 一维随机变量分布,可以看到下图,X为不同情况的概率。而x如果是大于等于X,那么当x在40以内时,没有概率,为0。 当x变大,在40-80之间,那么x大于X的概率为,0.7…

性能测试 —— 系统架构性能优化思路!

这篇文章重点还是谈已经上线的业务系统后续出现性能问题后的问题诊断和优化重点。 系统性能问题分析流程 我们首先来分析下如果一个业务系统上线前没有性能问题,而在上线后出现了比较严重的性能问题,那么实际上潜在的场景主要来自于以下几个方面。 业务出…

回归预测 | Matlab实现BES-ESN秃鹰搜索算法优化回声状态网络多输入单输出回归预测

回归预测 | Matlab实现BES-ESN秃鹰搜索算法优化回声状态网络多输入单输出回归预测 目录 回归预测 | Matlab实现BES-ESN秃鹰搜索算法优化回声状态网络多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现BES-ESN秃鹰搜索算法优化回声状态网络…

宿舍管理系统_o4dvi

TOC springboot574宿舍管理系统_o4dvi--论文 第一章 概述 1.1 研究背景 近些年,随着中国经济发展,人民的生活质量逐渐提高,对网络的依赖性越来越高,通过网络处理的事务越来越多。随着宿舍管理的常态化,如果依然采用…

JVM 运行时内存结构简介

JVM 运行时内存结构简介 一、前言二、JVM 运行时内存结构2.1 线程隔离数据区:2.2 线程共享数据区: 三、JVM 内存区域划分1. 程序计数器(PC)2. 虚拟机栈3. 本地方法栈4. Java 堆5. 方法区6. 运行时常量池 附录 一、前言 JVM…

LLM RAG检索生成的深度解析:理解其工作原理与应用

前言 2024年随着大模型进一步增强升级,越来越多的大模型应用落地,经过初期的探索和研究,目前业界逐渐收敛聚焦于两个主要的应用方向:RAG和Agents。今天我们就来先聊聊这个RAG~ 一.RAG基本介绍 RAG:全称R…

Linux下enable bbr

最近开通一台VPS,操作系统选择了Ubuntu 22.04,需要启用bbr功能。 BBR 是 Bottleneck Bandwidth(瓶颈带宽)的缩写,而 RTT 是一种拥塞控制系统。您可以在 Linux 桌面上启用 TCP BBR,以改善整体网上冲浪体验。…

IO模型-----聊天室

运行1个服务器和2个客户端 实现效果: 服务器和2个客户端互相聊天,服务器和客户端都需要使用select模型去实现 服务器要监视2个客户端是否连接,2个客户端是否发来消息以及服务器自己的标准输入流 客户端要监视服务器是否发来消息以及客户端自己…

Elasticsearch 安装 windows

1,下载Elasticsearch Download Elasticsearch | Elastic 2,下载的压缩包解压后 3,进入bin文件夹,双击 elasticsearch.bat 启动 4,修改配置 启动成功后先关掉了ctrlc关掉 进入config文件夹,打开文件 elasticsearch.ym…

力扣 1425带限制的子序列和

这是一道 动态规划加单调队列的题,重点加强单调队列知识的学习 回归本题,这个题中,动态规划的部分略去,状态转移方程可求 单调队列部分 1维护队头 if(i-sta.front() k) sta.pop_front(); 2维护队尾 while(!sta.empty() &…

企业级web应用服务器tomcat

目录 一、Web技术 1.1 HTTP协议和B/S 结构 1.2 前端三大核心技术 1.2.1 HTML 1.2.2 CSS(Cascading Style Sheets)层叠样式表 1.2.3 JavaScript 二、tomcat的功能介绍 2.1 安装 tomcat 环境准备 2.1.1 安装java环境 2.1.2 安装并启动tomcat …

C++:模板 II(非类型模板参数,特化,分离编译)

目录 非类型模板参数 模板的特化 函数模板特化 类模板特化 全特化 偏特化 引用特化 指针特化 模板分离编译 非类型模板参数 什么是非类型模板参数? 顾名思义,它的类型形参并不是一个类型,就是用一个常量来作为类模板或函数模板的…

(软件测试)基础2

1.等价类划分法 步骤: 2.编写数据用例 例题:参考基础1中的手机号实例 2.边界值分析法

p8 Run的流程和Docker原理

docker run的运行原理图 docker是怎么工作的? docker是一个cs的一个结构的系统docker的守护进程运行在宿主机上面通过socket进行访问 其实就是看下面的这个图,通过客户端的命令来操作docker的守护进程然后启动一些容器,默认容器是不启动的 …

凡图公益行:凡图家庭教育以行动筑梦,点亮孩子心中的光芒

在教育的路上,每一步都承载着未来的希望,凡图(山东)教育科技集团有限公司一直致力于为每一个孩子及家庭提供最优质的心理咨询服务。 在这样的背景下,凡图家庭教育以独特的使命感和责任感,成为了众多家庭信赖的伙伴。 也因此成为…

【 打印菱形】打印菱形

打印菱形 C语言实现,具体代码: #include<stdio.h>int main(){int i,j,k;// 上半部分for(i0;i<3;i){for(j0;j<2-i;j)printf(" ");for(k0;k<2*i;k)printf("*");printf("\n");}// 下半部分for(i0;i<2;i)…