说明
前面消息互发以及广播都是单机就可以完成测试, 但实际场景中客户端的连接数量很大, 那就需要有一定数量的服务端去支撑, 所以准备虚拟机测试。
1. 虚拟机准备
1.1 准备1个1核1G的虚拟机(160), 配置java环境, 安装redis和minio
1.2 准备6个1核1G的空虚拟机(161到166), 只需要java环境即可
2. 服务端改造
2.1 修改 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.hahashou.netty</groupId>
<artifactId>server</artifactId>
<version>1.0-SNAPSHOT</version>
<name>server</name>
<description>Netty Server Project For Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.100.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-crypto</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.2 修改 application.yml (每个服务端的id是不一样的)
server:
port: 32000
spring:
redis:
host: 192.168.109.160
port: 6379
password: root
logging:
level:
com.hahashou.netty: info
netty:
server:
# 唯一标识(与hosts文件里对应)
id : netty-server-1
# 客户端需要连接的端口
port: 35000
2.3 config包下增加 NettyStatic类
package com.hahashou.netty.server.config;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 静态常量
* @author: 哼唧兽
* @date: 9999/9/21
**/
public class NettyStatic {
/** key: 用户code; value: channelId */
public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);
/** key: channelId; value: Channel */
public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);
public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);
public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}
2.4 config包下增加 RedisConfig类
package com.hahashou.netty.server.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @description: Redis配置
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
// 使用StringRedisSerializer来序列化和反序列化redis的key
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 开启事务:redisTemplate.setEnableTransactionSupport(true); 我觉得一般用不到(该操作是为了执行一组命令而设置的)
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
@Bean
public ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}
public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";
public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";
public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}
2.5 修改 EventLoopGroupConfig类
package com.hahashou.netty.server.config;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description: Netty线程组
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Configuration
public class EventLoopGroupConfig {
private int bossNum = 1;
private int workerNum = 4;
private int businessNum = 1;
private int maxPending = 100000;
/** ------------------------------ 服务端 ------------------------------ */
@Bean("bossGroup")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bossNum);
}
@Bean("workerGroup")
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerNum);
}
@Bean("businessGroup")
public EventExecutorGroup businessGroup() {
return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),
maxPending, RejectedExecutionHandlers.reject());
}
/** ------------------------------ 客户端 ------------------------------ */
@Bean("clientWorkerGroup")
public NioEventLoopGroup clientWorkerGroup() {
return new NioEventLoopGroup(workerNum);
}
@Bean("clientBusinessGroup")
public EventExecutorGroup clientBusinessGroup() {
return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(), maxPending, RejectedExecutionHandlers.reject());
}
static class BusinessThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
BusinessThreadFactory() {
SecurityManager securityManager = System.getSecurityManager();
group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "netty-server-";
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}
}
2.6 config包下增加 SpringBean类
package com.hahashou.netty.server.config;
import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
/**
* @description: Spring Bean管理
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Configuration
public class SpringBean {
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
/**
* 最多能new64个, private static final int INSTANCE_COUNT_LIMIT = 64;
* @return
*/
@Bean
public HashedWheelTimer hashedWheelTimer() {
// 默认tick间隔100毫秒, 轮子大小为512
return new HashedWheelTimer();
}
}
2.7 server包下增加 ApplicationInitial类
package com.hahashou.netty.server;
import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @description: 应用初始化
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {
@Resource
private HashedWheelTimer hashedWheelTimer;
@Resource
private NettyServer nettyServer;
@Override
public void run(ApplicationArguments args) {
hashedWheelTimer.newTimeout(nettyServer, 1 , TimeUnit.SECONDS);
}
}
2.8 修改 Message类
package com.hahashou.netty.server.config;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Data
public class Message {
/** 广播秘钥 */
private String secretKey;
/** 发送者用户code */
private String userCode;
/** 中转的服务端Id */
private String serverId;
/** 接收者用户code */
private String friendUserCode;
/** 连接时专用 */
private String channelId;
/** 消息类型 */
private Integer type;
public enum TypeEnum {
TEXT(0, "文字", "", new ArrayList<>()),
IMAGE(1, "图片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),
VOICE(2, "语音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),
VIDEO(3, "视频", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),
;
@Getter
private Integer key;
@Getter
private String describe;
@Getter
private String bucketName;
@Getter
private List<String> formatList;
TypeEnum(int key, String describe, String bucketName, List<String> formatList) {
this.key = key;
this.describe = describe;
this.bucketName = bucketName;
this.formatList = formatList;
}
public static TypeEnum select(String format) {
TypeEnum result = null;
for (TypeEnum typeEnum : TypeEnum.values()) {
if (typeEnum.getFormatList().contains(format)) {
result = typeEnum;
break;
}
}
return result;
}
}
/** 文字或文件的全路径名称 */
private String text;
public static ByteBuf transfer(Message message) {
return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);
}
/**
* 生成指定长度的随机字符串
* @param length
* @return
*/
public static String randomString (int length) {
if (length > 64) {
length = 64;
}
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i + "");
}
for (char i = 'A'; i <= 'Z'; i++) {
list.add(String.valueOf(i));
}
for (char i = 'a'; i <= 'z'; i++) {
list.add(String.valueOf(i));
}
list.add("α");
list.add("ω");
Collections.shuffle(list);
String string = list.toString();
return string.replace("[", "")
.replace("]", "")
.replace(", ", "")
.substring(0, length);
}
}
2.9 config包下增加 NettyClientHandler类
package com.hahashou.netty.server.config;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Getter
@Setter
private String userCode;
@Getter
@Setter
private String hostName;
@Getter
@Setter
private int port;
@Resource
private ValueOperations<String, Object> redisOperation;
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("{}, 作为客户端, 与其他服务端连接", LocalDateTime.now());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());
NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);
NettyClient nettyClient = NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);
nettyClient = null;
nettyClientHandler = null;
System.gc();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg != null) {
Message message = JSON.parseObject(msg.toString(), Message.class);
String channelId = message.getChannelId(),
text = message.getText();
if (StringUtils.hasText(channelId)) {
Channel channel = ctx.channel();
message.setUserCode(userCode);
NettyStatic.USER_CHANNEL.put(hostName, channelId);
NettyStatic.CHANNEL.put(channelId, channel);
channel.writeAndFlush(Message.transfer(message));
} else if (StringUtils.hasText(text)) {
String friendUserCode = message.getFriendUserCode();
if (StringUtils.hasText(message.getServerId())) {
String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
if (StringUtils.hasText(queryChannelId)) {
Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
if (channel == null) {
offlineMessage(friendUserCode, message);
return;
}
// 此时, 已不需要serverId
message.setServerId(null);
channel.writeAndFlush(Message.transfer(message));
} else {
offlineMessage(friendUserCode, message);
}
}
}
}
}
/**
* 离线消息存储Redis
* @param friendUserCode
* @param message
*/
public void offlineMessage(String friendUserCode, Message message) {
List<Message> messageList = new ArrayList<>();
Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
if (offlineMessage != null) {
messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
}
messageList.add(message);
redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}
2.10 config包下增加 NettyClient类
package com.hahashou.netty.server.config;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;
/**
* @description: Netty-客户端TCP服务
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Slf4j
public class NettyClient {
@Getter
@Setter
private NioEventLoopGroup clientWorkerGroup;
@Getter
@Setter
private EventExecutorGroup clientBusinessGroup;
public void createClient(NettyClientHandler nettyClientHandler) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientWorkerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
pipeline.addLast(clientBusinessGroup, nettyClientHandler);
}});
try {
InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());
SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());
bootstrap.connect(socketAddress).sync().channel();
} catch (UnknownHostException exception) {
log.error("请检查hosts文件是否配置正确 : {}", exception.getMessage());
} catch (InterruptedException exception) {
log.error("客户端中断异常 : {}", exception.getMessage());
}
}
@PreDestroy
public void destroy() {
clientWorkerGroup.shutdownGracefully().syncUninterruptibly();
log.info("客户端关闭成功");
}
}
2.11 修改 NettyServer类
package com.hahashou.netty.server.config;
import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @description: Netty-服务端TCP服务
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Component
@Slf4j
public class NettyServer implements TimerTask {
@Value("${netty.server.id}")
private String serverId;
@Value("${netty.server.port}")
private int port;
@Resource
private NioEventLoopGroup bossGroup;
@Resource
private NioEventLoopGroup workerGroup;
@Resource
private EventExecutorGroup businessGroup;
@Resource
private NettyServerHandler nettyServerHandler;
@Resource
private NioEventLoopGroup clientWorkerGroup;
@Resource
private EventExecutorGroup clientBusinessGroup;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private ValueOperations<String, Object> redisOperation;
@Resource
private HashedWheelTimer hashedWheelTimer;
@Override
public void run(Timeout timeout) {
Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);
if (nettyServerLock != null) {
hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);
return;
}
try {
redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);
//String hostAddress = InetAddress.getLocalHost().getHostAddress();
ServerBootstrap serverBootstrap = new ServerBootstrap();
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
pipeline.addLast(businessGroup, nettyServerHandler);
}
})
// 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 此处有个大坑, 详见文章脱坑指南
.bind(port)
.sync();
if (channelFuture.isSuccess()) {
log.info("{} 启动成功", serverId);
redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
}
thisNodeHandle(port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException exception) {
log.error("{} 启动失败: {}", serverId, exception.getMessage());
} finally {
redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
}
}
private void thisNodeHandle(int port) {
Set<String> nodeList = new HashSet<>();
Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
if (nettyServerList != null) {
nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));
for (String hostAndPort : nodeList) {
String[] split = hostAndPort.split("@");
String connectHost = split[0];
int connectPort = Integer.parseInt(split[1]);
NettyClient nettyClient = new NettyClient();
nettyClient.setClientWorkerGroup(clientWorkerGroup);
nettyClient.setClientBusinessGroup(clientBusinessGroup);
NettyClientHandler nettyClientHandler = new NettyClientHandler();
nettyClientHandler.setUserCode(serverId);
nettyClientHandler.setHostName(connectHost);
nettyClientHandler.setPort(connectPort);
nettyClient.createClient(nettyClientHandler);
NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);
NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);
}
}
nodeList.add(serverId + "@" + port);
redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));
}
public void stop() {
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("TCP服务关闭成功");
Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);
hostList.remove(serverId + "@" + port);
if (CollectionUtils.isEmpty(hostList)) {
redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);
} else {
redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));
}
}
@PreDestroy
public void destroy() {
stop();
}
}
2.12 修改 NettyServerHandler类
package com.hahashou.netty.server.config;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Value("${netty.server.id}")
private String serverId;
public static String SERVER_PREFIX = "netty-server-";
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private ValueOperations<String, Object> redisOperation;
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String channelId = channel.id().asLongText();
log.info("有客户端连接, channelId : {}", channelId);
NettyStatic.CHANNEL.put(channelId, channel);
Message message = new Message();
message.setChannelId(channelId);
channel.writeAndFlush(Message.transfer(message));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
log.info("有客户端断开连接, channelId : {}", channelId);
NettyStatic.CHANNEL.remove(channelId);
for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
if (entry.getValue().equals(channelId)) {
redisTemplate.delete(entry.getKey());
break;
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg != null) {
Message message = JSON.parseObject(msg.toString(), Message.class);
String userCode = message.getUserCode(),
channelId = message.getChannelId(),
friendUserCode = message.getFriendUserCode();
if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
connect(userCode, channelId);
} else if (StringUtils.hasText(message.getText())) {
Object code = redisOperation.get(friendUserCode);
if (code != null) {
String queryServerId = code.toString();
message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
if (StringUtils.hasText(friendUserCode)) {
sendOtherClient(message);
} else {
sendAdmin(ctx.channel(), message);
}
} else {
offlineMessage(friendUserCode, message);
}
}
}
}
/**
* 建立连接
* @param userCode
* @param channelId
*/
private void connect(String userCode, String channelId) {
log.info("{} 连接", userCode);
NettyStatic.USER_CHANNEL.put(userCode, channelId);
if (!userCode.startsWith(SERVER_PREFIX)) {
redisOperation.set(userCode, serverId);
}
}
/**
* 发送给其他客户端
* @param message
*/
private void sendOtherClient(Message message) {
String friendUserCode = message.getFriendUserCode(),
serverId = message.getServerId();
String queryChannelId;
if (StringUtils.hasText(serverId)) {
log.info("向" + serverId + " 进行转发");
queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);
} else {
queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
}
if (StringUtils.hasText(queryChannelId)) {
Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
if (channel == null) {
offlineMessage(friendUserCode, message);
return;
}
channel.writeAndFlush(Message.transfer(message));
} else {
offlineMessage(friendUserCode, message);
}
}
/**
* 离线消息存储Redis
* @param friendUserCode
* @param message
*/
public void offlineMessage(String friendUserCode, Message message) {
// 1条message在redis中大概是100B, 1万条算1M, redis.conf的maxmemory设置的是256M
List<Message> messageList = new ArrayList<>();
Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
if (offlineMessage != null) {
messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
}
messageList.add(message);
redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
}
/**
* 发送给服务端
* @param channel
* @param message
*/
private void sendAdmin(Channel channel, Message message) {
message.setUserCode("ADMIN");
message.setText(LocalDateTime.now().toString());
channel.writeAndFlush(Message.transfer(message));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());
}
}
2.13 新建service包, 并新增 ServerService接口
package com.hahashou.netty.server.service;
import com.hahashou.netty.server.config.Message;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
public interface ServerService {
/**
* 发送消息
* @param dto
*/
void send(Message dto);
/**
* 停止服务(为后续断线重连做准备)
*/
void stop();
}
2.14 service包下新建impl包, 并新增 ServerServiceImpl类
package com.hahashou.netty.server.service.impl;
import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {
@Value("${netty.server.id}")
private String serverId;
@Resource
private PasswordEncoder passwordEncoder;
@Resource
private ValueOperations<String, Object> redisOperation;
@Resource
private NettyServer nettyServer;
@Override
public void send(Message dto) {
String friendUserCode = dto.getFriendUserCode();
if (StringUtils.hasText(friendUserCode)) {
Object code = redisOperation.get(friendUserCode);
if (code != null) {
String queryServerId = code.toString();
dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
if (StringUtils.hasText(friendUserCode)) {
sendOtherClient(dto);
}
} else {
offlineMessage(friendUserCode, dto);
}
} else {
// 全体广播, 需要校验秘钥(inputSecretKey应该是一个动态值, 通过手机+验证码每次广播时获取, 自行实现)
String inputSecretKey = dto.getSecretKey();
// encodedPassword生成见main方法
String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";
if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {
dto.setSecretKey(null);
for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
String key = entry.getKey();
if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {
// 这里可以用http调用其他服务端, 自行补充(信息redis都有)
continue;
}
// 只处理连接本端的客户端
String value = entry.getValue();
Channel channel = NettyStatic.CHANNEL.get(value);
if (channel == null) {
offlineMessage(friendUserCode, dto);
return;
}
channel.writeAndFlush(Message.transfer(dto));
}
}
}
}
public static void main(String[] args) {
String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";
PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
String encode = passwordEncoder.encode(text);
log.info(encode);
if (passwordEncoder.matches(text, encode)) {
log.info("秘钥正确");
}
}
/**
* 发送给其他客户端
* @param message
*/
private void sendOtherClient(Message message) {
String friendUserCode = message.getFriendUserCode(),
serverId = message.getServerId();
String queryChannelId;
if (StringUtils.hasText(serverId)) {
log.info("向" + serverId + " 进行转发");
queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);
} else {
queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
}
if (StringUtils.hasText(queryChannelId)) {
Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
if (channel == null) {
offlineMessage(friendUserCode, message);
return;
}
channel.writeAndFlush(Message.transfer(message));
} else {
offlineMessage(friendUserCode, message);
}
}
/**
* 离线消息存储Redis
* @param friendUserCode
* @param message
*/
public void offlineMessage(String friendUserCode, Message message) {
List<Message> messageList = new ArrayList<>();
Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
if (offlineMessage != null) {
messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
}
messageList.add(message);
redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
}
@Override
public void stop() {
nettyServer.stop();
}
}
2.15 修改 ServerController类
package com.hahashou.netty.server.controller;
import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @description:
* @author: 哼唧兽
* @date: 9999/9/21
**/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {
@Resource
private ServerService serverService;
/**
* 秘钥记录: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU
* @param dto
* @return
*/
@PostMapping("/send")
public String send(@RequestBody Message dto) {
serverService.send(dto);
return "success";
}
@GetMapping("/stop")
public String stop() {
serverService.stop();
return "stop netty success";
}
}
3. 脱坑指南, 针对 NettyServer类
工具
yum -y install net-tools
netstat -tunlp
防火墙打开时, 当使用 bind(String inetHost, int inetPort) 方法时, 因为inetHost是127.0.0.1, 所以只有本机可以访问35000, 要想让其他机器可以连接到, 需使用 bind(int inetPort) 方法, 下图是前后两次端口占用情况
结论
当使用bind(String inetHost, int inetPort)方法时, 无论防火墙关闭以及启动, 虚拟机均有问题; 但当机器有公网IP, 且防火墙关闭或端口开放时, 通过DNS解析映射是没有问题的, 建议还是用bind(int inetPort)方法
4. 服务端准备
4.1 打包3个服务端的jar包, id分别为netty-server-1、netty-server-2、netty-server-3, 分别放在161到163上
4.2 161、162、163端口开放
firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload
4.3 161、162、163修改hosts
vi /etc/hosts
追加内容
192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3
4.4 依次启动161、162、163
java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar
161
162
163
redis中记录的服务列表
5. 客户端改造
5.1 修改 application.yml
server:
port: 32001
logging:
level:
com.hahashou.netty: info
spring:
servlet:
multipart:
max-file-size: 128MB
max-request-size: 256MB
userCode: Aa
host: 192.168.109.161
minio:
endpoint: http://192.168.109.160:9000
accessKey: root
secretKey: root123456
5.2 修改 NettyClient类
package com.hahashou.netty.client.config;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
/**
* @description: Netty-TCP服务
* @author: 哼唧兽
* @date: 9999/9/21
**/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {
@Value("${host}")
private String host;
public static int PORT = 35000;
@Resource
private NioEventLoopGroup workerGroup;
@Resource
private EventExecutorGroup businessGroup;
@Resource
private NettyClientHandler nettyClientHandler;
public static Channel CHANNEL;
@SneakyThrows
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);
}
public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,
NettyClientHandler nettyClientHandler, String host, int port) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
pipeline.addLast(businessGroup, nettyClientHandler);
}});
try {
CHANNEL = bootstrap.connect(host, port).sync().channel();
} catch (InterruptedException exception) {
log.error("客户端中断异常 : {}", exception.getMessage());
}
}
@PreDestroy
public void destroy() {
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("客户端关闭成功");
}
}
6. 客户端准备
6.1 准备6个jar包, 修改application.yml, 并根据下述规则放到对应机器上
Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上
userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163
6.2 161到166端口开放
firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload
6.3 启动所有客户端
7. 测试
7.1 两个客户端连同一服务端, 不会出现转发
Aa向Bb发送消息, 且Bb收到后回复Aa
7.2 两个客户端连不同服务端
Aa向Cc发送消息(通过服务端1转发到服务端2), 且Cc收到后回复Aa(通过服务端2转发到服务端1)
Aa向Ee发送消息, 且Ee收到后回复Aa
7.3 广播