第 3 篇 : Netty离线消息处理(可跳过)

news2024/11/24 18:17:06

说明

仅是个人的不成熟想法, 未深入研究验证

1. 修改 NettyServerHandler类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** key: 用户code; value: channelId */
    public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);

    /** key: channelId; value: Channel */
    public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);

    /** 最好是单独写个单例(注意: 最多只能new 64个此类对象) */
    public static HashedWheelTimer TIMER;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String channelId = channel.id().asLongText();
        log.info("有客户端连接, channelId : {}", channelId);
        CHANNEL.put(channelId, channel);
        Message message = new Message();
        message.setChannelId(channelId);
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("有客户端断开连接, channelId : {}", ctx.channel().id().asLongText());
        CHANNEL.remove(ctx.channel().id().asLongText());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg != null) {
            Message message = JSON.parseObject(msg.toString(), Message.class);
            String userCode = message.getUserCode(),
                    channelId = message.getChannelId();
            if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
                connect(userCode, channelId);
            } else if (StringUtils.hasText(message.getText())) {
                if (StringUtils.hasText(message.getFriendUserCode())) {
                    sendOtherClient(message);
                } else {
                    sendAdmin(ctx.channel(), message);
                }
            }
        }
    }

    /**
     * 建立连接
     * @param userCode
     * @param channelId
     */
    private void connect(String userCode, String channelId) {
        log.info("客户端 {} 连接", userCode);
        USER_CHANNEL.put(userCode, channelId);
        if (TIMER == null) {
            // 默认的时间轮是100毫秒的tick间隔, 0.1秒的误差
            TIMER = new HashedWheelTimer();
        }
        TIMER.newTimeout(new OfflineMessage(userCode), 1, TimeUnit.SECONDS);
    }

    /**
     * 发送给其他客户端
     * @param message
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode();
        String queryChannelId = USER_CHANNEL.get(friendUserCode);
        if (StringUtils.hasText(queryChannelId)) {
            Channel channel = CHANNEL.get(queryChannelId);
            if (channel == null) {
                offlineMessage(friendUserCode, message);
                return;
            }
            channel.writeAndFlush(Message.transfer(message));
        } else {
            offlineMessage(friendUserCode, message);
        }
    }

    /**
     * 离线消息存储
     * @param friendUserCode
     * @param message
     */
    public void offlineMessage(String friendUserCode, Message message) {
        List<Message> messageList = OfflineMessage.USER_MESSAGE.get(friendUserCode);
        if (CollectionUtils.isEmpty(messageList)) {
            messageList = new ArrayList<>();
        }
        messageList.add(message);
        OfflineMessage.USER_MESSAGE.put(friendUserCode, messageList);
    }

    /**
     * 发送给服务端
     * @param channel
     * @param message
     */
    private void sendAdmin(Channel channel, Message message) {
        message.setUserCode("ADMIN");
        message.setText(LocalDateTime.now().toString());
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());
    }
}

2. config包下增加 OfflineMessage类

package com.hahashou.netty.server.config;

import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: 离线消息
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Slf4j
public class OfflineMessage implements TimerTask {

    public static Map<String, List<Message>> USER_MESSAGE = new ConcurrentHashMap<>(32);

    private String userCode;

    public OfflineMessage(String userCode) {
        this.userCode = userCode;
    }

    @Override
    public void run(Timeout timeout) {
        List<Message> messageList = USER_MESSAGE.get(userCode);
        if (CollectionUtils.isEmpty(messageList)) {
            return;
        }
        log.info("向 {} 推送离线消息", userCode);
        Channel channel = NettyServerHandler.CHANNEL.get(NettyServerHandler.USER_CHANNEL.get(userCode));
        for (Message offlineMessage : messageList) {
            channel.writeAndFlush(Message.transfer(offlineMessage));
        }
    }
}

3. 启动服务端以及客户端A, 发送几条离线消息

A客户端发送离线消息
之后, 启动客户端B接收离线消息。启动/停止了4次, 得到如下4个结果
1
结果1
2
结果2
3
结果3
4
结果4
可以看出, 得到的离线消息并不可靠, 虽然有2次结果一致。而且在这之前, 有一次启动时, 根本就是1条消息都没有, 我都一度怀疑我写的有问题

