文章目录
- 前言
- 技术积累
- 什么是netty
- netty如何实现IM
- 如何实现IM集群
- 实战演示
- 基础配置
- netty搭建IM集群
- redis发布订阅
- 实战测试
前言
在前面的博文中我们分享了原生websoket集群搭建,也用redis 发布订阅实现了集群消息正常有序分发。但是有不少同学希望风向一期netty实现websoket,可以实现对聊方案。其实前面分享的websocket集群解决方案完全可以实现im对聊的,只是性能处理上没有nettty那么好。今天我们就分享一起使用高性能NIO框架netty实现IM集群对聊方案,各位看官敬请鉴赏。
技术积累
什么是netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
netty的核心是支持零拷贝的bytebuf缓冲对象、通用通信api和可扩展的事件模型;它支持多种传输服务并且支持HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议,也支持自定义协议。
netty的模型是基于reactor多线程模型,其中mainReactor用于接收客户端请求并转发给subReactor。SubReactor负责通道的读写请求,非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
netty如何实现IM
netty支持websocket通讯协议,那么我们就可以用它来实现websoket,实现后端服务主动向前端推送消息的功能。
比如AB用户分别注册到websoket后台,A用户向B用户发送消息,后端接收到A用户消息后判断消息接收者是B用户,然后后端逻辑直接调用B用户websoket连接进行推送即可。
如何实现IM集群
由于websocket 长连接是注册到后端服务本地缓存的,而且这个信道回话是不能被其他中间件缓存的,你们我们就只能在缓存目标用户的服务上拿到会话进行推送消息。
之前博文中也讲到过可以使用消息广播的形式找到目标回话服务,比如Redis的发布订阅、其他Mq等等。当A用户注册到C后端服务上,B服务注册到D后端服务上,这个时候如果A向B发送消息,则需要在C后端服务上增肌广播逻辑,让其他服务感知并监听到消息体,其他服务D收到消息后会验证是否这个用户会话缓存在本地,如果存在则向B前端对送消息,不再则不予处理。这样,就完成了IM集群回话的整个链路流程。
实战演示
本次实战我们简单使用netty实现IM集群通讯即可,如果需要用于生成环境需要增加一些防卫式程序设计,比如Redis发布监听冗余检验后错误处理等等。
基础配置
maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>8</java.version>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!--netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<!-- 整合thymeleaf前端页面 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
application配置文件
server:
port: 9999
spring:
profiles:
active: dev
mvc:
pathmatch:
# Springfox使用的路径匹配是基于AntPathMatcher的,而Spring Boot 2.6.X使用的是PathPatternMatcher
matching-strategy: ant_path_matcher
redis:
host: 127.0.0.1
port: 6379
thymeleaf:
mode: HTML
encoding: UTF-8
content-type: text/html
cache: false
prefix: classpath:/templates/
websocket演示html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
var socket;
function openSocket() {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
//等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
//var socketUrl="${request.contextPath}/im/"+$("#userId").val();
//var socketUrl = "ws://192.168.112.10:7777/ws/" + $("#userId").val();
//var socketUrl = "wss://192.168.112.10/ws/"+ $("#userId").val();
//var socketUrl = "wss://192.168.112.10:8899"
var socketUrl = "ws://127.0.0.1:88?userId="+$("#userId").val();
socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
console.log(socketUrl);
if (socket != null) {
socket.close();
socket = null;
}
socket = new WebSocket(socketUrl);
//打开事件
socket.onopen = function () {
console.log("websocket已打开");
//socket.send("这是来自客户端的消息" + location.href + new Date());
};
//获得消息事件
socket.onmessage = function (msg) {
console.log("接收消息为:"+msg.data);
};
//关闭事件
socket.onclose = function () {
console.log("websocket已关闭");
};
//发生了错误事件
socket.onerror = function () {
console.log("websocket发生了错误");
}
}
}
function sendMessage() {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
console.log('发送消息为:{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
socket.send('{"fromUserId":"' + $("#userId").val() + '","toUserId":"' + $("#toUserId").val() + '","contentText":"' + $("#contentText").val() + '"}');
}
}
</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div><button onclick="openSocket()">开启socket</button></div>
<p>【操作】:<div><button onclick="sendMessage()">发送消息</button></div>
</body>
</html>
netty搭建IM集群
创建消息体
Message.java
import lombok.Data;
/**
* Message
* @author senfel
* @version 1.0
* @date 2024/5/17 14:39
*/
@Data
public class Message {
/**
* 消息编码
*/
private String code;
/**
* 来自(保证唯一)
*/
private String fromUserId;
/**
* 去自(保证唯一)
*/
private String toUserId;
/**
* 内容
*/
private String contentText;
}
创建netty websocket连接池
NettyWebSocketPool.java
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
/**
* NettyWebSocketPool
* @author senfel
* @version 1.0
* @date 2024/5/23 10:19
*/
public class NettyWebSocketPool {
/**
* 通道连接池
*/
public static final ConcurrentHashMap<String, Channel> CHANNELS = new ConcurrentHashMap<>();
}
创建websocket处理器
WebsocketServerHandler.java
import com.example.ccedemo.im.CommonConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* WebsocketServerHandler
* @author senfel
* @version 1.0
* @date 2024/5/22 10:57
*/
@Slf4j
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {
private final static ThreadLocal<String> USER_LIST = new ThreadLocal<>();
private WebSocketServerHandshaker handshaker;
private StringRedisTemplate stringRedisTemplate;
public WebsocketServerHandler() {
}
public WebsocketServerHandler(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = reAddr.getAddress().getHostAddress();
String clientPort = String.valueOf(reAddr.getPort());
log.debug("有新的客户端接入:{}:{}", clientIp, clientPort);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
if (msg instanceof CloseWebSocketFrame) {
disconnectCurrentUser();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
return;
}
if (msg instanceof PingWebSocketFrame) {
log.info("websocket ping message");
ctx.channel().write(new PingWebSocketFrame(msg.content().retain()));
} else if (msg instanceof TextWebSocketFrame) {
// websocket消息解压成字符串让下一个handler处理
String text = ((TextWebSocketFrame) msg).text();
log.info("请求数据|{}", text);
// 如果不调用这个方法后面的handler就获取不到数据
ctx.fireChannelRead(text);
} else {
log.error("不支持的消息格式");
throw new UnsupportedOperationException("不支持的消息格式");
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
if (!req.decoderResult().isSuccess()
|| (!"websocket".equalsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE)))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
"ws://" + req.headers().get(HttpHeaderNames.HOST), null, false);
handshaker = wsShakerFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
String uri = req.uri();
Map<String, String> paramMap = null;
//如果url包含参数,需要处理
if (uri.contains(CommonConstants.QUESTION)) {
paramMap = getUrlParams(uri);
String newUri = uri.substring(0, uri.indexOf(CommonConstants.QUESTION));
req.setUri(newUri);
}
//缓存当前连接
assert paramMap != null;
String channelId = "userId:"+paramMap.get("userId");
log.info("缓存用户通道信息:{}",ctx.channel().localAddress());
log.info("缓存用户通道信息:{}",ctx.channel().remoteAddress());
NettyWebSocketPool.CHANNELS.put(channelId, ctx.channel());
USER_LIST.set(channelId);
//写入在线用户
stringRedisTemplate.opsForValue().set(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+channelId, channelId);
//建立websocket连接握手
handshaker.handshake(ctx.channel(), req);
}
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) {
if (response.status().code() != HttpResponseStatus.OK.code()) {
ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
}
ChannelFuture cf = ctx.channel().writeAndFlush(response);
if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) {
cf.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* url 参数切割
* @param url
* @return
*/
private Map<String, String> getUrlParams(String url) {
Map<String, String> map = new HashMap<>(4);
url = url.replace("?", ";");
if (!url.contains(";")) {
return map;
}
if (url.split(";").length > 0) {
String[] arr = url.split(";")[1].split("&");
for (String s : arr) {
String[] data = s.split("=");
if (data.length > 1) {
map.put(data[0], data[1]);
}
}
return map;
} else {
return map;
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("消息处理异常:{}", USER_LIST.get(), cause);
disconnectCurrentUser();
ctx.close();
}
/**
* 状态触发 检测是否处于空闲状态 间隔时间 60s
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
log.info("握手完成,连接地址为:{}", ctx.channel().remoteAddress());
} else if (evt instanceof IdleStateEvent) {
if (!StringUtils.isEmpty(USER_LIST.get())) {
//断开连接
disconnectCurrentUser();
ctx.disconnect();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* disconnectCurrentUser
* @author senfel
* @date 2024/5/23 15:13
* @return void
*/
private void disconnectCurrentUser() {
log.info("谁断开了连接:{}",USER_LIST.get());
log.info("userEventTriggered 触发,断开连接");
NettyWebSocketPool.CHANNELS.remove(USER_LIST.get());
stringRedisTemplate.delete(CommonConstants.WEBSOCKET_CHANNEL_ID_PREFIX+USER_LIST.get());
USER_LIST.remove();
}
}
创建websocket输入输出处理器
UserWebsocketInHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* UserWebsocketInHandler
* 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器
* @author senfel
* @version 1.0
* @date 2024/5/22 11:10
*/
@Slf4j
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> {
private StringRedisTemplate stringRedisTemplate;
public UserWebsocketInHandler(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.error(Thread.currentThread().getName() + "|" + msg);
String pattern ="\\{.*\\}|\\[.*\\]";
Pattern r= Pattern.compile(pattern);
Matcher m =r.matcher(msg);
if(m.matches()){
stringRedisTemplate.convertAndSend("nettyWebsocketMsgPush",msg);
}else {
ctx.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* UserWebsocketOutHandler
* 出站处理器:判断数据是否需要进行封装
* @author senfel
* @version 1.0
* @date 2024/5/22 11:10
*/
public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof String) {
ctx.write(new TextWebSocketFrame((String) msg), promise);
} else {
super.write(ctx, msg, promise);
}
}
}
创建netty服务端
NettyWebsocketServer.java
import com.example.ccedemo.im.NettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NettyWebsocketServer
* @author senfel
* @version 1.0
* @date 2024/5/22 11:03
*/
@Slf4j
public class NettyWebsocketServer implements Runnable {
private StringRedisTemplate stringRedisTemplate;
/**
* 服务端IP地址
*/
private String ip;
/**
* 服务端端口号
*/
private int port;
public NettyWebsocketServer(String ip, int port, StringRedisTemplate stringRedisTemplate) {
this.ip = ip;
this.port = port;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public void run() {
// 指定boss线程数:主要负责接收连接请求,一般设置为1就可以
final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger index = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));
}
});
// 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数
final int totalThread = Runtime.getRuntime().availableProcessors();
final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {
private AtomicInteger index = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));
}
});
// 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应
final int jobThreads = 1024;
final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {
private AtomicInteger index = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));
}
});
// 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用
final LoggingHandler LOGGING_HANDLER = new LoggingHandler();
// 指定服务端bootstrap
ServerBootstrap server = new ServerBootstrap();
server.group(boss, worker)
// 指定通道类型
.channel(NioServerSocketChannel.class)
// 指定全连接队列大小:windows下默认是200,linux/mac下默认是128
.option(ChannelOption.SO_BACKLOG, 2048)
// 维持链接的活跃,清除死链接
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 关闭延迟发送
.childOption(ChannelOption.TCP_NODELAY, true)
// 添加handler处理链
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 日志处理
pipeline.addLast(LOGGING_HANDLER);
// 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)
pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));
// 处理http请求的编解码器
pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());
pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());
pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));
// 处理websocket的编解码器
pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler(stringRedisTemplate));
// 自定义处理器
pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler());
pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler(stringRedisTemplate));
}
});
try {
// 服务端绑定对外服务地址
ChannelFuture future = server.bind(ip, port).sync();
log.info(NettyServer.class + " 启动正在监听: " + future.channel().localAddress());
// 等待服务关闭,关闭后释放相关资源
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
job.shutdownGracefully();
}
}
}
redis发布订阅
redis配置消息监听
RedisListenerConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* RedisListenerConfig
* @author senfel
* @version 1.0
* @date 2024/5/24 16:26
*/
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(new LiveRedisKeysExpireListener(), new PatternTopic("nettyWebsocketMsgPush"));
return container;
}
}
redis监听消息处理
LiveRedisKeysExpireListener.java
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.Objects;
/**
* LiveRedisKeysExpireListener
* @author senfel
* @version 1.0
* @date 2024/5/24 16:25
*/
public class LiveRedisKeysExpireListener implements MessageListener {
@Override
public void onMessage(Message msg, byte[] bytes) {
System.out.println("监听到需要进行负载转发的消息:" + msg.toString());
com.example.ccedemo.nettydemo.Message message = JSONObject.parseObject(msg.toString(), com.example.ccedemo.nettydemo.Message.class);
Channel channel = NettyWebSocketPool.CHANNELS.get("userId:" + message.getToUserId());
if(Objects.nonNull(channel)){
channel.writeAndFlush(new TextWebSocketFrame(msg.toString()));
}
}
}
实战测试
浏览器分别打开两个无痕页面
http://127.0.0.1:9999/websocket/page
模拟对话两个页面的用户互斥
分别发送消息实现对聊