前言
模仿微信聊天页面,开发一个基于Netty搭建的WebSocket通信案例。Netty的应用范围非常广:聊天、MQ、RPC、数据传输等等;随着5G的到来,大量数据传输的需求会越来越多,Netty的应用也会更加广阔。
1:案例使用SpringBoot+Netty+WebSocket搭建功能。
2:使用Netty提供的HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler进行编码解码处理。
3:通信逻辑尽可能简化到只了解根本,便于后续个人应用及开发的拓展。
/**
 * Message sent from a client to the server.
 *
 * <p>Plain mutable bean with a no-arg constructor and getter/setter pairs.
 */
public class ClientMsgProtocol {

    /** Request kind: 1 = request the sender's own info, 2 = send a chat message. */
    private int type;

    /** Message payload. */
    private String msgInfo;

    /** @return the request kind (1 or 2, see the field comment) */
    public int getType() {
        return this.type;
    }

    /** @param type the request kind (1 or 2, see the field comment) */
    public void setType(int type) {
        this.type = type;
    }

    /** @return the message payload, or {@code null} if none was set */
    public String getMsgInfo() {
        return this.msgInfo;
    }

    /** @param msgInfo the message payload */
    public void setMsgInfo(String msgInfo) {
        this.msgInfo = msgInfo;
    }
}
/**
 * Message sent from the server to clients.
 *
 * <p>Plain mutable bean with a no-arg constructor and getter/setter pairs.
 */
public class ServerMsgProtocol {

    /** Message kind: 1 = message for the sender itself, 2 = broadcast message. */
    private int type;

    /** Channel ID; in real use this would be mapped to a user name. */
    private String channelId;

    /** User avatar (assigned as mock data). */
    private String userHeadImg;

    /** Message payload. */
    private String msgInfo;

    /** @return the message kind (1 or 2, see the field comment) */
    public int getType() {
        return this.type;
    }

    /** @param type the message kind (1 or 2, see the field comment) */
    public void setType(int type) {
        this.type = type;
    }

    /** @return the channel ID, or {@code null} if none was set */
    public String getChannelId() {
        return this.channelId;
    }

    /** @param channelId the channel ID */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    /** @return the user avatar, or {@code null} if none was set */
    public String getUserHeadImg() {
        return this.userHeadImg;
    }

    /** @param userHeadImg the user avatar */
    public void setUserHeadImg(String userHeadImg) {
        this.userHeadImg = userHeadImg;
    }

    /** @return the message payload, or {@code null} if none was set */
    public String getMsgInfo() {
        return this.msgInfo;
    }