4. 个人猜想

因为是异步的, netty发送消息时, 轮询策略应该有个时间轮管理着, 且时间轮是有tick间隔的。java中for循环的执行效率大概是10个循环耗时1毫秒, 0.001秒, 如果在for循环中增加线程sleep, 或许就都能执行到, 所以我在OfflineMessage类中for循环中增加50毫秒的slepp, 5次测试结果一致, 后将50改成10, 5次测试结果一致。虽然测试没有问题, 但毕竟测试量太少, 且我觉得离线消息应该是能通过接口一次性就获取到, 所以这种通过netty获取离线消息的方式, 我不赞同

for (Message offlineMessage : messageList) {
    // 异常向上抛出或捕获
    Thread.sleep(50);
    channel.writeAndFlush(Message.transfer(offlineMessage));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1626276.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue3中使用无缝滚动插件vue3-seamless-scroll

官网&#xff1a;https://www.npmjs.com/package/vue-seamless-scroll 1、实现效果文字描述&#xff1a; 表格中的列数据进行横向无缝滚动&#xff0c;某一列进行筛选的时候&#xff0c;重新请求后端的数据&#xff0c;进行刷新 2、安装&#xff1a;npm i vue3-seamless-scrol…

从Kafka的可靠性设计体验软件设计之美

目录 1. Kafka可靠性概述 2. 副本剖析 2.1 什么是副本 2.2 副本失效场景 2.3 数据丢失场景 2.4 解决数据丢失方案 3. 日志同步机制 4. 可靠性分析 1. Kafka可靠性概述 Kafka 中采用了多副本的机制&#xff0c;这是大多数分布式系统中惯用的手法&#xff0c;以此来实现水平扩…

异步日志方案spdlog

异步日志方案spdlog spdlog 是一款高效的 C 日志库&#xff0c;它以其极高的性能和零成本的抽象而著称。spdlog 支持异步和同步日志记录&#xff0c;提供多种日志级别&#xff0c;并允许用户将日志输出到控制台、文件或自定义的接收器。 多线程使用和同步、异步日志没有关系是…

10.接口自动化测试学习-Pytest框架(2)

1.mark标签 如果在每一个模块&#xff0c;每一个类&#xff0c;每一个方法和用例之前都加上mark标签&#xff0c;那么在pytest运行时就可以只运行带有该mark标签的模块、类、接口。 这样可以方便我们执行自动化时&#xff0c;自主选择执行全部用例、某个模块用例、某个流程用…

indexDB 大图缓存

背景 最近在项目中遇到了一个问题&#xff1a;由于大屏背景图加载速度过慢&#xff0c;导致页面黑屏时间过长&#xff0c;影响了用户的体验。从下图可以看出加载耗时将近一分钟 IndexDB 主要的想法就是利用indexDB去做缓存&#xff0c;优化加载速度&#xff1b;在这之前&am…

自动驾驶传感器篇: GNSSIMU组合导航

自动驾驶传感器篇&#xff1a; GNSS&IMU组合导航 1.GNSS1.1 GNSS 系统概述1.2 GNSS系统基本组成1. 空间部分&#xff08;Space Segment&#xff09;&#xff1a;2. 地面控制部分&#xff08;Ground Control Segment&#xff09;&#xff1a;3. 用户设备部分&#xff08;Use…

python爬虫-----深入了解 requests 库下篇(第二十六天)

&#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; &#x1f388;&#x1f388;所属专栏&#xff1a;python爬虫学习&#x1f388;&#x1f388; ✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天…

电影交流|基于SprinBoot+vue的电影交流平台小程序系统(源码+数据库+文档)

电影交流平台目录 目录 基于SprinBootvue的电影交流平台小程序系统 一、前言 二、系统设计 三、系统功能设计 1用户信息管理 2 电影信息管理 3公告信息管理 4论坛信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取…

Oracle delete删除数据是否为逻辑删除、新插入数据占用的数据块位置实验

假设一&#xff1a;数据库delete删除为直接删除 假设二&#xff1a;数据库delete删除为逻辑删除&#xff0c;在数据块标记出来&#xff0c;但是实际并没有删除。 方式一&#xff1a;通过dump数据块的方式来实现 我们先用小数据量&#xff0c;通过dump数据块的方式来实现 -- 数…

Activity界面什么都不显示怎么解决

如图&#xff0c;有可能是你重写错了方法&#xff0c;两个onCreate方法非常像&#xff0c;参数不同&#xff0c;正确方法如下&#xff1a;

003基于SSM的学生选课系统(学生信息管理系统)ssm+mysql

003基于SSM的学生选课系统/学生信息管理系统 开发环境&#xff1a; Eclipse/MyEclipse、Tomcat8、Jdk1.8 数据库&#xff1a; MySQL 前端&#xff1a;JavaScript、jQuery、bootstrap4、particles.js 后端&#xff1a;maven、SpringMVC、MyBatis、ajax、mysql读写分离、mybat…

共享购:融合社交分享与消费返利的创新电商模式

共享购电商模式是一种独特的商业模式&#xff0c;巧妙地将社交分享与消费返利结合&#xff0c;让消费者在购物的同时&#xff0c;也能通过平台资产奖励实现价值的双重增长。该平台资产体系主要由共享值和共享积分两大要素构成&#xff0c;共同构建了一个充满活力的电商生态系统…

C++中布隆过滤器

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C&#xff0c;linux &#x1f525;座右铭&#xff1a;“不要等到什么都没有了…

【网络原理】TCP协议的连接管理机制(三次握手和四次挥手)

系列文章目录 【网络通信基础】网络中的常见基本概念 【网络编程】网络编程中的基本概念及Java实现UDP、TCP客户端服务器程序&#xff08;万字博文&#xff09; 【网络原理】UDP协议的报文结构 及 校验和字段的错误检测机制&#xff08;CRC算法、MD5算法&#xff09; 【网络…

Linux的FTP服务

目录 1.什么是FTP服务&#xff1f; 2.FTP的工作原理和流程 1 主动模式 2 被动模式 3.搭建和配置FTP服务 1 下载服务包、备份配置文件 2 修改配置文件​编辑 3 匿名访问测试 4 设置黑白命令 1.什么是FTP服务&#xff1f; FTP&#xff08;file Transfer Protocol&#…

Baidu comate智能编程助手评测

Baidu comate智能编程助手评测 作者&#xff1a;知孤云出岫 目录 一&#xff0e; 关于comate产品 二&#xff0e; 关于comate产品体验 三&#xff0e; 关于实际案例. 四&#xff0e; 关于baidu comate编程助手的实测体验感悟 五&#xff0e; …

(三)登录和注册(handle_auto.go)

登录和注册(handle_auto.go) 文章目录 登录和注册(handle_auto.go)一、所需要的结构体信息二、注册三、登录四、退出 一、所需要的结构体信息 type UserAuth struct{}type LoginReq struct {Username string json:"username" binding:"required"Password …

面试经典150题——求根节点到叶节点数字之和

​ 1. 题目描述 2. 题目分析与解析 2.1 思路一——DFS 理解问题&#xff1a; 首先要理解题目的要求&#xff0c;即对于给定的二叉树&#xff0c;我们需要找出从根节点到所有叶子节点的所有路径&#xff0c;然后将每一条路径上的数字组成一个整数&#xff0c;最后求出这些整数…

【静态分析】静态分析笔记08 - 指针分析 - 上下文敏感

参考&#xff1a; 【课程笔记】南大软件分析课程8——指针分析-上下文敏感&#xff08;课时11/12&#xff09; - 简书 ------------------------------------------------------------------------------------------------------------- 1. 上下文不敏感的问题 说明&#…

最详细步骤解决:Apps targeting Android12 and higher are required to specify...

问题原因&#xff1a; 当targetSdkVersion>31时&#xff0c;需要在AndroidManifest.xml中配置android:exported的值&#xff0c;该值为boolean类型。 android:exported解释&#xff1a; activity 是否可由其他应用的组件启动&#xff1a; 如果设为 "true"&#…