使用高性能NIO框架netty实现IM集群对聊方案

news2024/9/8 10:36:18

文章目录

    • 前言
    • 技术积累
      • 什么是netty
      • netty如何实现IM
      • 如何实现IM集群
    • 实战演示
      • 基础配置
      • netty搭建IM集群
      • redis发布订阅
    • 实战测试

前言

在前面的博文中我们分享了原生websoket集群搭建,也用redis 发布订阅实现了集群消息正常有序分发。但是有不少同学希望风向一期netty实现websoket,可以实现对聊方案。其实前面分享的websocket集群解决方案完全可以实现im对聊的,只是性能处理上没有nettty那么好。今天我们就分享一起使用高性能NIO框架netty实现IM集群对聊方案,各位看官敬请鉴赏。

技术积累

什么是netty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

netty的核心是支持零拷贝的bytebuf缓冲对象、通用通信api和可扩展的事件模型;它支持多种传输服务并且支持HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议,也支持自定义协议。
  
在这里插入图片描述

netty的模型是基于reactor多线程模型,其中mainReactor用于接收客户端请求并转发给subReactor。SubReactor负责通道的读写请求,非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
在这里插入图片描述

netty如何实现IM

netty支持websocket通讯协议,那么我们就可以用它来实现websoket,实现后端服务主动向前端推送消息的功能。

比如AB用户分别注册到websoket后台,A用户向B用户发送消息,后端接收到A用户消息后判断消息接收者是B用户,然后后端逻辑直接调用B用户websoket连接进行推送即可。

如何实现IM集群

由于websocket 长连接是注册到后端服务本地缓存的,而且这个信道回话是不能被其他中间件缓存的,你们我们就只能在缓存目标用户的服务上拿到会话进行推送消息。

之前博文中也讲到过可以使用消息广播的形式找到目标回话服务,比如Redis的发布订阅、其他Mq等等。当A用户注册到C后端服务上,B服务注册到D后端服务上,这个时候如果A向B发送消息,则需要在C后端服务上增肌广播逻辑,让其他服务感知并监听到消息体,其他服务D收到消息后会验证是否这个用户会话缓存在本地,如果存在则向B前端对送消息,不再则不予处理。这样,就完成了IM集群回话的整个链路流程。
在这里插入图片描述

实战演示

本次实战我们简单使用netty实现IM集群通讯即可,如果需要用于生成环境需要增加一些防卫式程序设计,比如Redis发布监听冗余检验后错误处理等等。

基础配置

maven依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<!--netty-->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<!-- 整合thymeleaf前端页面 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

application配置文件

server:
  port: 9999
spring:
  profiles:
    active: dev
  mvc:
    pathmatch:
      # Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher
      matching-strategy: ant_path_matcher
  redis:
    host: 127.0.0.1
    port: 6379
  thymeleaf:
    mode: HTML
    encoding: UTF-8
    content-type: text/html
    cache: false
    prefix: classpath:/templates/

websocket演示html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
    var socket;
    function openSocket() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
            //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
            //var socketUrl = "ws://192.168.112.10:7777/ws/" + $("#userId").val();
            //var socketUrl = "wss://192.168.112.10/ws/"+ $("#userId").val();
            //var socketUrl = "wss://192.168.112.10:8899"
            var socketUrl = "ws://127.0.0.1:88?userId="+$("#userId").val();
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            console.log(socketUrl);
            if (socket != null) {
                socket.close();
                socket = null;
            }
            socket = new WebSocket(socketUrl);
            //打开事件
            socket.onopen = function () {
                console.log("websocket已打开");
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };
            //获得消息事件
            socket.onmessage = function (msg) {
                console.log("接收消息为:"+msg.data);
            };
            //关闭事件
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            //发生了错误事件
            socket.onerror = function () {
                console.log("websocket发生了错误");
            }
        }
    }

    function sendMessage() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            console.log('发送消息为:{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
            socket.send('{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
        }
    }

</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div><button onclick="openSocket()">开启socket</button></div>
<p>【操作】:<div><button onclick="sendMessage()">发送消息</button></div>
</body>
</html>

netty搭建IM集群

创建消息体
Message.java

import lombok.Data;

/**
 * Message
 * @author senfel
 * @version 1.0
 * @date 2024/5/17 14:39
 */
