目录
- 基于Netty实现的简单聊天服务组件
- 效果展示
- 技术选型:
- 功能分析
- 聊天服务基础设施配置(基于Netty)
- 定义组件基础的配置(`ChatProperties`)
- 定义聊天服务类(`ChatServer`)
- 定义聊天服务配置初始化类(`ChatServerInitializer`)
- 用户上线、下线处理
- 客户端绑定服务处理类(`ClientInboundHandler`)
- 用户消息发送、接收处理
- 定义一个文本消息处理器(`TextWebSocketFrameHandler`)
- 用户登录凭证校验
- 定义一个凭证处理器接口(`AuthorizationProcessor`)
- 定义 `ChatAutoConfiguration` 自动化配置类
- 定义 `ChatServerApplication` 服务启动类
- 参考资料
基于Netty实现的简单聊天服务组件
本文摘自Quan后台管理服务框架中的
quan-chat
工具,该工具仅实现了非常简单服务模型。后期本人会视情况扩展更多复杂的业务场景。
如果本文对您解决问题有帮助,欢迎到Gitee或Github点个star 🤝
quan-chat 是一个基于 Netty 实现的服务端即时消息通讯组件,组件本身不具备业务处理能力,主要的作用是提供服务端消息中转; 通过实现组件中的接口可以完成与项目相关的业务功能, 例如:点对点消息收发、权限校验、聊天记录保存等。
web展示层ui基于layim。layim展示的功能较为丰富。为演示服务组件,仅实现点对点聊天功能。其它功能视情况扩展。
本组件仅用于学习交流使用,本文应用到的 layim 来自互联网,如果您想将 layim 框架用于其它用途,必须取得原作者授权: layui ,否则产生的一切法律责任与本作者无关。
效果展示
技术选型:
spring-boot-2.7.16
netty-4.1.97
layim-3.9.8
功能分析
- 聊天服务基础设施配置(基于Netty)
- 用户上线、下线处理
- 用户消息发送、接收处理
- 用户登录凭证校验
完整的组件代码开源地址:https://gitee.com/quan100/quan/tree/main/quan-tools/quan-chat
下面仅展示部分代码
聊天服务基础设施配置(基于Netty)
Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
定义组件基础的配置(ChatProperties
)
ChatProperties 主要用于定义组件内部使用到的配置参数。
package cn.javaquan.tools.chat.autoconfigure;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.Assert;
/**
* Configuration properties for im support.
*
* @author javaquan
* @since 1.0.0
*/
@ConfigurationProperties(prefix = "quan.im")
public class ChatProperties {
/**
* 默认数据包最大长度
* 64kb
*/
private final static int MAX_FRAME_SIZE = 65536;
/**
* 默认的消息体最大长度
* 64kb
*/
private final static int MAX_CONTENT_LENGTH = 65536;
/**
* 空闲检查时间,单位:秒
*/
private final static long READER_IDLE_TIME = 600L;
/**
* 开启IM服务的端口
*/
private Integer port;
/**
* SSL配置
*/
private Ssl ssl;
/**
* websocket 路径
*/
private String websocketPath;
/**
* 数据包最大长度
* 单位:字节
*/
private Integer maxFrameSize;
/**
* 消息体最大长度
* 单位:字节
*/
private Integer maxContentLength;
/**
* 允许连接空闲的最大时间
* <p>
* 当空闲超过最大时间后,强制下线
*/
private Long readerIdleTime;
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public int determineDefaultPort() {
Assert.notNull(this.port, "[Assertion failed chat server port] - this numeric argument must have value; it must not be null");
return this.port;
}
public Ssl getSsl() {
return ssl;
}
public void setSsl(Ssl ssl) {
this.ssl = ssl;
}
public String getWebsocketPath() {
return websocketPath;
}
public void setWebsocketPath(String websocketPath) {
this.websocketPath = websocketPath;
}
public String determineDefaultWebsocketPath() {
Assert.hasText(this.websocketPath, "[Assertion failed chat server websocketPath] - it must not be null or empty");
return this.websocketPath;
}
public Integer getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(Integer maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
public Integer determineDefaultMaxFrameSize() {
if (null == maxFrameSize) {
this.setMaxFrameSize(MAX_FRAME_SIZE);
}
return this.maxFrameSize;
}
public Integer getMaxContentLength() {
return maxContentLength;
}
public void setMaxContentLength(Integer maxContentLength) {
this.maxContentLength = maxContentLength;
}
public Integer determineDefaultMaxContentLength() {
if (null == maxContentLength) {
this.setMaxContentLength(MAX_CONTENT_LENGTH);
}
return this.maxContentLength;
}
public Long getReaderIdleTime() {
return readerIdleTime;
}
public void setReaderIdleTime(Long readerIdleTime) {
this.readerIdleTime = readerIdleTime;
}
public Long determineDefaultReaderIdleTime() {
if (null == readerIdleTime) {
this.setReaderIdleTime(READER_IDLE_TIME);
}
return this.readerIdleTime;
}
/**
* ssl properties.
*/
public static class Ssl {
private boolean enabled = false;
private String protocol = "TLS";
/**
* an X.509 certificate chain file in PEM format
*/
private String keyCertChainFilePath;
/**
* a PKCS#8 private key file in PEM format
*/
private String keyFilePath;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getKeyCertChainFilePath() {
return keyCertChainFilePath;
}
public void setKeyCertChainFilePath(String keyCertChainFilePath) {
this.keyCertChainFilePath = keyCertChainFilePath;
}
public String determineDefaultKeyCertChainFilePath() {
Assert.hasText(this.keyCertChainFilePath, "[Assertion failed chat server keyCertChainFilePath] - it must not be null or empty");
return this.keyCertChainFilePath;
}
public String getKeyFilePath() {
return keyFilePath;
}
public void setKeyFilePath(String keyFilePath) {
this.keyFilePath = keyFilePath;
}
public String determineDefaultKeyFilePath() {
Assert.hasText(this.keyFilePath, "[Assertion failed chat server keyFilePath] - it must not be null or empty");
return this.keyFilePath;
}
}
public void afterPropertiesSet() {
determineDefaultPort();
determineDefaultWebsocketPath();
determineDefaultMaxFrameSize();
determineDefaultMaxContentLength();
determineDefaultReaderIdleTime();
}
}
yml 配置示例:
quan:
im:
port: 10000 # 配置chat服务端口
websocket-path: /chat # 配置chat服务websocket访问的uri
reader-idle-time: 1800 #允许连接空闲的时间,单位:秒。超时后强制下线
定义聊天服务类(ChatServer
)
用于实现客户端与服务器建立连接,状态维护
package cn.javaquan.tools.chat.server;
import cn.javaquan.tools.chat.autoconfigure.ChatProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import java.net.InetSocketAddress;
/**
* 默认的聊天服务
*
* @author javaquan
* @since 1.0.0
*/
public class ChatServer {
private static final Log logger = LogFactory.getLog(ChatServer.class);
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address, ChatProperties properties) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup, properties));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group, ChatProperties properties) {
return new ChatServerInitializer(group, properties);
}
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public void start(ChatProperties properties) {
ChannelFuture future = this.start(new InetSocketAddress(properties.getPort()), properties);
addShutdownHook(this);
future.addListener((listener) -> {
Assert.isTrue(listener.isSuccess(), logMessageFormat(properties.getPort(), "error"));
logger.info(logMessageFormat(properties.getPort(), "success"));
});
}
/**
* Registers a new virtual-machine shutdown hook.
*
* @param chatServer
*/
private void addShutdownHook(ChatServer chatServer) {
Runtime.getRuntime().addShutdownHook(new Thread(chatServer::destroy));
}
private String logMessageFormat(Integer port, String state) {
return String.format("%s started %s on port(s): %s", this.getClass().getSimpleName(), state, port);
}
}
定义聊天服务配置初始化类(ChatServerInitializer
)
主要用于初始化聊天服务应用到的处理器。
package cn.javaquan.tools.chat.server;
import cn.javaquan.tools.chat.autoconfigure.ChatProperties;
import cn.javaquan.tools.chat.context.ClientInboundHandler;
import cn.javaquan.tools.chat.context.TextWebSocketFrameHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* 初始化服务配置
*
* @author javaquan
*/
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
private final ChatProperties properties;
public ChatServerInitializer(ChannelGroup group, ChatProperties properties) {
this.group = group;
this.properties = properties;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(properties.getMaxContentLength()));
pipeline.addLast(new IdleStateHandler(properties.getReaderIdleTime(), 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new ClientInboundHandler(group, properties.getWebsocketPath()));
pipeline.addLast(new TextWebSocketFrameHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(properties.getWebsocketPath(), null, true, properties.getMaxFrameSize()));
}
}
用户上线、下线处理
客户端绑定服务处理类(ClientInboundHandler
)
主要用于处理用户上线、下线状态处理。
package cn.javaquan.tools.chat.context;
import cn.javaquan.tools.chat.core.ChannelPool;
import cn.javaquan.tools.chat.core.support.AuthorizationProcessor;
import cn.javaquan.tools.chat.util.SpringUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
/**
* 客户端用户状态处理
*
* @author javaquan
*/
@Sharable
public class ClientInboundHandler extends ChannelInboundHandlerAdapter {
private static final Log logger = LogFactory.getLog(ClientInboundHandler.class);
private final ChannelGroup group;
private final String websocketPath;
public ClientInboundHandler(ChannelGroup group, String websocketPath) {
this.group = group;
this.websocketPath = websocketPath;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
Map<String, String> queryParams = paramsParser(uri);
online(ctx.channel(), queryParams);
request.setUri(websocketPath);
}
super.channelRead(ctx, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
logger.info(String.format("用户[%s]闲置时间超过最大值,将关闭连接!", ChannelPool.getSessionState(ctx.channel())));
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
group.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
group.remove(channel);
offline(channel);
}
/**
* 异常时调用
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("服务器错误", cause);
offline(ctx.channel());
// 发生异常之后关闭连接(关闭channel)
ctx.channel().close();
}
/**
* url参数解析
*
* @param uriParams
* @return
* @throws URISyntaxException
*/
private Map<String, String> paramsParser(String uriParams) throws URISyntaxException {
URI uri = new URI(uriParams);
Map<String, String> paramsMap = new HashMap<>();
String queryParam = uri.getQuery();
String[] queryParams = queryParam.split("&");
for (String param : queryParams) {
String[] urlParam = param.split("=");
paramsMap.put(urlParam[0], urlParam[1]);
}
return paramsMap;
}
/**
* 用户上线
*
* @param channel
* @param urlParams url参数
*/
private void online(Channel channel, Map<String, String> urlParams) {
String userId = urlParams.get("userId");
String authorization = urlParams.get("authorization");
AuthorizationProcessor authorizationProcessor = SpringUtils.getBean(AuthorizationProcessor.class);
if (!authorizationProcessor.checkAuth(authorization)) {
channel.close();
logger.info(String.format("用户[%s]凭证校验失败,连接被服务器拒绝", userId));
return;
}
logger.info(String.format("用户[%s]上线", userId));
channel.attr(ChannelPool.SESSION_STATE).set(userId);
ChannelPool.addChannel(userId, channel);
/// TODO 若用户上线,则通知好友已上线。kafka发送上线事件
}
/**
* 用户离线
*
* @param channel
*/
private void offline(Channel channel) {
ChannelPool.removeChannel(channel);
logger.info(String.format("用户[%s]下线", ChannelPool.getSessionState(channel)));
/// TODO 若用户下线,则通知好友已下线。kafka发送下线事件
}
}
用户消息发送、接收处理
定义一个文本消息处理器(TextWebSocketFrameHandler
)
用于将用户发送的文本消息转换为服务端使用的模版消息。
通过模版将消息转发给接收者。
package cn.javaquan.tools.chat.context;
import cn.javaquan.tools.chat.core.MessageHandlerFactory;
import cn.javaquan.tools.chat.core.message.MessageTemplate;
import cn.javaquan.tools.chat.util.JsonUtils;
import cn.javaquan.tools.chat.util.SpringUtils;
import cn.javaquan.tools.chat.core.support.IMessageHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* 消息处理器
*
* @author javaquan
*/
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
MessageTemplate messageTemplate = messageConvertor(msg);
messageHandler(ctx, messageTemplate);
}
/**
* 消息处理
* <p>
* 根据消息类型处理消息
* <p>
* 需要自定义实现{@link IMessageHandler}接口。
*
* @param ctx
* @param messageTemplate
*/
private void messageHandler(ChannelHandlerContext ctx, MessageTemplate messageTemplate) {
MessageHandlerFactory messageHandlerFactory = SpringUtils.getBean(MessageHandlerFactory.class);
messageHandlerFactory.getService(messageTemplate.getType()).handler(ctx, messageTemplate);
}
/**
* 将字符串信息转换为模版信息格式
*
* @param msg
* @return
*/
private MessageTemplate messageConvertor(TextWebSocketFrame msg) {
return JsonUtils.parseObject(msg.text(), MessageTemplate.class);
}
}
用户登录凭证校验
定义一个凭证处理器接口(AuthorizationProcessor
)
将处理器定义成接口,主要目的是将组件与业务解耦。
因为不同的业务,实现的权限业务都可能不一样。
只需业务端实现该接口,当权限校验不通过时,组件内部就会拒绝客户端连接。
package cn.javaquan.tools.chat.core.support;
/**
* 授权凭证处理器
*
* @author wangquan
*/
public interface AuthorizationProcessor {
/**
* 检查权限
*
* @param authorization 登录凭证
* @return
*/
boolean checkAuth(String authorization);
}
定义 ChatAutoConfiguration
自动化配置类
ChatAutoConfiguration
是quan-chat
组件中最重要的一项配置,通过该配置来定义组件是否生效。
当引入quan-chat
组件时,不需要对组件进行扫描。服务启动时会自动发现该配置。
通过该配置初始化聊天服务所依赖的相关功能。若未按照配置要求配置属性,quan-chat
组件引入将无效。
package cn.javaquan.tools.chat.autoconfigure;
import cn.javaquan.tools.chat.ChatServerApplication;
import cn.javaquan.tools.chat.core.ChannelPool;
import cn.javaquan.tools.chat.core.support.AbstractAuthorizationCheckProcessor;
import cn.javaquan.tools.chat.core.support.AuthorizationProcessor;
import cn.javaquan.tools.chat.server.ChatServer;
import cn.javaquan.tools.chat.server.SecureChatServer;
import io.netty.channel.Channel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* im聊天sdk配置
*
* @author javaquan
* @since 1.0.0
*/
@AutoConfiguration
@EnableConfigurationProperties(ChatProperties.class)
public class ChatAutoConfiguration {
@Import(ChatServerApplication.class)
@Configuration(proxyBeanMethods = false)
@Conditional(ChatCondition.class)
protected static class ChatConfiguration {
@ConditionalOnProperty(prefix = "quan.im.ssl", name = "enabled", havingValue = "false", matchIfMissing = true)
@ConditionalOnMissingBean
@Bean
ChatServer chatServer() {
return new ChatServer();
}
@ConditionalOnMissingBean
@Bean
ChannelPool channelPool() {
Map<String, Channel> channelContainer = new ConcurrentHashMap<>();
return new ChannelPool(channelContainer);
}
@ConditionalOnMissingBean
@Bean
AuthorizationProcessor authorizationProcessor() {
return new AbstractAuthorizationCheckProcessor();
}
}
static class ChatCondition extends AnyNestedCondition {
ChatCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@ConditionalOnProperty(prefix = "quan.im", name = "port")
static class PortProperty {
}
@ConditionalOnProperty(prefix = "quan.im.ssl", name = "enabled", havingValue = "true")
@ConditionalOnMissingBean
@Bean
SslContext sslContext(ChatProperties properties) throws Exception {
ChatProperties.Ssl ssl = properties.getSsl();
File keyCertChainFile = new File(ssl.determineDefaultKeyCertChainFilePath());
File keyFile = new File(ssl.determineDefaultKeyFilePath());
return SslContextBuilder.forServer(keyCertChainFile, keyFile).build();
}
@ConditionalOnProperty(prefix = "quan.im.ssl", name = "enabled", havingValue = "true")
@ConditionalOnMissingBean
@Bean
ChatServer secureChatServer(SslContext context) {
return new SecureChatServer(context);
}
}
}
定义 ChatServerApplication
服务启动类
当引入 quan-chat 组件时,并正确配置
ChatProperties
属性,服务启动时则会自动扫描ChatServerApplication
类,用于启动 聊天服务端。
package cn.javaquan.tools.chat;
import cn.javaquan.tools.chat.autoconfigure.ChatProperties;
import cn.javaquan.tools.chat.server.ChatServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
/**
* chat服务启动
*
* @author javaquan
* @since 1.0.0
*/
public class ChatServerApplication implements ApplicationRunner {
@Autowired
private ChatServer chatServer;
@Autowired
private ChatProperties properties;
@Override
public void run(ApplicationArguments args) throws Exception {
properties.afterPropertiesSet();
chatServer.start(properties);
}
}
参考资料
如果本文对您解决问题有帮助,欢迎到Gitee或Github点个star 🤝
quan-chat 工具文档:https://doc.javaquan.cn/pages/tools/chat/
quan-chat 工具开源地址:https://gitee.com/quan100/quan/tree/main/quan-tools/quan-chat