spring boot 实现直播聊天室(二)
技术方案:
- spring boot
- netty
- rabbitmq
目录结构
引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.96.Final</version>
</dependency>
SimpleNettyWebsocketServer
netty server 启动类
@Slf4j
public class SimpleNettyWebsocketServer {
private SimpleWsHandler simpleWsHandler;
public SimpleNettyWebsocketServer(SimpleWsHandler simpleWsHandler) {
this.simpleWsHandler = simpleWsHandler;
}
public void start(int port) throws InterruptedException {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup(2);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。
pipeline.addLast(new HttpServerCodec());
//用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。
pipeline.addLast(new HttpObjectAggregator(65535));
pipeline.addLast(new IdleStateHandler(30,0,0));
//处理请求参数
pipeline.addLast(new SimpleWsHttpHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/n/ws"));
pipeline.addLast(simpleWsHandler);
}
});
Channel channel = bootstrap.bind(port).sync().channel();
log.info("server start at port: {}", port);
channel.closeFuture().sync();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
NettyUtil: 工具类
public class NettyUtil {
public static AttributeKey<String> G_U = AttributeKey.valueOf("GU");
/**
* 设置上下文参数
* @param channel
* @param attributeKey
* @param data
* @param <T>
*/
public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
Attribute<T> attr = channel.attr(attributeKey);
if (attr != null) {
attr.set(data);
}
}
/**
* 获取上下文参数
* @param channel
* @param attributeKey
* @return
* @param <T>
*/
public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {
return channel.attr(attributeKey).get();
}
/**
* 根据 渠道获取 session
* @param channel
* @return
*/
public static NettySimpleSession getSession(Channel channel) {
String attr = channel.attr(G_U).get();
if (StrUtil.isNotBlank(attr)){
String[] split = attr.split(",");
String groupId = split[0];
String username = split[1];
return new NettySimpleSession(channel.id().toString(),groupId,username,channel);
}
return null;
}
}
处理handler
SimpleWsHttpHandler
处理 websocket 协议升级时地址请求参数 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom
, 解析groupId 和 username ,并设置这个属性到上下文
/**
* @Date: 2023/12/13 9:53
* 提取参数
*/
@Slf4j
public class SimpleWsHttpHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest request){
//ws://localhost:8080/n/ws?groupId=xx&username=tom
String decode = URLDecoder.decode(request.uri(), StandardCharsets.UTF_8);
log.info("raw request url: {}", decode);
Map<String, String> queryMap = getParams(decode);
String groupId = MapUtil.getStr(queryMap, "groupId", null);
String username = MapUtil.getStr(queryMap, "username", null);
if (StrUtil.isNotBlank(groupId) && StrUtil.isNotBlank(username)) {
NettyUtil.setAttr(ctx.channel(), NettyUtil.G_U, groupId.concat(",").concat(username));
}
//去掉参数 ===> ws://localhost:8080/n/ws
request.setUri(request.uri().substring(0,request.uri().indexOf("?")));
ctx.pipeline().remove(this);
ctx.fireChannelRead(request);
}else{
ctx.fireChannelRead(msg);
}
}
/**
* 解析 queryString
* @param uri
* @return
*/
public static Map<String, String> getParams(String uri) {
Map<String, String> params = new HashMap<>(10);
int idx = uri.indexOf("?");
if (idx != -1) {
String[] paramsArr = uri.substring(idx + 1).split("&");
for (String param : paramsArr) {
idx = param.indexOf("=");
params.put(param.substring(0, idx), param.substring(idx + 1));
}
}
return params;
}
}
SimpleWsHandler
处理消息
@Slf4j
@ChannelHandler.Sharable
public class SimpleWsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private PushService pushService;
/**
* 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。
* 在连接建立时被调用一次
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
NettySimpleSession session = NettyUtil.getSession(ctx.channel());
if (session == null) {
log.info("handlerAdded channel id: {}", ctx.channel().id());
} else {
log.info("handlerAdded channel group-username: {}-{}", session.group(), session.identity());
}
}
/**
* 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理
* 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
NettySimpleSession session = NettyUtil.getSession(ctx.channel());
if (session!=null){
log.info("handlerRemoved channel group-username: {}-{}", session.group(), session.identity());
}
offline(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//todo msg 可以是json字符串,这里仅仅只是纯文本
NettySimpleSession session = NettyUtil.getSession(ctx.channel());
if (session!=null){
MessageDto messageDto = new MessageDto();
messageDto.setSessionId(session.getId());
messageDto.setGroup(session.group());
messageDto.setFromUser(session.identity());
messageDto.setContent(msg.text());
pushService.pushGroupMessage(messageDto);
}else {
log.info("channelRead0 session is null channel id: {}-{}", ctx.channel().id(),msg.text());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("SimpleWsHandler 客户端异常断开 {}", cause.getMessage());
//todo offline
offline(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent idleStateEvent) {
if (idleStateEvent.state().equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {
log.info("SimpleWsIdleHandler channelIdle 5 秒未收到客户端消息,强制关闭: {}", ctx.channel().id());
//todo offline
offline(ctx.channel());
}
} else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
String attr = NettyUtil.getAttr(ctx.channel(), NettyUtil.G_U);
if (StrUtil.isBlank(attr)) {
ctx.writeAndFlush("参数异常");
offline(ctx.channel());
} else {
//todo 可以做用户认证等等
//记录用户登陆session
NettySimpleSession session = NettyUtil.getSession(ctx.channel());
Assert.notNull(session, "session 不能为空");
SessionRegistry.getInstance().addSession(session);
}
}
super.userEventTriggered(ctx,evt);
}
/**
* 用户下线,处理失效 session
* @param channel
*/
public void offline(Channel channel){
NettySimpleSession session = NettyUtil.getSession(channel);
if (session!=null){
SessionRegistry.getInstance().removeSession(session);
}
channel.close();
}
}
PushService
推送服务抽取
public interface PushService {
/**
* 组推送
* @param messageDto
*/
void pushGroupMessage(MessageDto messageDto);
}
@Service
public class PushServiceImpl implements PushService {
@Autowired
private MessageClient messagingClient;
@Override
public void pushGroupMessage(MessageDto messageDto) {
messagingClient.sendMessage(messageDto);
}
}
NettySimpleSession
netty session 封装
public class NettySimpleSession extends AbstractWsSession {
private Channel channel;
public NettySimpleSession(String id, String group, String identity, Channel channel) {
super(id, group, identity);
this.channel = channel;
}
@Override
public void sendTextMessage(MessageDto messageDto) {
String content = messageDto.getFromUser() + " say: " + messageDto.getContent();
// 不能直接 write content, channel.writeAndFlush(content);
// 要封装成 websocketFrame,不然不能编解码!!!
channel.writeAndFlush(new TextWebSocketFrame(content));
}
}
启动类
@Slf4j
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public SimpleWsHandler simpleWsHandler(){
return new SimpleWsHandler();
}
@PostConstruct
public void init() {
new Thread(() -> {
log.info(">>>>>>>> start netty ws server....");
try {
new SimpleNettyWebsocketServer(simpleWsHandler()).start(8881);
} catch (InterruptedException e) {
log.info(">>>>>>>> SimpleNettyWebsocketServer start error", e);
}
}).start();
}
}
其他代码参考 spring boot 实现直播聊天室
测试
websocket 地址 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom
good luck!