创建一个 Maven 项目,并添加下面的依赖:
<dependencies>
<!-- 日志依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.84.Final</version>
</dependency>
</dependencies>
编码解码器
package com.example.nettydemo.coder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
/**
 * Encodes an outgoing {@link String} as one length-prefixed frame: a 4-byte length
 * (written with {@code ByteBuf.writeInt}) followed by that many bytes of UTF-8 payload.
 */
public class NettyEncoder extends MessageToByteEncoder<String> {

    public NettyEncoder() {
    }

    /**
     * Writes the frame for {@code msg} into {@code out}.
     *
     * @param ctx the handler context (unused)
     * @param msg the text to send
     * @param out the buffer supplied by {@link MessageToByteEncoder}, which is written to the channel
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        // Write directly into the target buffer: a temporary pooled buffer would only add
        // a copy, and it would leak if the copy into `out` threw before release().
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
package com.example.nettydemo.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Decodes the length-prefixed frames produced by {@code NettyEncoder}: a 4-byte length
 * followed by that many bytes of UTF-8 text. Each complete frame is emitted as a
 * {@link String}; incomplete frames are left in the buffer until more bytes arrive.
 */
@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {

    /** Number of bytes occupied by the length prefix. */
    private static final int LENGTH_FIELD_SIZE = 4;

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx the handler context (unused)
     * @param in  the accumulated inbound bytes
     * @param out receives the decoded string when a full frame is available
     * @throws IllegalStateException if the length prefix is negative, i.e. the stream is corrupt
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // TCP may deliver the length prefix itself in pieces; readInt() on fewer than
        // four readable bytes would throw IndexOutOfBoundsException.
        if (in.readableBytes() < LENGTH_FIELD_SIZE) {
            return;
        }
        int frameStart = in.readerIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length can only come from a corrupt or hostile peer; fail loudly
            // instead of hitting NegativeArraySizeException on the allocation below.
            throw new IllegalStateException("negative frame length: " + dataLength);
        }
        if (in.readableBytes() < dataLength) {
            // Body not fully received yet: rewind so the prefix is read again next time.
            in.readerIndex(frameStart);
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(new String(data, StandardCharsets.UTF_8));
    }
}
服务端
package com.example.nettydemo.server;
import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
@Slf4j
public class TcpServer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap server;
private ChannelFuture channelFuture;
private Integer port;
public TcpServer(Integer port) {
this.port = port;
// nio连接处理池
this.bossGroup = new NioEventLoopGroup();
// 处理事件池
this.workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 自定义处理类
ch.pipeline().addLast(new NettyDecoder());
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new TcpServerHandler());
}
});
server.option(ChannelOption.SO_BACKLOG, 128);
server.childOption(ChannelOption.SO_KEEPALIVE, true);
}
public synchronized void startListen() {
try {
// 绑定到指定端口
channelFuture = server.bind(port).sync();
log.info("netty服务器在[{}]端口启动监听", port);
} catch (Exception e) {
log.error("netty服务器在[{}]端口启动监听失败", port);
e.printStackTrace();
}
}
public void sendMessageToClient(String clientIp, Object msg) {
Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
Channel channel = channelMap.get(clientIp);
String sendStr;
try {
sendStr = OBJECT_MAPPER.writeValueAsString(msg);
} catch (JsonGenerationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);
channel.writeAndFlush(sendStr);
} catch (Exception var4) {
log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);
throw new RuntimeException(var4);
}
}
public void pushMessageToClients(Object msg) {
Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
if (channelMap != null && !channelMap.isEmpty()) {
channelMap.forEach((k, v) -> sendMessageToClient(k, msg));
}
}
}
package com.example.nettydemo.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 用跳表存储连接channel
*/
public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("应用程序的监听通道异常!");
cause.printStackTrace();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 获取每个用户端连接的ip
InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
// 本地端口做键
int localPort = localSocket.getPort();
Map<String, Channel> channelMap = channelSkipMap.get(localPort);
if (channelMap == null || channelMap.isEmpty()) {
channelMap = new HashMap<>(4);
}
channelMap.put(clientIp, channel);
channelSkipMap.put(localPort, channelMap);
log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 获取每个用户端连接的ip
Channel channel = ctx.channel();
InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
int localPort = localSocket.getPort();
InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
Map<String, Channel> channelMap = channelSkipMap.get(localPort);
channelMap.remove(clientIp);
log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
Channel channel = channelHandlerContext.channel();
// 获取每个用户端连接的ip
InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);
}
}
package com.example.nettydemo.server;
/**
 * Demo entry point: starts a {@link TcpServer} on port 40001 and pushes a test
 * message to all connected clients every five seconds.
 */
