直播弹幕系统(四)- 发送弹幕校验登录整合JWT
- 前言
- 一. 整合JWT
- 1.1 改造Socket服务
- 1.2 测试
前言
上一篇文章 直播弹幕系统(三)- 直播在线人数统计 主要讲了利用Redis
对一个直播间的在线用户做出统计。那么这篇文章,就要对发送弹幕的用户进行一个校验了。毕竟一个直播间,游客是可以正常访问的。但是发送弹幕的话,肯定就是需要登录了。这次使用JWT
来完成校验。
JWT
可以先看看这篇文章Java - JWT的简单介绍和使用
一. 整合JWT
1.添加以下pom
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.19.1</version>
</dependency>
2.添加配置类AppConfig
:
import kz.util.SpringBeanUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class AppConfig {
@Value("${user.secretKey}")
public String secretKey;
public static String GetSecret() {
AppConfig configBean = SpringBeanUtil.getBean(AppConfig.class);
String secretKey = configBean.getSecretKey();
return secretKey == null ? StringUtils.EMPTY : secretKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
}
3.添加JwtUtil
:
import com.auth0.jwt.JWT;
import com.auth0.jwt.JWTVerifier;
import com.auth0.jwt.algorithms.Algorithm;
import io.jsonwebtoken.JwtBuilder;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import kz.config.AppConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.util.codec.binary.Base64;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author Zong0915
* @date 2022/11/11 下午7:16
*/
public class JwtUtil {
private static final SignatureAlgorithm ALGORITHM = SignatureAlgorithm.HS256;
// 生成Jwt Token
public static String buildToken(Long userId) {
HashMap<String, Object> chaim = new HashMap<String, Object>();
chaim.put("userId", userId);
return JwtUtil.buildToken(userId, chaim);
}
// 生成Jwt Token
public static String buildToken(Long userId, Map<String, Object> chaim) {
Calendar expires = Calendar.getInstance();
JwtBuilder jwtBuilder = Jwts
.builder()
.setClaims(chaim)
// JWT唯一标识
.setId(UUID.randomUUID().toString())
// 签发时间
.setIssuedAt(expires.getTime())
// Subject主体,存我们的userId
.setSubject(userId + "")
// 签名算法和对应的秘钥
.signWith(ALGORITHM, "sdss@!##rerwe");
// 设置过期时间
expires.add(Calendar.HOUR, 24 * 30);
jwtBuilder.setExpiration(expires.getTime());
// 生成Token
return jwtBuilder.compact();
}
// 根据Token 拿到我们的userId
public static Long getUerIdFromClaim(String token) {
if (StringUtils.isBlank(token)) {
return null;
}
Object o = Jwts
.parser()
.setSigningKey(AppConfig.GetSecret())
.parseClaimsJws(token)
.getBody()
.get("userId");
if (o != null) {
return Long.parseLong(o + "");
}
return null;
}
public static boolean isVerify(String jwtToken) {
// 这里一定要经过Base64解码
try {
Algorithm algorithm = Algorithm.HMAC256(Base64.decodeBase64(AppConfig.GetSecret()));
JWTVerifier verifier = JWT.require(algorithm).build();
verifier.verify(jwtToken); // 校验不通过会抛出异常
} catch (Exception e) {
return false;
}
return true;
}
}
1.1 改造Socket服务
第一步,我们先添加JWT
相关的秘钥。在boostrap.yml
文件中添加以下内容:(配合AppConfig
)
user:
secretKey: sdss@!##rerwe
我们以userId
为10010为例:http://localhost:4396/zong/?userId=10010&roomId=1
。写个UT
,创建一条Token
:
修改BulletScreenServer
类:主要修改如下:
- 添加一个
token
属性。 - 监听的
WebSocket
地址变更:/websocket/live/{roomId}/{token}
- 发送弹幕的时候,对
Token
进行合法校验。
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import kz.util.JwtUtil;
import kz.util.RedisUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Zong0915
* @date 2022/12/9 下午3:45
*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{token}")
@Slf4j
@Getter
public class BulletScreenServer {
/**
* 多例模式下的赋值方式
*/
private static OriginMessageSender originMessageSender;
/**
* 多例模式下的赋值方式
*/
@Autowired
private void setOriginMessageSender(OriginMessageSender originMessageSender) {
BulletScreenServer.originMessageSender = originMessageSender;
}
private static final AtomicLong count = new AtomicLong(0);
private Session session;
private String sessionId;
private String token;
private String roomId;
private String userId;
/**
* 打开连接
*
* @param session
* @OnOpen 连接成功后会自动调用该方法
*/
@OnOpen
public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("token") String token) {
// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try
count.incrementAndGet();
log.info("*************WebSocket连接次数: {} *************", count.longValue());
this.token = token;
this.roomId = roomId;
// 保存session相关信息到本地
this.sessionId = session.getId();
this.session = session;
RedisUtil.increment(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);
SocketCache.put(sessionId, this);
// 初始化操作,通知客户端已经建立链接成功,并且初始化直播间在线人数个数
originMessageSender.send(buildMessage("", 1));
}
/**
* 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接
*/
@OnClose
public void closeConnection() {
SocketCache.remove(sessionId);
RedisUtil.decrement(SocketConstants.LIVE_COUNT_HASH_KEY, roomId);
// 客户端退出,通知其他页面,在线人数-1
originMessageSender.send(buildMessage("", 1));
}
/**
* 客户端发送消息给服务端
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
if (StringUtils.isBlank(message)) {
return;
}
// 校验当前Token,不合法或者失效,重新登录
if (!JwtUtil.isVerify(token)) {
// 定点通知客户端
sendMessageSingle("TOKEN_ERROR", 4);
return;
}
Long userIdFromToken = JwtUtil.getUerIdFromClaim(token);
if (userIdFromToken == null || userIdFromToken == 0L) {
// 定点通知客户端,发送弹幕需要进行登录
sendMessageSingle("NO_LOGIN", 3);
return;
}
this.userId = String.valueOf(userIdFromToken);
// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的
originMessageSender.send(buildMessage(message, 2));
}
private void sendMessageSingle(String message, Integer operateType) {
OriginMessage buildMessage = buildMessage(message, operateType);
try {
this.session.getBasicRemote().sendText(JSONObject.toJSONString(buildMessage));
} catch (IOException e) {
e.printStackTrace();
}
}
private OriginMessage buildMessage(String message, Integer operateType) {
OriginMessage originMessage = new OriginMessage();
originMessage.setMessage(message);
originMessage.setRoomId(roomId);
originMessage.setSessionId(sessionId);
originMessage.setUserId(userId);
originMessage.setOperateType(operateType);
originMessage.setCount(RedisUtil.get(SocketConstants.LIVE_COUNT_HASH_KEY, roomId));
return originMessage;
}
}
前端代码做出修改index.tsx
:
import React, { useEffect, useState } from 'react';
import { Button, Row, Col, Input, Modal } from 'antd';
import { getValueByParam } from '../utils/pageHelper';
const { warning } = Modal;
// 生成的JWT的Token,我这里是写死的
const token = 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMDAxMCIsImV4cCI6MTY3Mzc3NTYyMSwidXNlcklkIjoxMDAxMCwiaWF0IjoxNjcxMTgzNjIxLCJqdGkiOiJmOTY2OTlhYy00ODRhLTRiNDUtYjBlMi04YjVkY2U4ZjVmZGYifQ.Ie0obuzOksJL0a63bpoDfvILFrujnj-kAL0z_3tdwMI';
const errorToken = '123';
// userId为0的,当没登录
const emptyToken = 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIwIiwiZXhwIjoxNjczNzc3MzMwLCJ1c2VySWQiOjAsImlhdCI6MTY3MTE4NTMzMCwianRpIjoiYjZkOWUyYjUtZDVkYy00MjVhLWE3NjMtOTNjNzFhNDQ2YWIwIn0.pw1hIt-bK-1ctjIGBrINuAIzADYLy1058qMHv3nxr4w';
const ws = new WebSocket(`ws://localhost:80/websocket/live/${getValueByParam('roomId')}/${token}`);
const UserPage = () => {
const [ message, setMessage ] = useState<string>('');
const [ bulletList, setBulletList ] = useState<any>([]);
const [ onlineCount, setOnlineCount ] = useState<number>(0);
useEffect(() => {
ws.onopen = () => {
ws.onmessage = (msg: any) => {
const entity: any = JSON.parse(msg.data);
if (entity?.operateType === 2) {
const arr :any = [ `用户[${entity.userId}]: ${entity.message}` ];
setBulletList((pre: any[]) => [].concat(...pre, ...arr));
} else if (entity?.operateType === 3) {
warning({ content: '请先登录在发送弹幕!' });
} else if (entity?.operateType === 4) {
warning({ content: '登录已失效,请重新登录!' });
}
setOnlineCount(entity?.count ?? 0);
};
};
ws.onclose = () => {
console.log('断开连接');
};
}, []);
const sendMsg = () => {
ws?.send(message);
};
return <>
<Row style={{ width: 2000, marginTop: 200 }}>
<Col offset={6}>
<Input onChange={event => setMessage(event.target.value)} />
</Col>
<Col>
<Button
onClick={sendMsg}
type='primary'
>发送弹幕</Button>
</Col>
<Col style={{ marginLeft: 100 }}>
{'在线人数: ' + onlineCount}
</Col>
<Col style={{ marginLeft: 10 }}>
<div style={{ border: '1px solid', width: 500, height: 500 }}>
{bulletList.map((item: string, index: number) => {
return <Row key={index}>
{item}
</Row>;
})}
</div>
</Col>
</Row>
</>;
};
export default UserPage;
1.2 测试
生成Token
的UT
代码:
public class Test {
public static void main(String[] args) {
String token = JwtUtil.buildToken(0L);
// String token = JwtUtil.buildToken(10010L);
System.out.println(token);
}
}
使用正常的Token
:token
(都写在代码里面了,读者需要生成一下然后替换)
使用错误Token
(比如过期):errorToken
(都写在代码里面了,读者需要生成一下然后替换)
使用未登录的Token
:emptyToken
(都写在代码里面了,读者需要生成一下然后替换)
其实,如果没登录的话,或者Token
不合法,可以给他一个统一的状态给前端。都当做Token
不合法即可。也可以对于游客这种身份,给个默认的userId
,例如0。或者是随便传一个错误的Token
。
而使用JWT
这种技术,一旦实现登录了,Token
就可以丢到浏览器里面了,那么用户在打开这个直播间的那一刻就已经知晓当前用户是否登录了。后端只需要拿到这个Token
进行校验即可。
但是这种写法其实还是有一些问题:
- 我们是通过
URL
上传Token
来完成参数传递的。而非请求头。原生的WebSocket
对这方面的支持比较差。 - 如果可以在拦截器层面直接完成用户校验和消息拦截,无需走入
@OnMessage
修饰的函数中那就是最好的。但很可惜,一般拦截器只针对于HTTP
请求,对WebSocket
则不支持。
因此后续还是学习下Netty
这方面的框架,探究Netty
对于WebSocket
写法的替代性。