@Data
public class Message {
    /**
     * 消息编码
     */
    private String code;

    /**
     * 来自(保证唯一)
     */
    private String fromUserId;

    /**
     * 去自(保证唯一)
     */
    private String toUserId;

    /**
     * 内容
     */
    private String contentText;


}

创建netty websocket连接池
NettyWebSocketPool.java

import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;

/**
 * NettyWebSocketPool
 * @author senfel
 * @version 1.0
 * @date 2024/5/23 10:19
 */
public class NettyWebSocketPool {
    /**
     * 通道连接池
     */
    public static final ConcurrentHashMap<String, Channel> CHANNELS = new ConcurrentHashMap<>();
}

创建websocket处理器
WebsocketServerHandler.java

import com.example.ccedemo.im.CommonConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

/**
 * WebsocketServerHandler
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 10:57
 */
@Slf4j
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private final static ThreadLocal<String> USER_LIST = new ThreadLocal<>();

    private WebSocketServerHandshaker handshaker;

    private StringRedisTemplate stringRedisTemplate;

    public WebsocketServerHandler() {
    }

    public WebsocketServerHandler(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = reAddr.getAddress().getHostAddress();
        String clientPort = String.valueOf(reAddr.getPort());
        log.debug("有新的客户端接入:{}:{}", clientIp, clientPort);
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        if (msg instanceof CloseWebSocketFrame) {
            disconnectCurrentUser();
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
            return;
        }
        if (msg instanceof PingWebSocketFrame) {
            log.info("websocket ping message");
            ctx.channel().write(new PingWebSocketFrame(msg.content().retain()));
        } else if (msg instanceof TextWebSocketFrame) {
            // websocket消息解压成字符串让下一个handler处理
            String text = ((TextWebSocketFrame) msg).text();
            log.info("请求数据|{}", text);
            // 如果不调用这个方法后面的handler就获取不到数据
            ctx.fireChannelRead(text);
        } else {
            log.error("不支持的消息格式");
            throw new UnsupportedOperationException("不支持的消息格式");
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.decoderResult().isSuccess()
                || (!"websocket".equalsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE)))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
                "ws://" + req.headers().get(HttpHeaderNames.HOST), null, false);
        handshaker = wsShakerFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            String uri = req.uri();
            Map<String, String> paramMap = null;
            //如果url包含参数,需要处理
            if (uri.contains(CommonConstants.QUESTION)) {
                paramMap = getUrlParams(uri);
                String newUri = uri.substring(0, uri.indexOf(CommonConstants.QUESTION));
                req.setUri(newUri);
            }
            //缓存当前连接
            assert paramMap != null;
            String channelId = "userId:"+paramMap.get("userId");
            log.info("缓存用户通道信息:{}",ctx.channel().localAddress());
            log.info("缓存用户通道信息:{}",ctx.channel().remoteAddress());
            NettyWebSocketPool.CHANNELS.put(channelId, ctx.channel());
            USER_LIST.set(channelId);
            //写入在线用户
            stringRedisTemplate.opsForValue().set(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+channelId, channelId);
            //建立websocket连接握手
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) {
        if (response.status().code() != HttpResponseStatus.OK.code()) {
            ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture cf = ctx.channel().writeAndFlush(response);
        if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) {
            cf.addListener(ChannelFutureListener.CLOSE);
        }
    }


    /**
     * url 参数切割
     * @param url
     * @return
     */
    private Map<String, String> getUrlParams(String url) {
        Map<String, String> map = new HashMap<>(4);
        url = url.replace("?", ";");
        if (!url.contains(";")) {
            return map;
        }
        if (url.split(";").length > 0) {
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr) {
                String[] data = s.split("=");
                if (data.length > 1) {
                    map.put(data[0], data[1]);
                }
            }
            return map;
        } else {
            return map;
        }
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("消息处理异常:{}", USER_LIST.get(), cause);
        disconnectCurrentUser();
        ctx.close();
    }

    /**
     * 状态触发 检测是否处于空闲状态 间隔时间 60s
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            log.info("握手完成,连接地址为:{}", ctx.channel().remoteAddress());
        } else if (evt instanceof IdleStateEvent) {
            if (!StringUtils.isEmpty(USER_LIST.get())) {
                //断开连接
                disconnectCurrentUser();
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * disconnectCurrentUser
     * @author senfel
     * @date 2024/5/23 15:13
     * @return void
     */
    private void disconnectCurrentUser() {
        log.info("谁断开了连接:{}",USER_LIST.get());
        log.info("userEventTriggered 触发,断开连接");
        NettyWebSocketPool.CHANNELS.remove(USER_LIST.get());
        stringRedisTemplate.delete(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+USER_LIST.get());
        USER_LIST.remove();
    }
}

