直播弹幕系统(三)- 直播在线人数统计

news2025/1/21 21:52:46

直播弹幕系统(三)- 直播在线人数统计

  • 前言
  • 一. 在线人数统计功能实现
    • 1.1 Redis整合
    • 1.2 在线人数更新
    • 1.3 演示

前言

上一篇文章整合RabbitMQ进行消息广播和异步处理 写完了消息的广播、削峰、异步处理业务逻辑等操作。完成了实时共享功能。

不过写到后面发现在线人数统计这块的功能还没实现,因此在这篇补上。

备注:

  1. 目前的WebSocket写法实现只是一种方案,显然并不是最优解,也可能有更好的写法。
  2. 后期准备尝试用Netty来平移替换。

一. 在线人数统计功能实现

基本思路:

  1. 既然我们使用一种分布式架构,并且使用本地缓存去存储WebSocket的链接信息。
  2. 那么对于分布式下的数值统计,最好的就是使用第三方库或者中间件去存储数据。
  3. 这样分布式下的每个服务都可以通过第三方库去进行数据交互和共享
  4. 那么很简单。建立WebSocket的时候,同一个直播间下的在线人数+1。断开的时候则-1。
  5. 对于Redis存储的数据结构,可以使用Hash。1000个直播间可以共用一个hashKey,但是每个键值对却又不同。和1000个String类型的数据存储相比,内存上要节省的多。Redis - String内存开销问题以及基本/扩展数据类型的使用

1.1 Redis整合

1.首先毫无疑问我们需要添加一个pom依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.6.7</version>
</dependency>

2.在上一章节中的bulletcommon包下创建共通函数。第一个,SpringBean的工具类SpringBeanUtil

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SpringBeanUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;


    // 获取applicationContext
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    // 通过name获取 Bean.
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    // 通过class获取Bean.
    @SuppressWarnings("unchecked")
    public static <T> T getBean(Class<T> clazz) {
        try {
            char[] cs = clazz.getSimpleName().toCharArray();
            cs[0] += 32;// 首字母大写到小写
            return (T) getApplicationContext().getBean(String.valueOf(cs));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    // 通过name,以及Clazz返回指定的Bean
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringBeanUtil.applicationContext == null) {
            SpringBeanUtil.applicationContext = applicationContext;
        }
        log.info("\r\n----------加载applicationContext成功-----------------");
    }

3.创建一个RedisUtilStringRedisTemplate下有个操作:

  • opsForHash().increment(hashKey,key,value):可以给指定的hashKey下的key加上指定的value数值大小。那正好可以用给直播间在线人数的 ±1
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * @author Zong0915
 * @date 2022/12/15 下午6:52
 */
public class RedisUtil {
    public static StringRedisTemplate getStringRedisTemplate() {
        return SpringBeanUtil.getBean(StringRedisTemplate.class);
    }

    public static void increment(String hashKey, String key) {
        if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) {
            return;
        }
        StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate();
        stringRedisTemplate.opsForHash().increment(hashKey, key, 1);
    }

    public static void decrement(String hashKey, String key) {
        if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) {
            return;
        }
        StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate();
        stringRedisTemplate.opsForHash().increment(hashKey, key, -1);
    }

    public static Long get(String hashKey, String key) {
        if (StringUtils.isBlank(hashKey) || StringUtils.isBlank(key)) {
            return 0L;
        }
        StringRedisTemplate stringRedisTemplate = RedisUtil.getStringRedisTemplate();
        String res = (String) stringRedisTemplate.opsForHash().get(hashKey, key);
        return NumberUtils.toLong(res);
    }
}

4.我们给消息体添加两个参数:countoperateType

@Data
public class OriginMessage {
    private String sessionId;
    private String userId;
    private String roomId;
    private String message;
    /** 直播间人数 */
    private Long count;
    /** 1:链接初始化。2:发送弹幕 */
    private Integer operateType;
}

5.SocketConstants再添加一个常量,用于HashKey

public static final String LIVE_COUNT_HASH_KEY = "LiveCount";

1.2 在线人数更新