    /** @param msgInfo the message payload */
    public void setMsgInfo(String msgInfo) {
        this.msgInfo = msgInfo;
    }
}
/**
 * Builds the handler pipeline for every accepted {@link SocketChannel}.
 *
 * <p>Handlers are appended in this order: HTTP codec, HTTP object aggregator
 * (constructed with 65536), chunked-write handler, and finally a fresh
 * {@link MyServerHandler} instance per channel.
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        channel.pipeline()
                .addLast("http-codec", new HttpServerCodec())
                .addLast("aggregator", new HttpObjectAggregator(65536))
                .addLast("http-chunked", new ChunkedWriteHandler())
                // Our own inbound handler goes last so it sees the decoded messages.
                .addLast(new MyServerHandler());
    }
}
/**
 * Inbound handler that performs the WebSocket handshake on the first HTTP request
 * and then serves WebSocket frames for a simple group chat.
 *
 * <p>The {@code handshaker} field is per-instance state, so each channel needs its
 * own handler instance.
 *
 * <p>This class extends {@link ChannelInboundHandlerAdapter}, which does not release
 * inbound messages automatically; {@link #channelRead} therefore releases every
 * message it consumes.
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(MyServerHandler.class);

    /** Handshaker created for this channel; null until an HTTP request has been processed. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Called once the client has connected and the channel can transfer data.
     * Logs the client's address and registers the channel in the shared channel group.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        logger.info("链接报告开始");
        logger.info("链接报告信息:有一客户端链接到本服务端");
        // Report the peer (client) address; localAddress() would be the server's own side.
        logger.info("链接报告IP:{}", channel.remoteAddress().getHostString());
        logger.info("链接报告Port:{}", channel.remoteAddress().getPort());
        logger.info("链接报告完毕");
        ChannelHandler.channelGroup.add(ctx.channel());
    }

    /**
     * Called when the channel is no longer connected. Logs the client's address and
     * removes the channel from the shared channel group.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Pass the address object itself so a null address cannot cause an NPE here.
        logger.info("客户端断开链接{}", ctx.channel().remoteAddress());
        ChannelHandler.channelGroup.remove(ctx.channel());
    }

    /**
     * Dispatches an inbound message: a {@link FullHttpRequest} goes to the handshake
     * path, a {@link WebSocketFrame} to the frame path. Any other message type is
     * ignored, as before.
     *
     * <p>The message is released in a {@code finally} block after handling. The
     * {@code retain()} calls on the Close and Ping paths balance that release for
     * buffers handed on to an outbound write.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest httpRequest = (FullHttpRequest) msg;
            try {
                handleHttpRequest(ctx, httpRequest);
            } finally {
                httpRequest.release();
            }
            return;
        }
        if (msg instanceof WebSocketFrame) {
            WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
            try {
                handleWebSocketFrame(ctx, webSocketFrame);
            } finally {
                webSocketFrame.release();
            }
        }
    }

    /** Answers an undecodable request with 400 and closes; otherwise runs the WebSocket handshake. */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest httpRequest) {
        if (!httpRequest.decoderResult().isSuccess()) {
            HttpResponseStatus status = HttpResponseStatus.BAD_REQUEST;
            DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(status.toString(), CharsetUtil.UTF_8));
            // An error response always ends the connection.
            ctx.channel().writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
            return;
        }
        // Build the advertised WebSocket location from the request's Host header
        // rather than from the channel's toString() output.
        String webSocketUrl = "ws://" + httpRequest.headers().get("Host") + "/websocket";
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(webSocketUrl, null, false);
        handshaker = wsFactory.newHandshaker(httpRequest);
        if (null == handshaker) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), httpRequest);
        }
    }

    /**
     * Handles one WebSocket frame: close, ping, or a JSON text message.
     *
     * @throws UnsupportedOperationException if the frame is neither close, ping nor text
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
        // Close request: echo the close frame through the handshaker.
        if (webSocketFrame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
            return;
        }
        // Ping request: reply with a pong and flush so it is sent immediately.
        if (webSocketFrame instanceof PingWebSocketFrame) {
            ctx.channel().writeAndFlush(new PongWebSocketFrame(webSocketFrame.content().retain()));
            return;
        }
        // Only text frames are supported; binary and other frames are rejected.
        if (!(webSocketFrame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("仅支持文本格式");
        }
        String request = ((TextWebSocketFrame) webSocketFrame).text();
        logger.info("服务端收到:{}", request);
        ClientMsgProtocol clientMsgProtocol = JSON.parseObject(request, ClientMsgProtocol.class);
        if (null == clientMsgProtocol) {
            // Empty or "null" payload parses to null; nothing to dispatch.
            return;
        }
        // Type 1: the client asks for its own info; reply to this channel only.
        if (1 == clientMsgProtocol.getType()) {
            ctx.channel().writeAndFlush(MsgUtil.buildMsgOwner(ctx.channel().id().toString()));
            return;
        }
        // Type 2: chat message; write it to every channel in the group.
        if (2 == clientMsgProtocol.getType()) {
            TextWebSocketFrame textWebSocketFrame = MsgUtil.buildMsgAll(ctx.channel().id().toString(), clientMsgProtocol.getMsgInfo());
            ChannelHandler.channelGroup.writeAndFlush(textWebSocketFrame);
        }
    }

    /**
     * Closes the channel when an exception reaches this handler and logs it,
     * including the stack trace.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        logger.error("异常信息:\r\n{}", cause.getMessage(), cause);
    }
}
/**
 * Spring-managed wrapper around a Netty {@link ServerBootstrap}.
 *
 * <p>Owns two event loop groups for the whole lifetime of the bean; call
 * {@link #destroy()} to release them.
 */
