文章目录
- 十一、打造QQ在线状态功能之为你的应用增添色彩
- 1、在线状态设计
- 2、Netty网关用户状态变更通知、登录ack
- 3、逻辑层处理用户上线下线
- 4、在线状态订阅—临时订阅
- 5、实现手动设置客户端状态接口
- 6、推拉结合实现在线状态更新
- 十二、IM扩展—能做的事情还有很多
- 1、如何让陌生人只能发送几条消息
- 2、如何实现消息的撤回
- 3、如何设计亿级聊天记录存储方案
项目源代码
目录 |
---|
IM即时通讯系统[SpringBoot+Netty]——梳理(一) |
IM即时通讯系统[SpringBoot+Netty]——梳理(二) |
IM即时通讯系统[SpringBoot+Netty]——梳理(三) |
IM即时通讯系统[SpringBoot+Netty]——梳理(四) |
十一、打造QQ在线状态功能之为你的应用增添色彩
这个状态是指用户的状态,最先接触的也是qq,还有一种状态叫服务端状态,实现一套在线机制对服务端性能消耗是极大的,手机端在线离线的频率是很高的,微信切进来看几眼算上线,退出去又算离线,假设有100个好友一个操作就会裂变成100个操作,设计这个在线功能也还是从业务需求的角度来设计的
1、在线状态设计
需求一:需要实时的更新好友的状态,有一个标识可以辨别在线和离线,在线和离线可以实时得到感知,手动修改忙碌啥的状态可以实时通知到好友
需求二:打开群组等,可以获取到这一批人的在线状态,在线的会有一个在线的标识,和好友一样可以实时感知到用户下线了,可以实时的将在线修改为离线
改进
- 改进一:状态变更只推送给在线的用户
- 改进二:使用按需拉取、临时订阅的方式
2、Netty网关用户状态变更通知、登录ack
在tcp层通知逻辑层某个用户上线了
3、逻辑层处理用户上线下线
首先在user里面搞一个mq接收类去接收状态变更消息
然后当处理到了用户状态变更的命令就处理
@Override
public void processUserOnlineStatusNotify(UserStatusChangeNotifyContent content) {
// 1、获取到该用户的所有 session,并将其设置到pack中
List<UserSession> userSessions
= userSessionUtils.getUserSession(content.getAppId(), content.getUserId());
UserStatusChangeNotifyPack userStatusChangeNotifyPack = new UserStatusChangeNotifyPack();
BeanUtils.copyProperties(content, userStatusChangeNotifyPack);
userStatusChangeNotifyPack.setClient(userSessions);
// TODO 发送给自己的同步端
syncSender(userStatusChangeNotifyPack, content.getUserId(),
UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY_SYNC, content);
// TODO 同步给好友和订阅了自己的人
dispatcher(userStatusChangeNotifyPack, content.getUserId(), UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY,
content.getAppId());
}
// 同步自己端
private void syncSender(Object pack, String userId, Command command, ClientInfo clientInfo){
messageProducer.sendToUserExceptClient(userId, command,
pack, clientInfo);
}
// 同步对方端口
private void dispatcher(Object pack, String userId, Command command, Integer appId){
// TODO 获取指定用户的所有好友id
List<String> allFriendId = imFriendShipService.getAllFriendId(userId, appId);
for (String fid : allFriendId) {
messageProducer.sendToUser(fid, command,
pack, appId);
}
// TODO 发送给临时订阅的人
String key = appId+ ":" + Constants.RedisConstants.subscribe + ":" + userId;
// 取出key中的所有key
Set<Object> keys = stringRedisTemplate.opsForHash().keys(key);
// 遍历
for (Object k : keys) {
String filed = (String)k;
// 取出其中的过期时间
Long expired = Long.valueOf((String) Objects.requireNonNull(stringRedisTemplate.opsForHash().get(key, filed)));
// 如果没有过期,就要给他发送
if(expired > 0 && expired > System.currentTimeMillis()){
messageProducer.sendToUser(filed, UserEventCommand.USER_ONLINE_STATUS_CHANGE_NOTIFY,
pack, appId);
}else{
stringRedisTemplate.opsForHash().delete(key, filed);
}
}
}
要通知自己其他端登录,还要通知好友和订阅了改用户的用户们
4、在线状态订阅—临时订阅
// 订阅用户状态
@Override
public void subscribeUserOnlineStatus(SubscribeUserOnlineStatusReq req) {
Long subExpireTime = 0L;
if(req != null && req.getSubTime() > 0){
subExpireTime = System.currentTimeMillis() + req.getSubTime();
}
// 使用redis的hash结构存储订阅用户的状态
for (String subUserId : req.getSubUserId()) {
String key = req.getAppId() + ":" + Constants.RedisConstants.subscribe + ":" + subUserId;
stringRedisTemplate.opsForHash().put(key, req.getOperater(), subExpireTime.toString());
}
}
这其中涉及到了一个用户被多个用户订阅,所以要选择一种合适的redis的数据结构去存储,也就是使用hash不错
逻辑层处理用户上线下线的时候,同步给临时订阅的人消息的时候,也是通过获取这个redis中的数据去做的分发处理
5、实现手动设置客户端状态接口
// 设置客户端状态
@Override
public void setUserCustomerStatus(SetUserCustomerStatusReq req) {
// 包
UserCustomStatusChangeNotifyPack userCustomStatusChangeNotifyPack = new UserCustomStatusChangeNotifyPack();
userCustomStatusChangeNotifyPack.setCustomStatus(req.getCustomStatus());
userCustomStatusChangeNotifyPack.setCustomText(req.getCustomText());
userCustomStatusChangeNotifyPack.setUserId(req.getUserId());
// 将状态存储到redis中
String key = req.getAppId() + ":" + Constants.RedisConstants.userCustomerStatus + ":" + req.getUserId();
stringRedisTemplate.opsForValue().set(key, JSONObject.toJSONString(userCustomStatusChangeNotifyPack));
syncSender(userCustomStatusChangeNotifyPack, req.getUserId()
, UserEventCommand.USER_ONLINE_STATUS__SET_CHANGE_NOTIFY_SYNC,
new ClientInfo(req.getAppId(), req.getClientType(), req.getImei()));
dispatcher(userCustomStatusChangeNotifyPack, req.getUserId()
, UserEventCommand.USER_ONLINE_STATUS__SET_CHANGE_NOTIFY, req.getAppId());
}
6、推拉结合实现在线状态更新
// 拉取指定用户的状态
@Override
public Map<String, UserOnlineStatusResp> queryUserOnlineStatus(PullUserOnlineStatusReq req) {
return getUserOnlineStatus(req.getUserList(), req.getAppId());
}
// 拉取所有用户的状态
@Override
public Map<String, UserOnlineStatusResp> queryFriendOnlineStatus(PullFriendOnlineStatusReq req) {
List<String> allFriendId = imFriendShipService.getAllFriendId(req.getOperater(), req.getAppId());
return getUserOnlineStatus(allFriendId, req.getAppId());
}
// 拉取用户在线状态
private Map<String, UserOnlineStatusResp> getUserOnlineStatus(List<String> userId,Integer appId){
// 返回类
Map<String, UserOnlineStatusResp> res = new HashMap<>(userId.size());
for (String uid : userId) {
UserOnlineStatusResp resp = new UserOnlineStatusResp();
// 拉取服务端的状态
List<UserSession> userSession = userSessionUtils.getUserSession(appId, uid);
resp.setSession(userSession);
// 拉取客户端的状态
String key = appId + ":" + Constants.RedisConstants.userCustomerStatus + ":" + uid;
String s = stringRedisTemplate.opsForValue().get(key);
if(StringUtils.isNotBlank(s)){
JSONObject parse = (JSONObject) JSON.parse(s);
resp.setCustomText(parse.getString("customText"));
resp.setCustomStatus(parse.getInteger("customStatus"));
}
res.put(uid, resp);
}
return res;
}
拉取用户的状态信息的各种接口
十二、IM扩展—能做的事情还有很多
1、如何让陌生人只能发送几条消息
有一些软件是没有称为好友的话,只能发送3条呀几条的消息消息,这个功能要怎么实现呢
这里采用的方案是用回调,在回调前和回调后做一些逻辑上的判断,来决定下面的操作是否执行
2、如何实现消息的撤回
只能由im系统撤回,是一个command指令
撤回的本质就是将要撤回的那条消息变成,谁谁谁撤回了消息,将原消息变更称为一条新的消息,并且插入一条新的消息
逻辑:
- 修改历史消息的状态
- 修改离线消息的状态
- ack给发送方
- 发送给同步端
- 分发给消息的接收方
// 撤回消息
public void recallMessage(RecallMessageContent messageContent) {
// 如果消息发送超过一定的时间就不可以撤回了
Long messageTime = messageContent.getMessageTime();
Long now = System.currentTimeMillis();
RecallMessageNotifyPack pack = new RecallMessageNotifyPack();
BeanUtils.copyProperties(messageContent, pack);
if(120000L < now - messageTime){
recallAck(pack, ResponseVO.errorResponse(MessageErrorCode.MESSAGE_RECALL_TIME_OUT), messageContent);
return;
}
LambdaQueryWrapper<ImMessageBodyEntity> lqw = new LambdaQueryWrapper<>();
lqw.eq(ImMessageBodyEntity::getAppId, messageContent.getAppId());
lqw.eq(ImMessageBodyEntity::getMessageKey, messageContent.getMessageKey());
ImMessageBodyEntity body = imMessageBodyMapper.selectOne(lqw);
// 如果查不到该消息的话
if(body == null){
// TODO ack失败 不存在的消息体不能撤回
recallAck(pack, ResponseVO.errorResponse(MessageErrorCode.MESSAGEBODY_IS_NOT_EXIST), messageContent);
return;
}
// 如果该消息已经被撤回
if(body.getDelFlag() == DelFlagEnum.DELETE.getCode()){
recallAck(pack, ResponseVO.errorResponse(MessageErrorCode.MESSAGE_IS_RECALLED), messageContent);
return;
}
// 经过上面的判断,这时候的该信息就是没有撤回且正常的消息,下面就该进行修改历史信息
body.setDelFlag(DelFlagEnum.DELETE.getCode());
imMessageBodyMapper.update(body, lqw);
// 如果撤回的消息的单聊的话
if(messageContent.getConversationType() == ConversationTypeEnum.P2P.getCode()){
// fromId的队列
String fromKey = messageContent.getAppId() + ":"
+ Constants.RedisConstants.OfflineMessage + ":" + messageContent.getFromId();
// toId的队列
String toKey = messageContent.getAppId() + ":"
+ Constants.RedisConstants.OfflineMessage + ":" + messageContent.getToId();
// 构建离线消息体
OfflineMessageContent offlineMessageContent = new OfflineMessageContent();
BeanUtils.copyProperties(messageContent, offlineMessageContent);
offlineMessageContent.setDelFlag(DelFlagEnum.DELETE.getCode());
offlineMessageContent.setMessageKey(messageContent.getMessageKey());
offlineMessageContent.setConversationType(ConversationTypeEnum.P2P.getCode());
offlineMessageContent.setConversationId(conversationService.conversationConversationId(
offlineMessageContent.getConversationType(), messageContent.getFromId(), messageContent.getToId()));
offlineMessageContent.setMessageBody(body.getMessageBody());
long seq = redisSeq.doGetSeq(messageContent.getAppId()
+ ":" + Constants.SeqConstants.Message
+ ":" + ConversationIdGenerate.generateP2PId(messageContent.getFromId(), messageContent.getToId()));
offlineMessageContent.setMessageSequence(seq);
long messageKey = SnowflakeIdWorker.nextId();
redisTemplate.opsForZSet().add(fromKey, JSONObject.toJSONString(offlineMessageContent), messageKey);
redisTemplate.opsForZSet().add(toKey, JSONObject.toJSONString(offlineMessageContent), messageKey);
// ack
recallAck(pack, ResponseVO.successResponse(), messageContent);
// 分发给同步端
messageProducer.sendToUserExceptClient(messageContent.getFromId(), MessageCommand.MSG_RECALL_NOTIFY,
pack, messageContent);
// 分发给对方
messageProducer.sendToUser(messageContent.getToId(), MessageCommand.MSG_RECALL_NOTIFY,
pack, messageContent);
}else{
List<String> groupMemberId
= imGroupMemberService.getGroupMemberId(messageContent.getToId(), messageContent.getAppId());
long seq = redisSeq.doGetSeq(messageContent.getAppId() + ":" + Constants.SeqConstants.Message
+ ":" + ConversationIdGenerate.generateP2PId(messageContent.getFromId(), messageContent.getToId()));
// ack
recallAck(pack, ResponseVO.successResponse(), messageContent);
// 发送给同步端
messageProducer.sendToUserExceptClient(messageContent.getFromId(), MessageCommand.MSG_RECALL_NOTIFY,
pack, messageContent);
// 同步给对方端
for (String memberId : groupMemberId) {
String toKey = messageContent.getAppId() + ":" + Constants.RedisConstants.OfflineMessage + ":"
+ memberId;
OfflineMessageContent offlineMessageContent = new OfflineMessageContent();
offlineMessageContent.setDelFlag(DelFlagEnum.DELETE.getCode());
BeanUtils.copyProperties(messageContent, offlineMessageContent);
offlineMessageContent.setConversationType(ConversationTypeEnum.GROUP.getCode());
offlineMessageContent.setConversationId(conversationService.conversationConversationId(
offlineMessageContent.getConversationType(), messageContent.getFromId(), messageContent.getToId()
));
offlineMessageContent.setMessageBody(body.getMessageBody());
offlineMessageContent.setMessageSequence(seq);
redisTemplate.opsForZSet().add(toKey, JSONObject.toJSONString(offlineMessageContent), seq);
groupMessageProducer.producer(messageContent.getFromId(), MessageCommand.MSG_RECALL_NOTIFY
,pack, messageContent);
}
}
}
3、如何设计亿级聊天记录存储方案
比较主流的qq、微信对于聊天记录的存储都是有期限的,也就是说我们查询的聊天记录,怎么存储的多怎么取的快
对owner_id加上索引,这个查询就变成了一次索引查询一次,主键查询,这样就满足了查的快了
但是如果只有一个应用的话没问题,但是如果有多个应用接入的情况下,全部的消息存储到这张表中的话,就有点难了
分库
按照appId进行分库,这样的只适用于你已知多少多少应用接入的情况下,才能准确的进行分库,而且也不能部署太多的库,每一个应用都要有一个库的话,太繁琐了
数据迁移
我们可以限定一个期限,超过的期限的记录就不允许用户去查了,但是后台可以查询,这些数据就被迁移到其他的地方,以供后台查询,而且后台慢的查也可以,原来的数据表中的数据就都没了,就变成一个新的表了,然后再放进去数据,查到的也就是最新的了
分表
比较适合已知有多少个应用接入,基于owener_id做分表
基于时间戳做分表,就不用考虑多少个应用接入,但是缺点就是一张表要固定存储一端时间的数据,有的时间段消息多,有的时间段消息少,可能不太好
public class MessageKeyGenerate {
//标识从2020.1.1开始
private static final long T202001010000 = 1577808000000L;
// private Lock lock = new ReentrantLock();
AtomicReference<Thread> owner = new AtomicReference<>();
private static volatile int rotateId = 0;
private static int rotateIdWidth = 15;
private static int rotateIdMask = 32767;
private static volatile long timeId = 0;
private int nodeId = 0;
private static int nodeIdWidth = 6;
private static int nodeIdMask = 63;
public void setNodeId(int nodeId) {
this.nodeId = nodeId;
}
public synchronized long generateId() throws Exception {
// lock.lock();
this.lock();
rotateId = rotateId + 1;
long id = System.currentTimeMillis() - T202001010000;
//不同毫秒数生成的id要重置timeId和自选次数
if (id > timeId) {
timeId = id;
rotateId = 1;
} else if (id == timeId) {
//表示是同一毫秒的请求
if (rotateId == rotateIdMask) {
//一毫秒只能发送32768到这里表示当前毫秒数已经超过了
while (id <= timeId) {
//重新给id赋值
id = System.currentTimeMillis() - T202001010000;
}
this.unLock();
return generateId();
}
}
id <<= nodeIdWidth;
id += (nodeId & nodeIdMask);
id <<= rotateIdWidth;
id += rotateId;
// lock.unlock();
this.unLock();
return id;
}
public static int getSharding(long mid) {
Calendar calendar = Calendar.getInstance();
mid >>= nodeIdWidth;
mid >>= rotateIdWidth;
calendar.setTime(new Date(T202001010000 + mid));
int month = calendar.get(Calendar.MONTH);
int year = calendar.get(Calendar.YEAR);
year %= 3;
return (year * 12 + month);
}
public static long getMsgIdFromTimestamp(long timestamp) {
long id = timestamp - T202001010000;
id <<= rotateIdWidth;
id <<= nodeIdWidth;
return id;
}
public void lock() {
Thread cur = Thread.currentThread();
while (!owner.compareAndSet(null, cur)){
}
}
public void unLock() {
Thread cur = Thread.currentThread();
owner.compareAndSet(cur, null);
}
public static void main(String[] args) throws Exception {
MessageKeyGenerate messageKeyGenerate = new MessageKeyGenerate();
for (int i = 0; i < 10; i++) {
long l = messageKeyGenerate.generateId();
System.out.println(l);
}
//im_message_history_12
//10000 10001
//0 1
long msgIdFromTimestamp = getMsgIdFromTimestamp(1734529845000L);
int sharding = getSharding(msgIdFromTimestamp);
System.out.println(sharding);
}
}
这个是优化后的基于时间戳生成message_id的策略
最后这点东西听不太懂了,就这样吧!