在线人数的更新涉及到两个地方:

  • WebSocket建立连接的时候。
  • WebSocket断开连接的时候

修改BulletScreenServer类:重点在于@OnOpen@OnClose修饰的函数。

package kz.service;

import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import kz.util.RedisUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Zong0915
 * @date 2022/12/9 下午3:45
 */
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {
    /**
     * 多例模式下的赋值方式
     */
    private static OriginMessageSender originMessageSender;

    /**
     * 多例模式下的赋值方式
     */
    @Autowired
    private void setOriginMessageSender(OriginMessageSender originMessageSender) {
        BulletScreenServer.originMessageSender = originMessageSender;
    }

    private static final AtomicLong count = new AtomicLong(0);

    private Session session;
    private String sessionId;
    private String userId;
    private String roomId;

    /**
     * 打开连接
     *
     * @param session
     * @OnOpen 连接成功后会自动调用该方法
     * @PathParam("token") 获取 @ServerEndpoint("/imserver/{userId}") 后面的参数
     */
    @OnOpen
    public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {
        // 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try
        count.incrementAndGet();
        log.info("*************WebSocket连接次数: {} *************", count.longValue());
        this.userId = userId;
        this.roomId = roomId;
        // 保存session相关信息到本地
        this.sessionId = session.getId();
        this.session = session;
        // 在线人数+1
        RedisUtil.increment(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);
        SocketCache.put(sessionId, this);
        // 通知客户端,此时服务端和客户端之间成功建立连接,并且把在线人数传过去
        originMessageSender.send(buildMessage("", 1));
    }

    /**
     * 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接
     */
    @OnClose
    public void closeConnection() {
        SocketCache.remove(sessionId);
        // 链接断开,人数-1,并通知给客户端
        RedisUtil.decrement(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);
        originMessageSender.send(buildMessage("", 1));
    }

    /**
     * 客户端发送消息给服务端
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的
        originMessageSender.send(buildMessage(message, 2));
    }

    private OriginMessage buildMessage(String message, Integer operateType) {
        OriginMessage originMessage = new OriginMessage();
        originMessage.setMessage(message);
        originMessage.setRoomId(roomId);
        originMessage.setSessionId(sessionId);
        originMessage.setUserId(userId);
        originMessage.setOperateType(operateType);
        originMessage.setCount(RedisUtil.get(SocketConstants.LIVE_COUNT_HASH_KEY, roomId));
        return originMessage;
    }
}

前端修改:完整的项目框架可以看这篇文章SpringBoot - WebSocket的使用和聊天室练习 。 这里就是修改index.tsx文件(区别不大)

import React, { useEffect, useState } from 'react';
import { Button, Row, Col, Input } from 'antd';
import { getValueByParam } from '../utils/pageHelper';

const ws = new WebSocket(`ws://localhost:80/websocket/live/${getValueByParam('roomId')}/${getValueByParam('userId')}`);

const UserPage = () => {
  const [ message, setMessage ] = useState<string>('');
  const [ bulletList, setBulletList ] = useState<any>([]);
  const [ onlineCount, setOnlineCount ] = useState<number>(0);

  useEffect(() => {
    ws.onopen = () => {
      ws.onmessage = (msg: any) => {
        const entity: any = JSON.parse(msg.data);
        if (entity?.operateType === 2) {
          const arr :any = [ `用户[${entity.userId}]: ${entity.message}` ];
          setBulletList((pre: any[]) => [].concat(...pre, ...arr));
        }
        setOnlineCount(entity?.count ?? 0);
      };
    };
    ws.onclose = () => {
      console.log('断开连接');
    };
  }, []);

  const sendMsg = () => {
    ws?.send(message);
  };

  return <>
    <Row style={{ width: 2000, marginTop: 200 }}>
      <Col offset={6}>
        <Input onChange={event => setMessage(event.target.value)} />
      </Col>
      <Col>
        <Button
          onClick={sendMsg}
          type='primary'
        >发送弹幕</Button>
      </Col>
      <Col style={{ marginLeft: 100 }}>
        {'在线人数: ' + onlineCount}
      </Col>
      <Col style={{ marginLeft: 10 }}>
        <div style={{ border: '1px solid', width: 500, height: 500 }}>
          {bulletList.map((item: string, index: number) => {
            return <Row key={index}>
              {item}
            </Row>;
          })}
        </div>
      </Col>
    </Row>
  </>;
};

export default UserPage;

1.3 演示

动图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Netcat介绍及安装使用

目录 介绍 Linux 安装 Windows安装 1.下载安装包 2.解压安装包 3.安装路径加入系统变量 Netcat命令参数 使用Netcat互相通信 1.创建一个服务端 2.创建一个客户端&#xff08;连接服务端&#xff09; 介绍 Netcat 是一款简单的Unix工具&#xff0c;使用UDP和TCP协议。…

七、Docker 安装Tomcat(流程、注意点、实操)

1、从中央仓库搜索tomcat 命令:docker search tomcat 也可以从官网查找,地址:Docker Hub 2、从中央仓库拉取tomcat 命令:docker pull tomcat:8.0 这里我们选择8.0 版本tomcat 3、查看镜像 命令:docker images 4、运行镜像 命令:docker run -d

如何从内存卡恢复丢失的数据?简单内存卡(SD卡)数据恢复方法分享

SD卡&#xff0c;也就是内存卡&#xff0c;在日常使用中有着体积小、存储量大的优点&#xff0c;被我们用来存储一些重要的数据。相机是使用SD卡的场景之一。目前大多数相机都使用SD卡来存储相关数据&#xff0c;这不仅是因为SD容量的优势&#xff0c;而且其运行速度也比较快&a…

苹果手机有什么好玩的app推荐

creativeclock 苹果手机有什么好玩的app推荐&#xff0c;iPhone时钟app推荐下载。 An elegant clock application that contains various creative clock styles and widgets. FlipClock, PolarClock, DigitalClock, RouletteClock, AnalogClock … and so on. view on Appsto…

Matplotlib学习笔记(第二章 2.1.5 图形的绘制过程)

本教程旨在展示使用Matplotlib的单个可视化的开始、中间和结束。 我们将从一些原始数据开始&#xff0c;最后保存一个定制的可视化图形。 在此过程中&#xff0c;我们尝试使用Matplotlib来突出一些整洁的特性和最佳实践。 注意&#xff1a;本教程基于克里斯莫菲特这篇优秀的博…

【图像去噪】鲁棒PCA图像去噪【含Matlab源码 463期】

⛄一、图像去噪及滤波简介 1 图像去噪 1.1 图像噪声定义 噪声是干扰图像视觉效果的重要因素&#xff0c;图像去噪是指减少图像中噪声的过程。噪声分类有三种&#xff1a;加性噪声&#xff0c;乘性噪声和量化噪声。我们用f(x,y&#xff09;表示图像&#xff0c;g(x,y&#xff0…

jmeter性能测试-Arrivals 线程组解释

&#x1f4cc; 博客主页&#xff1a; 程序员二黑 &#x1f4cc; 专注于软件测试领域相关技术实践和思考&#xff0c;持续分享自动化软件测试开发干货知识&#xff01; &#x1f4cc; 公号同名&#xff0c;欢迎加入我的测试交流群&#xff0c;我们一起交流学习&#xff01; 目录…

Go项目目录结构该怎么写?

原文地址&#xff1a;Go项目目录结构该怎么写&#xff1f; Go 目录 /cmd 项目的主干。 每个应用程序的目录名应该与想要的可执行文件的名称相匹配(例如&#xff0c;/cmd/myapp)。 不要在这个目录中放置太多代码。如果认为代码可以导入并在其他项目中使用&#xff0c;那么它…

Python入门自学到精通需要看哪些书籍?

Python语言在近几年可以算得上如日中天&#xff0c;越来越火爆的同时&#xff0c;学习Python的人也越来越多了。对于不同基础的学习者来讲&#xff0c;学习的重点和方式也许会有差别&#xff0c;但是基础语法永远都是重中之重。在牢牢掌握基础知识的前提下&#xff0c;我们才能…

SAP ABAP 利用弹窗(POPUP)实现屏幕(DIALOG)快速开发

SAP ABAP 利用弹窗&#xff08;POPUP&#xff09;实现屏幕&#xff08;DIALOG&#xff09;快速开发 引言&#xff1a; 在 ABAP 开发中经常用到屏幕&#xff08;DIALOG&#xff09;开发&#xff0c;这通常都比较耗时。按复杂度可以分成复杂和简单两类屏幕开发&#xff0c;复杂的…

linux(乌班图)开发环境搭建

乌班图远程连接方法&#xff1a;安装openssh-server 和openssh-clientsudo apt-get -y install openssh-server openssh-client 设置允许root用户进行远程连接 方法一&#xff1a; /etc/ssh/sshd_config里面添加PermitRootLogin yes #重启 service ssh restart方法二&#xff1…

Pandas小白入门(一)---将value_counts的结果转为DataFrame

文章目录代码示例工作原理rename_axisreset_index各函数对于DataFrame下的应用其他应用quantile结果转为DataFrame代码示例 value_counts的结果是一个series&#xff0c;其index为原来列的值&#xff0c;value为值的个数。要将其转为DataFrame需要两个函数rename_axis和reset_…

为什么每个程序员都必须写博客

工作了好几年了&#xff0c;一直没写过技术类的博客&#xff0c;最近才开始尝试写一些技术类的博客。通过写博客的这段时间发现&#xff0c;写博客能够帮助我们快速成长已经提高我们学习的积极性&#xff0c;本文将和大家详细说说程序员写博客的好处。 &#x1f680; 一、加深对…

机器学习必会技能之微积分【一文到底】

机器学习必会技能 —— 微积分【一文到底】 文章目录机器学习必会技能 —— 微积分【一文到底】1 微积分的四类问题2 深入理解导数的本质3 深入理解复合函数求导4 理解多元函数偏导5 梯度究竟是什么&#xff1f;6 真正理解微积分6.1 直观理解6.2 理解微积分基本定理7 非常重要的…

NLP之文本分类项目(基于tensorflow1.14版本)

1.README.md:(查看项目整体结构以及各个部分作用) # Text Classification with RNN使用循环神经网络进行中文文本分类本文是基于TensorFlow在中文数据集上的简化实现&#xff0c;使用了字符级RNN对中文文本进行分类&#xff0c;达到了较好的效果。## 环境- Python 3 - TensorF…

Oracle项目管理之设施与资产管理Facilities and Asset (English)

目录 Maintenance Management Stand Alone or Integrated Facility Condition Assessment Space Management Lease Management Full Lease Expenses and Payment Management Transaction Management Asset Portfolio Management Portfolio Management Capabilities S…

改进二进制粒子群算法在配电网重构中的应用(Matlab实现)【论文复现】

目录 ​ 0 概述 1 配电网重构的目标函数 2 算例 3 matlab代码实现 0 概述 配电系统中存在大量的分段开关和联络开关&#xff0c;配电网重构正是通过调整分段开关和联络升大的组合状态来变换网络结构,用于优化配电网某些指标&#xff0c;使其达到最优状态。正常运行时,则通…

简单理解Vue的data为啥只能是函数

在学习vue的时候vue2只有在组件中严格要求data必须是一个函数&#xff0c;而在普通vue实例中&#xff0c;data可以是一个对象&#xff0c;但是在vue3出现后data必须一个函数&#xff0c;当时看着官方文档说的是好像是对象的引用问题&#xff0c;但是内部原理却不是很了解&#…

[附源码]Node.js计算机毕业设计高校教学过程管理系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

异常检测方法总结

在数据挖掘中&#xff0c;异常检测&#xff08;英语&#xff1a;anomaly detection&#xff09;对不匹配预期模式或数据集中其他项目的项目、事件或观测值的识别。 通常异常项目会转变成银行欺诈、结构缺陷、医疗问题、文本错误等类型的问题。异常也被称为离群值、新奇、噪声、…