创建websocket输入输出处理器
UserWebsocketInHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * UserWebsocketInHandler
 * 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:10
 */
@Slf4j
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> {

    private StringRedisTemplate stringRedisTemplate;

    public UserWebsocketInHandler(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.error(Thread.currentThread().getName() + "|" + msg);
        String pattern ="\\{.*\\}|\\[.*\\]";
        Pattern r= Pattern.compile(pattern);
        Matcher m =r.matcher(msg);
        if(m.matches()){
            stringRedisTemplate.convertAndSend("nettyWebsocketMsgPush",msg);
        }else {
            ctx.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }
}
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * UserWebsocketOutHandler
 * 出站处理器:判断数据是否需要进行封装
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:10
 */

public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof String) {
            ctx.write(new TextWebSocketFrame((String) msg), promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }
}

创建netty服务端
NettyWebsocketServer.java

import com.example.ccedemo.im.NettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * NettyWebsocketServer
 * @author senfel
 * @version 1.0
 * @date 2024/5/22 11:03
 */
@Slf4j
public class NettyWebsocketServer implements Runnable {

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 服务端IP地址
     */
    private String ip;
    /**
     * 服务端端口号
     */
    private int port;

    public NettyWebsocketServer(String ip, int port, StringRedisTemplate stringRedisTemplate) {
        this.ip = ip;
        this.port = port;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public void run() {
        // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以
        final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));
            }
        });

        // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数
        final int totalThread = Runtime.getRuntime().availableProcessors();
        final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));
            }
        });

        // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应
        final int jobThreads = 1024;
        final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));
            }
        });

        // 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用
        final LoggingHandler LOGGING_HANDLER = new LoggingHandler();

        // 指定服务端bootstrap
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker)
                // 指定通道类型
                .channel(NioServerSocketChannel.class)
                // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128
                .option(ChannelOption.SO_BACKLOG, 2048)
                // 维持链接的活跃,清除死链接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 关闭延迟发送
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 添加handler处理链
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();

                        // 日志处理
                        pipeline.addLast(LOGGING_HANDLER);
                        // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)
                        pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));
                        // 处理http请求的编解码器
                        pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());
                        pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());
                        pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));
                        // 处理websocket的编解码器
                        pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler(stringRedisTemplate));
                        // 自定义处理器
                        pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler());
                        pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler(stringRedisTemplate));
                    }
                });
        try {
            // 服务端绑定对外服务地址
            ChannelFuture future = server.bind(ip, port).sync();
            log.info(NettyServer.class + " 启动正在监听: " + future.channel().localAddress());
            // 等待服务关闭,关闭后释放相关资源
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            job.shutdownGracefully();
        }
    }
}

redis发布订阅

redis配置消息监听
RedisListenerConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * RedisListenerConfig
 * @author senfel
 * @version 1.0
 * @date 2024/5/24 16:26
 */
@Configuration
public class RedisListenerConfig {

     @Bean
     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);
        container.addMessageListener(new LiveRedisKeysExpireListener(), new PatternTopic("nettyWebsocketMsgPush"));
        return container;
    }
}

redis监听消息处理
LiveRedisKeysExpireListener.java

import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.Objects;

/**
 * LiveRedisKeysExpireListener
 * @author senfel
 * @version 1.0
 * @date 2024/5/24 16:25
 */
public class LiveRedisKeysExpireListener implements MessageListener {

    @Override
    public void onMessage(Message msg, byte[] bytes) {
        System.out.println("监听到需要进行负载转发的消息:" + msg.toString());
        com.example.ccedemo.nettydemo.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.nettydemo.Message.class);
        Channel channel = NettyWebSocketPool.CHANNELS.get("userId:" + message.getToUserId());
        if(Objects.nonNull(channel)){
            channel.writeAndFlush(new TextWebSocketFrame(msg.toString()));
        }
    }
}

实战测试

