使用WebSocket实现log日志流的实时展示-从轮询到通知

news2024/9/24 3:20:57

场景介绍

最近开发一个系统,其中一个模块需要展示实时的执行过程,过程日志可能比较多。以前的方案都是前端定时轮询,比如每秒查一次后端接口,将拉取回来的日志重新展示。轮询方案简单容易实现,但是比较消耗资源,后端没有数据的时候,会造成大量的无用轮询。所以这次我们采用长连接的方案,优化这块的逻辑,提升用户体验。
在这里插入图片描述

WebSocket介绍

参考:https://liaoxuefeng.com/books/java/spring/web/websocket/

WebSocket 是一种基于 HTTP 的长连接技术。传统的 HTTP 协议采用请求-响应模型,浏览器不发送请求时,服务器无法主动推送数据给浏览器。因此,当需要定期或不定期向浏览器推送数据(例如股票行情或在线聊天)时,传统的 HTTP 协议只能通过浏览器的 JavaScript 定时轮询来实现。这种方法效率低下,且实时性不高。

由于 HTTP 本身基于 TCP 连接,WebSocket 在 HTTP 协议的基础上进行了简单的升级。建立 TCP 连接后,浏览器在发送请求时附带以下头部信息:

GET /chat HTTP/1.1
Host: www.example.com
Upgrade: websocket
Connection: Upgrade

这表示客户端希望升级为长连接的 WebSocket。服务器返回升级成功的响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

收到成功响应后,WebSocket 握手即告完成。这意味着代表 WebSocket 的 TCP 连接将不会被服务器关闭,而是保持开放状态,服务器和浏览器可以随时互相推送消息。这些消息既可以是文本,也可以是二进制数据。通常,大多数应用程序会发送基于 JSON 的文本消息。

现代浏览器均已支持 WebSocket 协议,服务器端则需要底层框架的支持。Java 的 Servlet 规范从 3.1 开始支持 WebSocket,因此,必须选择支持 Servlet 3.1 或更高版本的容器,才能使用 WebSocket。最新版本的 Tomcat、Jetty 等开源服务器均已支持 WebSocket。

在这里插入图片描述

实践演示

Java后端

我们以实际代码来演示如何在Springboot项目中实现对Websocket的支持。

step1: 添加websocket依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
step2: 增加配置

这个配置的主要作用是自动启动使用了注解==@ServerEndpoint==的类

@Configuration
@EnableWebSocket
public class WebSocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
step3: 创建一个ws endpoint
@ServerEndpoint(value = ChaosConst.CHAOS_WS_API + "/execute/log/{bizType}/{bizId}")
@Component
@Slf4j
public class LogWsEndpoint implements Consumer<ChaosLogEvent> {
    // 对话的标识
    private String bizKey;
    // 存储每个会话
    private static final ConcurrentHashMap<String, List<LogWsEndpoint>> endpointMap = new ConcurrentHashMap<>();
    // 将会话放入到线程池中,异步将数据返回给前端
    private static ThreadPoolExecutor wsThreadPoolExecutor;
    // 核心逻辑处理器
    private ChaosLogEventHandler handler;


	// 业务写和读log
    private static ChaosLogger chaosLogger;

    @Autowired
    @Qualifier("wsThreadPoolExecutor")
    public void setWsThreadPoolExecutor(ThreadPoolExecutor wsThreadPoolExecutor) {
        if (null != wsThreadPoolExecutor) {
            LogWsEndpoint.wsThreadPoolExecutor = wsThreadPoolExecutor;
        }
    }

    @Autowired
    public void setChaosLogger(ChaosLogger chaosLogger) {
        if (null != chaosLogger) {
            LogWsEndpoint.chaosLogger = chaosLogger;
        }
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("bizType") String bizType, @PathParam("bizId") String bizId) {
        this.bizKey = String.format("%s-%s", bizType, bizId);
        log.info("[ws-chaos-log]连接建立中 ==> bizKey : {}", bizKey);
        this.handler = new ChaosLogEventHandler(chaosLogger, session);
        wsThreadPoolExecutor.submit(() -> flushMessage(bizType, bizId, true));

        endpointMap.compute(bizKey, (key, value) -> {
            List<LogWsEndpoint> ends = null == value ? new ArrayList<>() : value;
            ends.add(this);
            return ends;
        });
        log.info("[ws-chaos-log]连接建立成功: sessionId:{}, bizKey : {}",session.getId(), bizKey);
    }


