spring boot 实现直播聊天室(二)

news2025/1/12 20:53:23

spring boot 实现直播聊天室(二)

技术方案:

  • spring boot
  • netty
  • rabbitmq

目录结构

在这里插入图片描述

引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.96.Final</version>
</dependency>

SimpleNettyWebsocketServer

netty server 启动类

@Slf4j
public class SimpleNettyWebsocketServer {

    private SimpleWsHandler simpleWsHandler;

    public SimpleNettyWebsocketServer(SimpleWsHandler simpleWsHandler) {
        this.simpleWsHandler = simpleWsHandler;
    }

    public void start(int port) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。
                    pipeline.addLast(new HttpServerCodec());
                    //用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。
                    pipeline.addLast(new HttpObjectAggregator(65535));
                    pipeline.addLast(new IdleStateHandler(30,0,0));
                    //处理请求参数
                    pipeline.addLast(new SimpleWsHttpHandler());
                    pipeline.addLast(new WebSocketServerProtocolHandler("/n/ws"));
                    pipeline.addLast(simpleWsHandler);

                }
            });
            Channel channel = bootstrap.bind(port).sync().channel();
            log.info("server start at port: {}", port);
            channel.closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

NettyUtil: 工具类

public class NettyUtil {

    public static AttributeKey<String> G_U = AttributeKey.valueOf("GU");

    /**
     * 设置上下文参数
     * @param channel
     * @param attributeKey
     * @param data
     * @param <T>
     */
    public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
        Attribute<T> attr = channel.attr(attributeKey);
        if (attr != null) {
            attr.set(data);
        }
    }


    /**
     * 获取上下文参数 
     * @param channel
     * @param attributeKey
     * @return
     * @param <T>
     */
    public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {
        return channel.attr(attributeKey).get();
    }


    /**
     * 根据 渠道获取 session
     * @param channel
     * @return
     */
    public static NettySimpleSession getSession(Channel channel) {
        String attr = channel.attr(G_U).get();
        if (StrUtil.isNotBlank(attr)){
            String[] split = attr.split(",");
            String groupId = split[0];
            String username = split[1];
            return new NettySimpleSession(channel.id().toString(),groupId,username,channel);
        }
        return null;
    }
}

处理handler

SimpleWsHttpHandler

处理 websocket 协议升级时地址请求参数 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom, 解析groupId 和 username ,并设置这个属性到上下文

/**
 * @Date: 2023/12/13 9:53
 * 提取参数
 */
@Slf4j
public class SimpleWsHttpHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof  FullHttpRequest request){
            //ws://localhost:8080/n/ws?groupId=xx&username=tom
            String decode = URLDecoder.decode(request.uri(), StandardCharsets.UTF_8);
            log.info("raw request url: {}", decode);
            Map<String, String> queryMap = getParams(decode);
            String groupId = MapUtil.getStr(queryMap, "groupId", null);
            String username = MapUtil.getStr(queryMap, "username", null);
            if (StrUtil.isNotBlank(groupId) && StrUtil.isNotBlank(username)) {
                NettyUtil.setAttr(ctx.channel(), NettyUtil.G_U, groupId.concat(",").concat(username));
            }
            //去掉参数 ===>  ws://localhost:8080/n/ws
            request.setUri(request.uri().substring(0,request.uri().indexOf("?")));
            ctx.pipeline().remove(this);
            ctx.fireChannelRead(request);
        }else{
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * 解析 queryString
     * @param uri
     * @return
     */
    public static Map<String, String> getParams(String uri) {
        Map<String, String> params = new HashMap<>(10);
        int idx = uri.indexOf("?");
        if (idx != -1) {
            String[] paramsArr = uri.substring(idx + 1).split("&");
            for (String param : paramsArr) {
                idx = param.indexOf("=");
                params.put(param.substring(0, idx), param.substring(idx + 1));
            }
        }

        return params;
    }
}
SimpleWsHandler