浏览器分别打开两个无痕页面
http://127.0.0.1:9999/websocket/page
模拟对话两个页面的用户互斥
在这里插入图片描述

分别发送消息实现对聊
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1703042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于生命周期评价法的农田温室气体排放估算;农田CH4和N2O排放模拟;农田碳库模型和土壤呼吸等

目录 专题一 温室气体排放模拟研究 专题二 农田CH4和N2O排放模拟 专题三 农田碳库模型和土壤呼吸 专题四 基于生命周期评价法的农田温室气体排放估算 专题五-六 基于过程模型的温室气体排放模拟 专题七 案例模拟与疑难解答 更多应用 农业是甲烷&#xff08;CH4&#xff…

List基本使用(C++)

目录 1.list的介绍 2.list的使用 list的构造 list的size() 和 max_size() list遍历操作 list元素修改操作 assign()函数 push_front(),push_back 头插&#xff0c;尾插 pop_front() pop_back 头删尾删 insert()函数 swap()函数 resize()函数 clear()函数 list类数…

yolov10 瑞芯微 rknn 部署 C++代码

yolov10 目标检测rknn的C部署来了。 特别说明&#xff1a;如有侵权告知删除&#xff0c;谢谢。 直接上代码和模型&#xff0c;欢迎参考交流 【完整代码和模型】 1、rknn模型准备 pytorch转onnx&#xff0c;onnx再转rknn模型这一步就不再赘述&#xff0c;请参考上一篇 【yolov1…

TPM之VMK密封

本篇文章主要介绍基于TPM的Bitlocker全盘加密时,VMK密钥的密封(Seal)流程,至于TPM、Bitlocker、密钥保护器、VMK密钥等这些东西是什么,这里不做解释,需要自己脑补一下(╮(╯▽╰)╭)。 首先看看一张结构图(来自网络),了解一下TPM加密的基本框架与流程: 同样,基于…

关于读书,你可能没想到的陷阱、问题和思考

最近经常看到有人问&#xff1a;AI 已经这么发达了&#xff0c;如果以后的 AI 会更智能&#xff0c;那我们还有必要读书吗&#xff1f; 我认为&#xff1a;还是十分有必要的。 为什么呢&#xff1f;因为读书其实不仅仅是为了获取知识&#xff0c;它更重要的一个作用&#xff0c…

数据结构-堆排序问题

需要在数组里面进行排序&#xff0c;我们可以采取堆排序对其解决问题 版本1&#xff1a; 创建一个数组等大的堆&#xff0c;把数组里面的数值输入到堆里面进行堆排序&#xff0c;但是这样的弊端就是&#xff0c;不是顺序排序 版本2&#xff1a; 每次我们取堆顶然后打印&#xf…

3440亿!国家大基金三期正式落地,关注半导体与算力芯片!

重磅消息来了&#xff01; 5月24日&#xff0c;注册规模3,440亿元人民币的“国家集成电路产业投资基金三期股份有限公司”正式成立&#xff0c;这也意味着&#xff0c;传闻已久的**“国家大基金三期”正式落地&#xff01;** 企查查股东信息显示&#xff0c;该公司由财政部、国…

移动云:开发者手中的未来钥匙

《移动云&#xff1a;开发者手中的未来钥匙》 引言一、无缝集成&#xff0c;加速开发进程二、数据智能&#xff0c;洞悉用户心声三、安全合规&#xff0c;护航创新之旅四、成本优化&#xff0c;助力轻装前行总结 引言 在科技日新月异的今天&#xff0c;移动云已成为推动行业变革…

文件夹类型异常成文件:原因解析与恢复策略

在数字时代&#xff0c;数据的安全与完整性对于个人和企业都至关重要。然而&#xff0c;有时我们可能会遇到一种令人困惑的情况&#xff1a;原本应该是文件夹的图标&#xff0c;却突然变成了文件的图标&#xff0c;这就是所谓的“文件夹类型成文件”问题。本文将深入探讨这一现…

(原创)从右到左排列RecycleView的数据

问题的提出 当我们写一个Recycleview时&#xff0c;默认的效果大概是这样的&#xff1a; 当然&#xff0c;我们也可以用表格布局管理器GridLayoutManager做成这样&#xff1a; 可以看到&#xff0c;默认的绘制方向是&#xff1a; 从左到右&#xff0c;从上到下 那么问题来了…