    public void flushMessage(String bizType, String bizId, boolean force) {
        this.handler.flushMessage(bizType, bizId, force);
    }


    @OnClose
    public void onClose() {
        log.info("websocket log server close");
        if (StringUtils.isBlank(bizKey)) {
            return;
        }

        endpointMap.compute(bizKey, (key, endpoints) -> {
            if (null != endpoints) {
                endpoints.remove(this);
            }
            return endpoints;
        });
        log.info("[ws-chaos-log]连接关闭成功,关闭该连接信息:sessionId : {}, bizKey : {}。", handler.getSession().getId(), bizKey);
    }

    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("[ws-chaos-log]服务端收到客户端消息 ==> sessionId : {}, bizKey : {}, message : {}", handler.getSession().getId(), bizKey, message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("[ws-chaos-log]WebSocket发生错误,sessionId : {}, bizKey : {}", handler.getSession().getId(), bizKey);
    }

    @Override
    public void accept(ChaosLogEvent chaosLogEvent) {
        String contextId = String.format("%s-%s", chaosLogEvent.getBizType(), chaosLogEvent.getBizId());
        log.info("accept chaosLogEvent : {}", JSON.toJSONString(chaosLogEvent));
        List<LogWsEndpoint> logWsEndpoints = endpointMap.get(contextId);
        if (CollectionUtils.isEmpty(logWsEndpoints)) {
            return;
        }

        logWsEndpoints.forEach(endpoint -> wsThreadPoolExecutor.submit(() -> endpoint.flushMessage(chaosLogEvent.getBizType(), chaosLogEvent.getBizId(), true)));
    }
}

==注意:上面有个accept()==方法,这个方法后面也会讲到,主要就是用于触发已经建立连接Websocket发送消息。

核心逻辑实现, 这里读取的日志文件是存储在百度云的ois,ois读取逻辑忽略。

@Slf4j
public class ChaosLogEventHandler {
    private static final long READ_LOG_MOST_LEN = 1024 * 1024 * 5L; // 5M
    private final ChaosLogger chaosLogger;
    @Getter
    private final Session session;
    private final AtomicLong offset = new AtomicLong(-1L);
    private final AtomicBoolean hasTruncated = new AtomicBoolean(false);
    private final AtomicLong waitEventCnt = new AtomicLong(0L);
    private final Lock lock = new ReentrantLock();


    public ChaosLogEventHandler(ChaosLogger chaosLogger, Session session) {
        this.chaosLogger = chaosLogger;
        this.session = session;
    }

    public void flushMessage(String bizType, String bizId, boolean force) {
        String bizKey = bizType + "-" + bizId;
        if (!lock.tryLock()) {
            waitEventCnt.incrementAndGet();
            log.info("[WS]获取锁失败,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
            return;
        }
        try {
            if (!force && waitEventCnt.getAndSet(0L) < 1) {
                log.info("[ws-chaos-log]没有待处理事件,直接返回 ==> sessionId : {}, bizKey:{}", session.getId(), bizKey);
                // 没有待处理的事件
                return;
            }
            log.info("[ws-chaos-log]向客户端刷新数据 ==> sessionId : {}, bizKey : {}, offset : {}", session.getId(), bizKey, offset.get());
            if (offset.get() < 0) {
                long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
                log.info("[ws-chaos-log]contentLength = {} for bizLogKey {}", contentLength, bizKey);
                if (contentLength == 0) {
                    return;
                }
                if (contentLength > READ_LOG_MOST_LEN) {
                    offset.set(contentLength - READ_LOG_MOST_LEN);
                    hasTruncated.set(true);
                    log.info("[ws-chaos-log]文件过大,截取最后10M ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
                } else {
                    offset.set(0L);
                }
            } else if (!force) {
                long contentLength = chaosLogger.getLogContentLength(bizType, bizId);
                if (contentLength <= offset.get()) {
                    log.info("[ws-chaos-log]文件长度小于offset,无需刷新 ==> sessionId : {}, bizKey : {} contentLength={} offset={}", session.getId(), bizKey, contentLength, offset.get());
                    return;
                }
            }

            // 读取日志内容
            BosObject bosObject = chaosLogger.readLogObject(bizType, bizId, offset.get(), Long.MAX_VALUE);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(bosObject.getObjectContent()))) {
                String line = null;
                while (null != (line = reader.readLine())) {
                    if (hasTruncated.get()) {
                        hasTruncated.set(false);
                        log.info("[ws-chaos-log]hasTruncated changed to false");
                    } else {
                        log.info("[ws-chaos-log]send ws msg:{}", line);
                        try {
                            session.getBasicRemote().sendText(line + "\n");
                        } catch (IllegalStateException e) {
                            log.info("[ws-chaos-log]发送消息过程中连接状态异常,跳过", e);
                        }
                    }
                    // +1是因为每一行结尾会有一个回车
                    offset.addAndGet(line.getBytes(StandardCharsets.UTF_8).length + 1);
                }
            } catch (IOException e) {
                log.error("", e);
            }
        } catch (NotFoundException e) {
            log.info("[ws-chaos-log]未找到数据,无需向客户端同步,bizKey:{}", bizKey, e);
        } catch (RuntimeException e) {
            log.error("", e);
        } finally {
            lock.unlock();
        }
        log.info("[ws-chaos-log]向客户端刷新数据,完成 ==> sessionId : {}, bizKey : {}", session.getId(), bizKey);
        // 在处理过程中,可能又有新的事件,所以再次尝试刷新数据
        flushMessage(bizType, bizKey, false);
    }
}
stept5: 广播事件,全局监听