处理消息

@Slf4j
@ChannelHandler.Sharable
public class SimpleWsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private PushService pushService;

    /**
     * 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。
     * 在连接建立时被调用一次
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session == null) {
            log.info("handlerAdded channel id: {}", ctx.channel().id());
        } else {
            log.info("handlerAdded channel group-username: {}-{}", session.group(), session.identity());
        }
    }

    /**
     * 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理
     * 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session!=null){
            log.info("handlerRemoved channel group-username: {}-{}", session.group(), session.identity());
        }
        offline(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //todo msg 可以是json字符串,这里仅仅只是纯文本
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session!=null){
            MessageDto messageDto = new MessageDto();
            messageDto.setSessionId(session.getId());
            messageDto.setGroup(session.group());
            messageDto.setFromUser(session.identity());
            messageDto.setContent(msg.text());
            pushService.pushGroupMessage(messageDto);
        }else {
            log.info("channelRead0 session is null channel id: {}-{}", ctx.channel().id(),msg.text());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("SimpleWsHandler 客户端异常断开 {}", cause.getMessage());
        //todo offline
        offline(ctx.channel());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent idleStateEvent) {
            if (idleStateEvent.state().equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {
                log.info("SimpleWsIdleHandler channelIdle 5 秒未收到客户端消息,强制关闭: {}", ctx.channel().id());
                //todo offline
                offline(ctx.channel());
            }
        } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            String attr = NettyUtil.getAttr(ctx.channel(), NettyUtil.G_U);
            if (StrUtil.isBlank(attr)) {
                ctx.writeAndFlush("参数异常");
                offline(ctx.channel());
            } else {
                //todo 可以做用户认证等等
                //记录用户登陆session
                NettySimpleSession session = NettyUtil.getSession(ctx.channel());
                Assert.notNull(session, "session 不能为空");
                SessionRegistry.getInstance().addSession(session);
            }
        }
        super.userEventTriggered(ctx,evt);
    }

    /**
     * 用户下线,处理失效 session
     * @param channel
     */
    public void offline(Channel channel){
        NettySimpleSession session = NettyUtil.getSession(channel);
        if (session!=null){
            SessionRegistry.getInstance().removeSession(session);
        }
        channel.close();
    }

}

PushService

推送服务抽取

public interface PushService {

    /**
     * 组推送
     * @param messageDto
     */
    void pushGroupMessage(MessageDto messageDto);

}

@Service
public class PushServiceImpl implements PushService {

    @Autowired
    private MessageClient messagingClient;

    @Override
    public void pushGroupMessage(MessageDto messageDto) {
        messagingClient.sendMessage(messageDto);
    }
}

NettySimpleSession

netty session 封装

public class NettySimpleSession extends AbstractWsSession {

    private Channel channel;

    public NettySimpleSession(String id, String group, String identity, Channel channel) {
        super(id, group, identity);
        this.channel = channel;
    }

    @Override
    public void sendTextMessage(MessageDto messageDto) {
        String content = messageDto.getFromUser() + " say: " + messageDto.getContent();
        // 不能直接 write content, channel.writeAndFlush(content);
        // 要封装成 websocketFrame,不然不能编解码!!!
        channel.writeAndFlush(new TextWebSocketFrame(content));
    }
}

启动类

@Slf4j
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }


    @Bean
    public SimpleWsHandler simpleWsHandler(){
        return new SimpleWsHandler();
    }

    @PostConstruct
    public void init() {
        new Thread(() -> {
            log.info(">>>>>>>> start netty ws server....");
            try {
                new SimpleNettyWebsocketServer(simpleWsHandler()).start(8881);
            } catch (InterruptedException e) {
                log.info(">>>>>>>> SimpleNettyWebsocketServer start error", e);
            }
        }).start();
    }

}

其他代码参考 spring boot 实现直播聊天室