香橙派 AIpro综合体验及AI样例运行

香橙派 AIpro综合体验及AI样例运行 环境&#xff1a; 香橙派版本&#xff1a; AIpro(8TOPSINT8) OS : Ubuntu 22.04.3 LTS(GNU/Linux 5.10.0 aarch64) (2024-03-18) 远程服务端1&#xff1a;OpenSSH 8.9p1 远程服务端2&#xff1a;TightVNC Server 1.3.10 远程客户端&#xf…

使用numpy手写一个神经网络

本文主要包含以下内容&#xff1a; 推导神经网络的误差反向传播过程使用numpy编写简单的神经网络&#xff0c;并使用iris数据集和california_housing数据集分别进行分类和回归任务&#xff0c;最终将训练过程可视化。 1. BP算法的推导过程 1.1 导入 前向传播和反向传播的总体…

基于EBAZ4205矿板的图像处理:10gamma变换

基于EBAZ4205矿板的图像处理&#xff1a;10gamma变换 项目全部文件 会上传项目全部文件&#xff0c;如果没传&#xff0c;可以私信催我一下&#xff0c;最近太忙了 先看效果 我的项目中的gamma的变换系数为2.2&#xff0c;是会让图像整体变暗的&#xff0c;看右图说明我的ga…

哪款洗地机好用?洗地机十大排行榜

在智能家电飞速发展的今天&#xff0c;洗地机因其吸拖洗一体化的技术优势&#xff0c;成为越来越多家庭的清洁利器。它不仅能快速清理各种地面污渍&#xff0c;还能轻松处理干湿垃圾&#xff0c;大大提升了日常清洁的效率。可是面对市场上琳琅满目的洗地机品牌和型号&#xff0…

数据持久化第六课-ASP.NET运行机制

数据持久化第六课-ASP.NET运行机制 一.预习笔记 1.动态网页的工作机制通常分为以下几个阶段&#xff1a; 1&#xff09;使用动态Web开发技术编写Web应用程序&#xff0c;并部署到Web服务器。 2&#xff09;客户端通过在浏览器中输入地址&#xff0c;请求动态页面。 3&#…

Swift 初学者交心:在 Array 和 Set 之间我们该如何抉择?

概述 初学 Swift 且头发茂密的小码农们在日常开发中必定会在数组&#xff08;Array&#xff09;和集合&#xff08;Set&#xff09;两种类型之间的选择中“摇摆不定”&#xff0c;这也是人之常情。 Array 和 Set 在某些方面“亲如兄弟”&#xff0c;但实际上它们之间却有着“云…

关于DDos防御...别在听别人瞎扯了.....

前言 无意间刷文章的时候看到一篇文章&#xff0c;写的是遇到ddos&#xff0c;怎么用iptables封IP....... 然后我就百度搜了一下&#xff0c;好多都是这么说的&#xff0c;但是我发现&#xff0c;大多数人只要遭受过长期Ddos的&#xff0c;就不会再信网上的文章 文笔不太好&…

【Qt】深入探索Qt事件处理:从基础到高级自定义:QEvent

文章目录 前言&#xff1a;1. 事件的介绍2. 事件的处理2.1. 示例1&#xff1a; 重写鼠标进入和鼠标离开事件2.2. 示例2&#xff1a;当鼠标点击时&#xff0c;获取对应的坐标值&#xff1b;2.3. 鼠标释放事件2.4. 鼠标双击事件2.5. 鼠标移动事件2.6. 鼠标滚轮的滚动事件 3. 按键…

后端经典三层架构

大家好&#xff0c;这里是教授.F 引入&#xff1a; MVC 全称∶ Model 模型、View 视图、 Controller 控制器。MVC 最早出现在 JavaEE 三层中的 Web 层&#xff0c;它可以有效的指导WEB 层的代码如何有效分离&#xff0c;单独工作。 View 视图∶只负责数据和界面的显示&#…

Python I/O操作笔记

打开文件&#xff1a; 使用 open() 函数&#xff0c;其中文件路径可以是相对路径或绝对路径。 模式除了常见的 r&#xff08;只读&#xff09;、w&#xff08;写入&#xff0c;会覆盖原有内容&#xff09;、a&#xff08;追加&#xff09;外&#xff0c;还有一些其他组合模式&…