前后端建立连接的时候,绑定了后端一台机器,但是后台一般都是多台服务器,如果事件传递到其他服务器,那么已经建立的连接如何监听到并返回内呢?

这里使用了rocketmq的机制,每台机器都会监听到事件的变化,从而触发当前机器将变更内容发回到前端。

@Component
@RocketMQMessageListener(topic = "EXECUTE_FLOW_LOG", selectorExpression = "log", consumerGroup = "flow-log", messageModel = MessageModel.BROADCASTING)
@Slf4j
public class ChaosLogEventConsumer implements RocketMQListener<String> {
    @Autowired(required = false)
    private List<Consumer<ChaosLogEvent>> chaosLogEventConsumers = Collections.emptyList();

    @Override
    public void onMessage(String message) {
        log.info("[MQ]receive ChaosLogEvent message:{}", message);
        ChaosLogEvent event = JsonUtils.fromJson(message, ChaosLogEvent.class);
        for (Consumer<ChaosLogEvent> consumer : chaosLogEventConsumers) {
            try {
                consumer.accept(event);
            } catch (RuntimeException e) {
                log.error("[MQ] failed consume ChaosLogEvent message,consumer:" + consumer.getClass(), e);
            }
        }
    }
}

前端代码实现

以react为例,仅供参考:

