1. 实现:在netty客户端实现netty客户端注册功能,netty客户端需要发送注册消息到netty服务端。
2. 在父工程创建Message类,定义消息格式和消息类型
定义消息类型:
package message;
public enum MessageType {
RegisterRequest,
RegisterResponse,
QueryRegisterRequest,
QueryRegisterResponse,
Request,
Response
}
定义Message
package message;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Data
public class Message extends JSONObject {
private String uuid;
private MessageType messageType;
private String from; //消息来源
private String to;
private JSONObject data; //消息实体
private JSONObject message = new JSONObject();
public Message(MessageType messageType, String from, String to, JSONObject data) {
this.uuid = UUID.randomUUID().toString();
this.messageType = messageType == null ? MessageType.Request : messageType;
this.from = from;
this.to = to;
this.data = data;
message.put("uuid", this.uuid);
message.put("messageType", messageType);
message.put("from", from);
message.put("to", to);
message.put("data", data);
}
//序列化
public byte[] toByte() {
System.out.println("message:" + this.message.toJSONString());
return this.message.toJSONString().getBytes(StandardCharsets.UTF_8);
}
//反序列化
public static Message toMessage(String msg) {
return (Message) JSONObject.parseObject(msg);
}
public String toString() {
return this.message.toJSONString();
}
}
3. 服务端实现注册中心
创建Register类,管理注册的客户端
package server.register;
import io.netty.channel.Channel;
import java.util.HashMap;
public class Register {
private static HashMap<String, Channel> registerTable = new HashMap<>();
private Register() {
}
public static void addServer(String name, Channel channel) {
registerTable.put(name, channel);
System.out.println(name + "注册成功!");
}
public static void removeServer(String name, Channel channel) {
registerTable.remove(name);
}
public static Channel getServer(String name) {
return registerTable.get(name);
}
public static HashMap<String, Channel> getAllServer() {
return registerTable;
}
}
修改NettyServiceHandler,对服务器接收的消息根据类型处理,如果是注册消息,则去注册,请求消息则发送到相应的客户端。
package server.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import server.register.Register;
import java.util.HashMap;
public class NettyServiceHandler extends SimpleChannelInboundHandler<String> {
private static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 获取到当前与服务器连接成功的channel
Channel channel = ctx.channel();
group.add(channel);
System.out.println(channel.remoteAddress() + " 上线," + "在线数量:" + group.size());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 获取到当前要断开连接的Channel
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "下线," + "在线数量:" + group.size());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
System.out.println("netty客户端" + channel.remoteAddress() + "发送过来的消息:" + msg);
JSONObject message = JSONObject.parseObject(msg);
// group.forEach(ch -> { // JDK8 提供的lambda表达式
// if (ch != channel) {
// ch.writeAndFlush(msg + "\r\n");
// }
// });
HandlerMessage(message, channel);
}
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
throwable.printStackTrace();
channelHandlerContext.close();
}
public void HandlerMessage(JSONObject message, Channel channel) {
String messageType = message.getString("messageType");
switch (messageType) {
case "RegisterRequest":
message.put("channel", channel);
register(message);
break;
case "QueryRegisterRequest":
break;
default:
request(message);
}
}
public void register(JSONObject message) {
Register.addServer(message.getString("from"), (Channel) message.get("channel"));
}
public void request(JSONObject message) {
Channel channel = Register.getServer(message.getString("to"));
channel.writeAndFlush(message.toJSONString() + "\r\n");
}
public HashMap<String, Channel> queryRegister() {
return Register.getAllServer();
}
}
4. 客户端自动注册
客户端启动时把自己的信息发送到服务端
package client.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import message.Message;
import message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class NettyClient {
public final static Logger logger = LoggerFactory.getLogger(NettyClient.class);
@Resource
private SocketInitializer socketInitializer;
@Getter
private Bootstrap bootstrap;
@Getter
private Channel channel;
/**
* netty服务监听端口
*/
@Value("${netty.port:6666}")
private int port;
@Value("${netty.host:127.0.0.1}")
private String host;
@Value("${netty.name}")
private String name;
/**
* 启动netty
*/
public void start() {
this.init();
this.channel = this.bootstrap.connect(host, port).channel();
logger.info("Netty connect on port: {}, the host {}, the channel {}", this.port, this.host, this.channel);
//自动注册
register();
}
/**
* 初始化netty配置
*/
private void init() {
EventLoopGroup group = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class) //设置客户端的通道实现类型
.handler(this.socketInitializer);
}
//netty发送消息
public void sendMessage(String message) {
this.channel.writeAndFlush(message + "\r\n");
}
//netty发送消息
public void sendMessage(Message message) {
this.channel.writeAndFlush(message.toString() + "\r\n");
}
public void register() {
Message message = new Message(MessageType.RegisterRequest, name, null, null);
sendMessage(message);
logger.info("register");
}
}
5. 客户端RestController层接收消息
RESTFUL风格,从url解析目标客户端的名称
package client.control;
import client.netty.NettyClient;
import com.alibaba.fastjson.JSONObject;
import message.Message;
import message.MessageType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@RestController
@RequestMapping("/client1")
public class NettyControl {
@Value("${netty.name}")
private String from;
@Resource
private NettyClient nettyClient;
@PostMapping("/{to}/msg")
public String sendMsg(@PathVariable("to") String to, @RequestBody JSONObject data) {
Message message = new Message(MessageType.Request, from, to, data);
this.nettyClient.sendMessage(message);
return message.toString();
}
}
6. postman测试
启动服务和两个客户端,客户端已经注册成功。
client1发送到client2
client2发送到client1
服务端收到的消息