@Component("nettyServer")
public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    // Server-side NIO thread groups. Original author's note on the no-arg constructor's thread count:
    // NioEventLoopGroup extends MultithreadEventLoopGroup
    // Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
    private final EventLoopGroup parentGroup = new NioEventLoopGroup();
    private final EventLoopGroup childGroup = new NioEventLoopGroup();

    /** The bound server channel; null until a bind has succeeded. */
    private Channel channel;

    /**
     * Binds the server to the given address and waits for the bind to complete.
     *
     * @param address the local address to listen on
     * @return the completed bind future, or {@code null} if binding threw an exception
     */
    public ChannelFuture bing(InetSocketAddress address) {
        ChannelFuture channelFuture = null;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class) // non-blocking mode
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            channelFuture = b.bind(address).syncUninterruptibly();
            channel = channelFuture.channel();
        } catch (Exception e) {
            // Keep the stack trace; the message alone rarely identifies a bind failure.
            logger.error(e.getMessage(), e);
        } finally {
            if (null != channelFuture && channelFuture.isSuccess()) {
                logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");
            } else {
                logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");
            }
        }
        return channelFuture;
    }

    /**
     * Closes the server channel, if one was bound, and shuts down both event loop groups.
     *
     * <p>The groups are created with the bean, so they are shut down even when
     * binding never succeeded.
     */
    public void destroy() {
        if (null != channel) {
            channel.close();
        }
        parentGroup.shutdownGracefully();
        childGroup.shutdownGracefully();
    }

    /** @return the bound server channel, or {@code null} if no bind has succeeded */
    public Channel getChannel() {
        return channel;
    }
}
/**
 * Spring Boot entry point that starts the Netty server once the application
 * context is up.
 */
@SpringBootApplication
@ComponentScan("com.lm.demo.netty")
public class NettyApplication implements CommandLineRunner {

    /** Host to bind, read from the {@code netty.host} property. */
    @Value("${netty.host}")
    private String host;

    /** Port to bind, read from the {@code netty.port} property. */
    @Value("${netty.port}")
    private int port;

    @Autowired
    private NettyServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(NettyApplication.class, args);
    }

    /**
     * Binds the Netty server, registers a JVM shutdown hook that destroys it, and
     * then blocks until the server channel is closed.
     *
     * @throws IllegalStateException if the server could not be bound
     */
    @Override
    public void run(String... args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(host, port);
        ChannelFuture channelFuture = nettyServer.bing(address);
        Runtime.getRuntime().addShutdownHook(new Thread(nettyServer::destroy));
        if (channelFuture == null) {
            // bing() returns null when binding fails; fail with a clear message instead of an NPE.
            throw new IllegalStateException("Netty server failed to bind to " + address);
        }
        channelFuture.channel().closeFuture().syncUninterruptibly();
    }
}
/**
 * MVC controller that serves the chat page.
 */
@Controller
public class NettyController {

    /** Name of the view returned for the chat page. */
    private static final String INDEX_VIEW = "index";

    /** Model attribute key and the fixed value exposed to the view. */
    private static final String NAME_ATTRIBUTE = "name";
    private static final String NAME_VALUE = "xiaohao";

    // Injected but not used by any method in this class.
    @Resource
    private NettyServer nettyServer;

    /**
     * Handles {@code /index}: puts the {@code name} attribute into the model and
     * returns the view name.
     *
     * @param model the model handed to the view
     * @return the view name {@code "index"}
     */
    @RequestMapping("/index")
    public String index(Model model) {
        model.addAttribute(NAME_ATTRIBUTE, NAME_VALUE);
        return INDEX_VIEW;
    }
}
打包运行 如下
浏览器访问
http://localhost:8081/index
好了,到这里就结束了"基于Netty搭建WebSocket,模仿微信聊天页面"的学习,大家一定要跟着动手操作起来。需要源码的可以私信我获取。