export const fetchExecuteLogs = (bizType: string, bizId: any, logsRef: any, setLogs: any) => {
  if (!bizType || !bizId) {
    console.log('fetchLogs: logContextToken or node is null')
    return
  }
  setLogs([])
  if (logsRef.current[0]) {
    console.log('close ws')
    logsRef.current[0].close()
  }
  let host = wsHost ? wsHost : window.location.host
  let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
  let client = new WebSocket(`${protocol}://${host}/ws/ark/chaos/execute/log/${bizType}/${bizId}`)
  logsRef.current = [client, []]
  // 报错的回调函数
  client.onerror = (e: any) => {
    console.log('Connection Error')
    console.log(e)
  }
  //链接打开的回调函数
  client.onopen = () => {
    console.log('WebSocket Client Connected')
  }
  //链接关闭的回调函数
  client.onclose = () => {
    console.log('echo-protocol Client Closed')
  }
  //收到消息的处理函数
  client.onmessage = (e: any) => {
    if (logsRef.current[0] === client) {
      if (typeof e.data === 'string') {
        let newLogs = [...logsRef.current[1], e.data]
        if (newLogs.length > 250) {
          newLogs = newLogs.slice(200)
        }
        setLogs(newLogs)
        logsRef.current = [client, newLogs]
      }
    } else {
      client.close()
    }
  }
  const sendPing = () => {
    if (logsRef.current[0] === client) {
      const data = { message: 'heartbeat' }
      client.send(JSON.stringify(data))
      setTimeout(sendPing, 10000)
    }
  }
  setTimeout(sendPing, 10000)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1966466.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ctfhub-SQL注入-1-基础题目详解

1.打开题目在url上判断是数字型注入还是字符型注入 1 //无回显&#xff0c;说明是报错了&#xff0c;‘和后面的‘冲突了 1’ -- //有回显&#xff0c;是因为--把后面的‘注释掉了 2.使用命令判断列数 1order by 1-- //有回显 1order by 2 -- //有回显 1order by …

推荐Nodejs下高效存储树到数据库工具库-FlexTree

官网 | English FlexTree是Nodejs下一个基于左右值算法的树结构库&#xff0c;它提供了一种简单的方式来存储和操作树形结构数据。 FlexTree提供了简单而丰富的API让你可以轻松的操作树&#xff0c;如增删改查、遍历、移动、查询等。 主要特性&#xff1a; 基于左右值算法&a…

AMEYA360:纳芯微高集成单芯片SoC如何高效智能控制车载步进电机?

随着现代汽车电子技术的快速发展&#xff0c;步进电机作为一种精确且可靠的执行元件&#xff0c;在汽车电子系统中的应用日益广泛。为了实现车载步进电机应用的精确控制&#xff0c;纳芯微推出了集成LIN和MOSFET功率级的单芯片车用小电机驱动SoC——NSUC1610&#xff0c;可以帮…

全面掌握VS Code:提升开发效率的终极指南

Visual SCode Visual Studio Code&#xff08;简称VS Code&#xff09;是一款由微软开发的免费、开源且跨平台的代码编辑器。它支持多种编程语言&#xff0c;通过其强大的扩展库&#xff0c;可以满足各种开发需求。本教程将详细介绍如何从安装到高级使用&#xff0c;帮助你充分…

降低Anki对C盘空间占用的四种方法

Anki安装后&#xff0c;笔记中所用到的各种媒体和资源文件默认保存在C盘&#xff0c;例如我的电脑上是保存在“C:\Users\asus\AppData\Roaming\Anki2”&#xff0c;其中asus是我电脑的登录用户名。随着笔记收集越来越多&#xff0c;对C盘的占用也越来越大&#xff0c;因此&…

5问5答!您想了解的数据采集DAQ关键指标都在这里了

1、什么是采样率&#xff0c;它对测量结果有何影响&#xff1f; 采样率是数据采集卡每秒采集数据的次数。采样率对测量结果的准确性有直接影响。如果采样率过低&#xff0c;可能会错过信号的重要部分&#xff0c;导致数据失真。 理论上根据采样定理&#xff0c;采样率应为信…

【C++BFS】802. 找到最终的安全状态

本文涉及知识点 CBFS算法 LeetCode802. 找到最终的安全状态 有一个有 n 个节点的有向图&#xff0c;节点按 0 到 n - 1 编号。图由一个 索引从 0 开始 的 2D 整数数组 graph表示&#xff0c; graph[i]是与节点 i 相邻的节点的整数数组&#xff0c;这意味着从节点 i 到 graph…

【Qwen-Audio部署实战】Qwen-Audio-Chat模型之对话机器人部署测试

系列篇章&#x1f4a5; No.文章1【Qwen部署实战】探索Qwen-7B-Chat&#xff1a;阿里云大型语言模型的对话实践2【Qwen2部署实战】Qwen2初体验&#xff1a;用Transformers打造智能聊天机器人3【Qwen2部署实战】探索Qwen2-7B&#xff1a;通过FastApi框架实现API的部署与调用4【Q…

02.计算器存储器的原理

02.计算器存储器的原理 目录介绍 01.什么是存储器 1.1 了解存储器是什么1.2 存储器类型 02.存储器系统设计 2.1 存储器分层设计2.2 存储器层次结构2.3 高速缓存设计思想2.4 虚拟内存访问内存 03.存储器类型 3.1 按照材质划分3.2 按芯片类型划分3.3 内存 vs CPU3.4 存储器访问…

【Yolov8】实战三:手把手教你使用YOLOv8以及pyqt搭建中医耳穴辅助诊断项目原理及模型部署

摘要 今天&#xff0c;学习RTMPose关键点检测实战。教大家如何安装安装MMDetection和MMPose。 实战项目以三角板关键点检测场景为例&#xff0c;结合OpenMMLab开源目标检测算法库MMDetection、开源关键点检测算法库MMPose、开源模型部署算法库MMDeploy&#xff0c;全面讲解项目…

Spring源码解析(26)之AOP的核心对象创建过程

一、前言 在上一节中我们介绍了在Spring 解析xml配置文件的时候&#xff0c;给我们往容器中生成了很多BeanDefinition&#xff0c;其中最重要的是advice对象&#xff0c;而advice对象最外层是用一个advisor对象包裹起来&#xff0c;而我们的advice对象的创建需要三个参数&#…

|迁移学习| 迁移学习详解及基于pytorch的相关代码实现

&#x1f411; |迁移学习| 迁移学习详解及基于pytorch的相关代码实现 &#x1f411; 文章目录 &#x1f411; |迁移学习| 迁移学习详解及基于pytorch的相关代码实现 &#x1f411;&#x1f411; 前言&#x1f411;&#x1f411; 迁移学习详解&#x1f411;&#x1f411; 迁移学…

第34篇 子程序FINDSUM求和<一>

Q&#xff1a;如何设计汇编语言程序求数组[1:n]的和&#xff1f; A&#xff1a;基本原理&#xff1a;可编写一段实现子程序FINDSUM&#xff0c;子程序中使用一个loop来实现数组的求和运算。子程序FINDSUM的参数N存储在内存中&#xff0c;主程序从该内存中将其读取到一个寄存器…

MES系统如何实现生产任务的自动或辅助调度

MES系统&#xff08;Manufacturing Execution System&#xff0c;制造执行系统&#xff09;通过一系列集成化的功能模块和智能算法&#xff0c;实现生产任务的自动或辅助调度。以下是MES系统实现生产任务自动或辅助调度的具体方式&#xff1a; 1. 生产计划与排程 计划制定&am…

【C++从小白到大牛】类和对象

目录 一、面向过程和面向对象初步认识 二、类的引入 三、类的定义 类的成员函数两种定义方式&#xff1a; 1. 声明和定义全部放在类体中 2. 类声明放在.h文件中&#xff0c;成员函数定义放在.cpp文件中 成员变量命名规则的建议&#xff1a; 四、类的访问限定符 【访问限…

4.2.2、存储管理-段式存储和段页式存储

段式存储 段式存储是指将进程空间分为一个个段,每段也有段号和段内地址,与页式存储不同的是,每段物理大小不同,分段是根据逻辑整体分段的. 地址表示:(段号,段内偏移):其中段内偏移不能超过该段号对应的段长,否则越界错误,而此地址对应的真正内存地址应该是:段号对应的基地址段…

lambdafunctionbind

lambda匿名函数 定义&#xff1a; 捕捉&#xff1a;传值/传引用/mutable 混合捕捉&#xff0c;&#xff1d;表全普通捕捉 即使全部捕捉&#xff0c; 编译器实现时也不一定全部传入&#xff0c; 编译器只会传入要用到的变量 lambda内可使用的变量的范围 lambda内只能用捕捉对…

Linux gcc day 9

cpu是一个只可以执行指令&#xff0c;不是cpu要打印而是我们要打印&#xff0c;然后编译成指令再给cpu&#xff0c;再通过操作系统进行操手 进程状态&#xff1a; 为什么会有这些状态&#xff1f; 进程的多状态&#xff0c;本质都是为了满足未来不同的运行场景 有那些状态&am…

linux系统的检测脚本,用于检查linux的网络配置,包括网络接口状态、IP地址、子网掩码、默认网关、DNS服务器、连通性测试等等

目录 一、要求 二、脚本介绍 1、脚本内容 2、脚本解释 &#xff08;1&#xff09; 检查是否以 root 用户身份运行 &#xff08;2&#xff09;显示脚本标题 &#xff08;3&#xff09;打印主机名 &#xff08;4&#xff09;获取网络接口信息 &#xff08;5&#xff09…

React学习之props(父传子,子传父),Context组件之间的传参。

目录 前言 一、什么时候需要使用props&#xff1f; 二、使用 1.父传子 2.子传父 二、什么时候需要使用Context&#xff1f; 第一步: 第二步使用&#xff1a; 第一种&#xff1a; 第二种&#xff1a; 演示&#xff1a; 总结 前言 React学习笔记记录&#xff0c;pr…