测试

websocket 地址 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1309577.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

位1的个数

题目链接 位1的个数 题目描述 注意点 输入必须是长度为 32 的 二进制串 解答思路 位运算判断每一位是否为1 代码 public class Solution {// you need to treat n as an unsigned valuepublic int hammingWeight(int n) {int res 0;for (int i 0; i < 32; i) {res …

Mac中nvm切换node版本失败

Mac中使用 nvm 管理 node 版本&#xff0c;在使用指令&#xff1a;nvm use XXX 切换版本之后。 关闭终端&#xff0c;再次打开&#xff0c;输入 node -v 还是得到之前的 node 版本。 原因&#xff1a; 在这里这个 default 中有个 node 的版本号&#xff0c;使用 nvm use 时&a…

【玩转TableAgent数据智能分析】会话式数据分析,所需即所得!

目录 1 TableAgent介绍 2 TableAgent五大优点 3 体验TableAgent 3.1 登录TableAgent平台 3.2 会话式数据分析 4 总结 【优化改善】 【对比TableAgent与文心一言- E言易图】 1 TableAgent介绍 TableAgent是一款数据集成和分析平台&#xff0c;它可以帮助用户从多个数据源中…

【wimdows电脑上管理员账户与管理员身份的区别】

管理员账户 在控制面板的用户账户中&#xff0c;点击更改账户类型&#xff0c;可以看到目前的账户是“管理员账户”还是“标准账户”。 管理员身份 在快捷方式上右击&#xff0c;可以看到&#xff0c;可以选择以管理员身份运行该软件。 如何查看某个应用是否以管理员身份…

大数据云计算之OpenStack

大数据云计算之OpenStack 1.什么是OpenStack&#xff0c;其作用是什么&#xff1f;OpenStack主要的组成模块有哪些&#xff1f;各自的主要作用是什么&#xff1f; OpenStack是一个开源的云计算平台&#xff0c;旨在为企业和服务提供商提供私有云和公有云的建设和管理解决方案…

MySQL第三方备份工具Percona XtraBackup

实验环境&#xff1a; CentOS7.9 准备软件&#xff1a;yum -y install https://repo.percona.com/yum/percona-release-latest.noarch.rpm 一、什么是Percona XtraBackup&#xff1a;Percona XtraBackup&#xff08;简称PXB&#xff09;是 Percona 公司开发的一个用于 MySQL …

Optional.ofNullable的使用

Optional.ofNullable的使用 Optional.ofNullable()方法是Java 8中的一个方法&#xff0c;用于创建一个Optional对象&#xff0c;该对象可能包含一个非空值&#xff0c;也可能为空。如果传递给ofNullable()方法的参数为null&#xff0c;则返回一个空的Optional对象&#xff0c;…

现代雷达车载应用——第2章 汽车雷达系统原理 2.3节 信号模型

经典著作&#xff0c;值得一读&#xff0c;英文原版下载链接【免费】ModernRadarforAutomotiveApplications资源-CSDN文库。 2.3 信号模型 雷达的发射机通常发出精心设计和定义明确的信号。然而&#xff0c;接收到的返回信号是多个分量的叠加&#xff0c;包括目标的反射、杂波…

conda环境报错: Solving environment: failed with initial frozen solve.

出现的情况&#xff1a; 解决方法&#xff1a; 参考了许多博客 建议的方法&#xff1a; 创建一个虚拟环境 conda create -n torch_1.3 python3.6 激活虚拟环境 conda activate torch_1.3 conda安装 conda install pytorch1.5.0 如果报错每个包单独安装就可以了&#x…

issue unit

The Issue Unit issue queue用来hold住&#xff0c;已经dispatched&#xff0c;但是还没有执行的uops&#xff1b; 当一条uop的所有的operands已经ready之后&#xff0c;request请求会被拉起来&#xff1b;然后issue select logic将会从request bit 1的slot中&#xff0c;选择…

中通快递查询,中通快递单号查询,并进行多次揽收分析

批量查询中通快递单号的物流信息&#xff0c;并将其中的多次揽收件分析筛选出来。 所需工具&#xff1a; 一个【快递批量查询高手】软件 中通快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;第一次使用的伙伴记得先注册&…

LeetCode 309买卖股票的最佳时机含冷冻期 714买卖股票的最佳时机含手续费 | 代码随想录25期训练营day51

动态规划算法9 LeetCode 309 买卖股票的最佳时机含冷冻期 2023.12.14 题目链接代码随想录讲解[链接] int maxProfit(vector<int>& prices) {//1确定dp二维数组//dp[i][0]表示遍历到第i天时持有股票的当前收入;dp[i][1]表示遍历到第i天时未持有股票的当前收入//dp…

项目管理:如何把项目管理落实到执行细节

一切皆项目&#xff0c;我们的人生也是一个大的项目&#xff0c;在这个大的项目中&#xff0c;拥有多个小的项目&#xff0c;多个阶段&#xff0c;各种活动&#xff0c;小的项目构建成不同大的项目。 对于个人这个项目&#xff0c;我们可以一步步去完成&#xff0c;从项目管理…

SQL进阶理论篇(二):数据库的设计范式

文章目录 简介数据库的设计范式有哪些数据库中的几种键从1NF到3NF1NF2NF3NFBCNF&#xff08;巴斯范式&#xff09; 反范式设计反范式的适用场景总结参考文献 简介 本小节主要内容&#xff1a; 数据库的设计范式都有哪些数据库的键都有哪些1NF、2NF和3NF都是指什么&#xff1f…

初识大数据应用,一文掌握大数据知识文集(1)

文章目录 &#x1f3c6;初识大数据应用知识&#x1f50e;一、初识大数据应用知识(1)&#x1f341; 01、请用Java实现非递归二分查询&#xff1f;&#x1f341; 02、是客户端还是Namenode决定输入的分片&#xff1f;&#x1f341; 03、mapred.job.tracker命令的作用&#xff1f;…

00TD I 秒变甜美小淑女,冬日氛围感拉满

一衣两穿的的气质款羽绒服 穿着轻松营造氛围感 90白鸭绒填充 厚实蓬松保暖性十足&#xff0c;大大的翻领显得脸小 又增添精致感&#xff0c;腰间系带系起来就像 穿了件可爱精致的小裙子般 解开也是另一种简约气质的感觉 小淑女们衣橱必入款。

水闸水雨情监测设施建设项目

功能设计 在水闸上、下游挡墙外侧各安装1套雷达水位计&#xff0c;水闸屋顶布置个雨量计&#xff0c;水位及雨量监测数据的采集与传输主要是实时的完成水位、雨量数据的采集与处理&#xff0c;并按照设定的工作方式、时间间隔、增量范围将数据上传至扬压力监测站边缘计算终端&…

【数据结构和算法】压缩字符串

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 方法一&#xff1a;双指针 三、代码 3.1 方法一&#xff1a;双指针 四、复杂度分析 前言 这是力扣…

如何搭建适合自己的数字人源码系统?

随着数字人技术的快速发展&#xff0c;数字人直播软件成为了直播行业的热门话题。数字人源码部署成为了创业者在这个领域看中的新商机&#xff0c;数字人直播软件给市场带来很多机会的同时也给商家带来了更多的收益。但是&#xff0c;选择一个适合自己的数字人直播软件源码并不…

模型数据-HttpServletRequest使用

模型数据-HttpServletRequest使用 三种情况 自己也要添加一些属性到Request域中,可以通过原生Servlet API的request来设置.也可以修改按照默认机制已经放进去 的某些对象.明白默认情况下存放的名字其实就是类名/类型名 首字母小写.(虽然monster100改了,但还是按类名首字母小写…