public class ServerTest {
    public static void main(String[] args) {
        TcpServer tcpServer = new TcpServer(40001);
        tcpServer.startListen();
        // Loop until the thread is interrupted, so the demo can be stopped cleanly.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Send "test-朱上林123" to the clients once every 5 seconds.
                Thread.sleep(5000);
                tcpServer.pushMessageToClients("test-朱上林123");
            } catch (InterruptedException e) {
                // Restore the flag; the loop condition then ends the loop.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // A failed push must not stop the periodic broadcast.
                e.printStackTrace();
            }
        }
    }
}
客户端
package com.example.nettydemo.client;
import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
@Slf4j
public class TcpClient {
private EventLoopGroup group;
private ChannelFuture channelFuture;
private final String ip;
private final Integer port;
private final ObjectMapper objectMapper = new ObjectMapper();
public TcpClient(String ip, Integer port) {
this.ip = ip;
this.port = port;
}
/**
* 建立连接
*
*/
public synchronized void connectServer() {
log.info("开始建立连接,ip:{}, port:{}", ip, port);
// 生命nio连接池
this.group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
// 配置解码器以及消息处理类
b.group(this.group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder());
pipeline.addLast(new TcpClientHandler());
}
});
// 开始连接
this.channelFuture = b.connect(ip, port).sync();
} catch (Exception var4) {
log.error("连接建立失败,ip:{}, port:{}", ip, port);
this.group.shutdownGracefully();
var4.printStackTrace();
}
}
/**
* 关闭连接
*/
public void close() {
this.group.shutdownGracefully();
}
/**
* 发送消息
*
* @param msg
*/
public synchronized void sendCommonMsg(Object msg) {
String sendStr;
if (!getConnectStatus()) {
connectServer();
}
try {
sendStr = objectMapper.writeValueAsString(msg);
} catch (JsonMappingException e) {
throw new RuntimeException(e);
} catch (JsonGenerationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
log.info("发送消息内容:{}", sendStr);
this.channelFuture.channel().writeAndFlush(sendStr);
} catch (Exception var4) {
log.error("发送消息失败,消息内容:{}", sendStr);
throw new RuntimeException(var4);
}
}
/**
* 获取当前连接状态
*/
public Boolean getConnectStatus() {
return group != null && !group.isShutdown() && !group.isShuttingDown();
}
}
package com.example.nettydemo.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取事件
*
* @param channelHandlerContext
* @param msg
*/
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
log.info("服务返回消息 :{}", msg);
}
/**
* 发生异常
*
* @param channelHandlerContext
* @param throwable
*/
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {
log.error("通信发生异常:" + throwable.getMessage());
channelHandlerContext.close();
}
}
package com.example.nettydemo.client;
/**
 * Demo entry point for the client: connects to the local server and sends one greeting.
 */
public class TcpClientTest {
    /** Address of the demo server. */
    private static final String SERVER_HOST = "127.0.0.1";
    /** Port of the demo server. */
    private static final int SERVER_PORT = 40001;

    public static void main(String[] args) {
        final TcpClient client = new TcpClient(SERVER_HOST, SERVER_PORT);
        // Connect first, then send a single message to the server.
        client.connectServer();
        client.sendCommonMsg("我是Client,刚刚是我连接到你的!");
    }
}
先启动服务端,再启动客户端,即可实现通信。
下课!