说明
仅是个人的不成熟想法, 未深入研究验证
1. 修改 NettyServerHandler类
package com.hahashou.netty.server.config;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** key: 用户code; value: channelId */
public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);
/** key: channelId; value: Channel */
public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);
/** 最好是单独写个单例(注意: 最多只能new 64个此类对象) */
public static HashedWheelTimer TIMER;
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String channelId = channel.id().asLongText();
log.info("有客户端连接, channelId : {}", channelId);
CHANNEL.put(channelId, channel);
Message message = new Message();
message.setChannelId(channelId);
channel.writeAndFlush(Message.transfer(message));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("有客户端断开连接, channelId : {}", ctx.channel().id().asLongText());
CHANNEL.remove(ctx.channel().id().asLongText());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg != null) {
Message message = JSON.parseObject(msg.toString(), Message.class);
String userCode = message.getUserCode(),
channelId = message.getChannelId();
if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
connect(userCode, channelId);
} else if (StringUtils.hasText(message.getText())) {
if (StringUtils.hasText(message.getFriendUserCode())) {
sendOtherClient(message);
} else {
sendAdmin(ctx.channel(), message);
}
}
}
}
/**
* 建立连接
* @param userCode
* @param channelId
*/
private void connect(String userCode, String channelId) {
log.info("客户端 {} 连接", userCode);
USER_CHANNEL.put(userCode, channelId);
if (TIMER == null) {
// 默认的时间轮是100毫秒的tick间隔, 0.1秒的误差
TIMER = new HashedWheelTimer();
}
TIMER.newTimeout(new OfflineMessage(userCode), 1, TimeUnit.SECONDS);
}
/**
* 发送给其他客户端
* @param message
*/
private void sendOtherClient(Message message) {
String friendUserCode = message.getFriendUserCode();
String queryChannelId = USER_CHANNEL.get(friendUserCode);
if (StringUtils.hasText(queryChannelId)) {
Channel channel = CHANNEL.get(queryChannelId);
if (channel == null) {
offlineMessage(friendUserCode, message);
return;
}
channel.writeAndFlush(Message.transfer(message));
} else {
offlineMessage(friendUserCode, message);
}
}
/**
* 离线消息存储
* @param friendUserCode
* @param message
*/
public void offlineMessage(String friendUserCode, Message message) {
List<Message> messageList = OfflineMessage.USER_MESSAGE.get(friendUserCode);
if (CollectionUtils.isEmpty(messageList)) {
messageList = new ArrayList<>();
}
messageList.add(message);
OfflineMessage.USER_MESSAGE.put(friendUserCode, messageList);
}
/**
* 发送给服务端
* @param channel
* @param message
*/
private void sendAdmin(Channel channel, Message message) {
message.setUserCode("ADMIN");
message.setText(LocalDateTime.now().toString());
channel.writeAndFlush(Message.transfer(message));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());
}
}
2. config包下增加 OfflineMessage类
package com.hahashou.netty.server.config;
import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 离线消息
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Slf4j
public class OfflineMessage implements TimerTask {
public static Map<String, List<Message>> USER_MESSAGE = new ConcurrentHashMap<>(32);
private String userCode;
public OfflineMessage(String userCode) {
this.userCode = userCode;
}
@Override
public void run(Timeout timeout) {
List<Message> messageList = USER_MESSAGE.get(userCode);
if (CollectionUtils.isEmpty(messageList)) {
return;
}
log.info("向 {} 推送离线消息", userCode);
Channel channel = NettyServerHandler.CHANNEL.get(NettyServerHandler.USER_CHANNEL.get(userCode));
for (Message offlineMessage : messageList) {
channel.writeAndFlush(Message.transfer(offlineMessage));
}
}
}
3. 启动服务端以及客户端A, 发送几条离线消息
之后, 启动客户端B接收离线消息。启动/停止了4次, 得到如下4个结果
1
2
3
4
可以看出, 得到的离线消息并不可靠, 虽然有2次结果一致。而且在这之前, 有一次启动时, 根本就是1条消息都没有, 我都一度怀疑我写的有问题
4. 个人猜想
因为是异步的, netty发送消息时, 轮询策略应该有个时间轮管理着, 且时间轮是有tick间隔的。java中for循环的执行效率大概是10个循环耗时1毫秒, 0.001秒, 如果在for循环中增加线程sleep, 或许就都能执行到, 所以我在OfflineMessage类中for循环中增加50毫秒的slepp, 5次测试结果一致, 后将50改成10, 5次测试结果一致。虽然测试没有问题, 但毕竟测试量太少, 且我觉得离线消息应该是能通过接口一次性就获取到, 所以这种通过netty获取离线消息的方式, 我不赞同
for (Message offlineMessage : messageList) {
// 异常向上抛出或捕获
Thread.sleep(50);
channel.writeAndFlush(Message.transfer(offlineMessage));
}