Netty
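Contents
- Netty
- 1 Netty Overview
- 2 Netty Core Components
- 2.1 Bootstrap and ServerBootstrap (Bootstrap Classes)
- 2.2 Channel (Network Operation Abstraction)
- 2.3 EventLoop (Event Loop)
- 2.4 EventLoopGroup (Event Loop Group)
- 2.7 ChannelHandler (Channel Handler)
- 2.5 ChannelHandlerContext (Channel Handler Context)
- 2.6 ChannelPipeline (Channel Pipeline)
- 2.7 ByteBuf (Byte Buffer)
- 2.8 Encoders and Decoders
- 2.9 ChannelFuture (Operation Result)
- 3 Examples
- 3.1 Client-Server: Simple String Communication
- 3.2 Implementing a Servlet Server with Netty
- 3.3 Implementing RPC (Remote Procedure Call) with Netty
- 3.4 Implementing WebSocket Online Chat with Netty
- 3.5 Implementing a Better Online Chat with Netty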
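1 Netty Overview
Netty is a client/server framework that provides a simplified layer over Java's NIO networking components, which makes it a good candidate for building low-level, non-blocking network applications.
The main highlights of the Netty framework are:
- Ease of use: Netty is easier to use than plain Java NIO and ships with a broad set of examples covering most use cases.
- Minimal dependencies: as we will see shortly, a single dependency is enough to get the whole framework.
- Performance: Netty generally offers higher throughput and lower latency than the core Java APIs, and its internal resource pooling helps it scale.
- Security: full support for SSL/TLS and StartTLS.
2 Netty Core Components
2.1 Bootstrap and ServerBootstrap (Bootstrap Classes)
Bootstrap is the client-side bootstrap (helper) class. Whatever protocol a program uses, and whether it creates a client or a server, it needs a "bootstrap" to get started.
Bootstrap usually calls connect() to connect to a remote host and port, acting as the client in Netty TCP communication. It can also call bind() to bind a local port and act as one endpoint of UDP communication. Typical usage: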
public final class NettyClient {
static final String HOST = "127.0.0.1";
static final int PORT = 8007;
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group) // Set the EventLoopGroup that handles all events for the client.
.channel(NioSocketChannel.class) // Use the NIO-based channel type for the client connection.
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
// This is our custom client handler which will have logic for chat.
p.addLast(new ClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
String input = "Frank";
Channel channel = f.channel(); // connect() was already awaited with sync() above
channel.writeAndFlush(input); // writeAndFlush() already flushes, so no extra flush() is needed
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
ServerBootstrap is the server-side bootstrap (helper) class. It usually calls bind() to bind a local port and then waits for clients to connect.
Typical usage:
public final class NettyServer {
// Port where chat server will listen for connections.
static final int PORT = 8007;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // Set boss & worker groups
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new ServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
System.out.println("Netty Server started.");
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
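Bootstrap only needs one thread group (EventLoopGroup), whereas ServerBootstrap is normally configured with two: one accepts connections and the other handles the actual I/O.
2.2 Channel (Network Operation Abstraction)
A Channel can be thought of simply as a client connection.
When we do network programming in a language such as C/C++, Java, or Go, we usually work with sockets. A socket wraps the operating system's low-level network I/O operations (read, write, bind, connect, and so on), so we have to learn sockets before we can write network code, and socket operations are fairly complex and hard to use well. Netty therefore provides Channel (note that this is io.netty.channel.Channel, not the Java NIO Channel), which makes handling I/O events more convenient.
The Channel interface is Netty's abstraction of network operations, and I/O is performed through it. A Channel provides:
- The current state of the connection's channel (for example, is it open? is it connected?)
- The configuration parameters of the connection (for example, the receive buffer size)
- Asynchronous network I/O operations (such as connecting, reading, writing, and binding a port). Asynchronous means that every I/O call returns immediately, with no guarantee that the requested operation has completed by the time the call returns. The call returns a ChannelFuture instance right away; by registering a listener on that ChannelFuture, the caller is notified when the operation succeeds, fails, or is cancelled.
- Association of I/O operations with their corresponding handlers
Once a client has connected to the server, a new Channel is created and bound to that client. Sample code: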
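// Connect to the server through Bootstrap.connect() and wait for the resulting Channel.
public Channel doConnect(InetSocketAddress inetSocketAddress) throws InterruptedException, ExecutionException {
    CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
    bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
            completableFuture.complete(future.channel());
        } else {
            // Hand the failure to the caller; an exception thrown inside the listener would never reach get().
            completableFuture.completeExceptionally(future.cause());
        }
    });
    return completableFuture.get();
}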
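The most commonly used Channel implementations are:
- NioServerSocketChannel (server)
- NioSocketChannel (client)
These two Channels correspond to ServerSocket and Socket in the BIO programming model.
2.3 EventLoop (Event Loop)
An event loop can be thought of as an event thread dedicated to handling client connections and read/write events.
Channel is Netty's abstraction of network operations (reads, writes, and so on), and an EventLoop handles the I/O of the Channels registered with it; the two work together to perform I/O.
Moreover, all I/O events handled by an EventLoop are processed on its own dedicated Thread. That is, Thread and EventLoop have a 1:1 relationship, which is what ensures thread safety.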
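The 1:1 relationship between an EventLoop and its thread is what lets handler code work with unsynchronized state. The sketch below illustrates that idea with a plain JDK single-thread executor standing in for an EventLoop. It is not Netty code, and the names SingleThreadLoop and runOnOneThread are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: a plain single-thread executor stands in for an EventLoop.
// SingleThreadLoop and runOnOneThread are hypothetical names, not Netty API.
final class SingleThreadLoop {

    // Submits `tasks` increments of unsynchronized state to one dedicated thread
    // and returns the final count as read on that same thread.
    static int runOnOneThread(int tasks) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            int[] counter = {0}; // deliberately neither volatile nor locked
            for (int i = 0; i < tasks; i++) {
                loop.execute(() -> counter[0]++); // tasks run one after another on the loop thread
            }
            // Queued behind all increments, so it observes every one of them.
            Future<Integer> result = loop.submit(() -> counter[0]);
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnOneThread(1000));
    }
}
```

Because every task runs on the same thread, no increment is lost even though the counter is not protected by a lock. The same reasoning applies to state that is touched only from a single Channel's handlers.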
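2.4 EventLoopGroup (Event Loop Group)
An EventLoopGroup contains multiple EventLoops (each EventLoop usually wraps one thread) and manages the lifecycle of all of them.
It is essentially a thread pool: it receives I/O requests and assigns threads to process them.
ServerBootstrap can be given two EventLoopGroups, one that accepts client connections and one that performs the actual I/O for each connection. They correspond to the multiplexing selector and the I/O worker threads.
2.7 ChannelHandler (Channel Handler)
Netty is an event-driven network framework: when a particular event fires, we can process the data with our own logic. ChannelHandler is the hook for processing inbound (received) and outbound (sent) data. It can handle almost any kind of action, so it is the interface that developers care about most.
Put simply, a ChannelHandler is the concrete message processor, responsible for the data that the client or server receives and sends.
ChannelHandler is divided into two main interfaces: ChannelInboundHandler for inbound data and ChannelOutboundHandler for outbound data.
Netty provides many default ChannelHandler implementations in the form of adapters, mainly to simplify development: we only need to override the events and methods we care about. We usually extend one of the following adapters and abstract classes: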
- ChannelHandlerAdapter
- ChannelInboundHandlerAdapter
- ChannelDuplexHandler
- ChannelOutboundHandlerAdapter
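2.5 ChannelHandlerContext (Channel Handler Context)
It holds the context information of a ChannelHandler.
First, ChannelHandlerContext is an AttributeMap, so it can store multiple pieces of data.
Second, ChannelHandlerContext extends ChannelInboundInvoker and ChannelOutboundInvoker, so it can trigger inbound and outbound operations.
Beyond these inherited methods, ChannelHandlerContext also acts as the bridge between the channel, the handler, and the pipeline, because all three can be obtained from it: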
Channel channel();
ChannelHandler handler();
ChannelPipeline pipeline();
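Note also that ChannelHandlerContext exposes an EventExecutor, through its executor() method, which is used to run specific tasks.
2.6 ChannelPipeline (Channel Pipeline)
When a Channel is created, it is automatically assigned its own dedicated ChannelPipeline; each Channel has exactly one. A ChannelPipeline is a chain of ChannelHandlers, and a single pipeline can hold many of them.
We can add one or more ChannelHandlers to a ChannelPipeline with addLast() (a piece of data or an event may be processed by several handlers). When one ChannelHandler finishes, it passes the data on to the next ChannelHandler: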
b.group(eventLoopGroup)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new KryoClientHandler());
}
});
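Flow of inbound and outbound events
From the server's point of view, an event that travels from the client to the server is inbound, and an event that travels from the server to the client is outbound.
The figure above shows the general flow of inbound and outbound events in Netty. Inbound and outbound ChannelHandlers can be installed in the same ChannelPipeline. When a message or any other inbound event is read, it starts at the head of the ChannelPipeline and is passed to the first ChannelInboundHandler. What that handler does depends on its purpose; it does not necessarily modify the message. After the first ChannelInboundHandler, the message is passed to the next ChannelHandler in the chain, and so on until it reaches the tail of the ChannelPipeline, at which point reading the message is complete.
Outbound (sending) data follows a similar path in the opposite direction: the message starts at the tail of the ChannelOutboundHandler chain and flows until it reaches the head, after which it is handed to the network transport layer for transmission.
Since inbound and outbound operations differ, you may wonder why data does not cross between inbound and outbound ChannelHandlers (why does inbound data not end up in the outbound ChannelHandler chain?).
The reason is that Netty can tell ChannelInboundHandler implementations from ChannelOutboundHandler implementations and makes sure data is only passed between handlers of the same type: it flows either along the chain of ChannelInboundHandlers or along the chain of ChannelOutboundHandlers.
When a ChannelHandler is added to a ChannelPipeline, it is assigned a ChannelHandlerContext, which represents the binding between that ChannelHandler and the ChannelPipeline. The ChannelPipeline manages its ChannelHandlers indirectly through their ChannelHandlerContexts.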
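The inbound and outbound directions described above can be pictured with a small model. This is a simplified sketch in plain Java, not Netty's implementation: the PipelineModel class and its methods are hypothetical, and each handler is reduced to a name plus an inbound/outbound flag.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a pipeline; PipelineModel is a hypothetical class, not Netty API.
final class PipelineModel {

    // A handler is reduced to its name and its direction.
    private static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    // Appends a handler at the tail, mirroring the role of addLast().
    PipelineModel addLast(String name, boolean inbound) {
        handlers.add(new Handler(name, inbound));
        return this;
    }

    // Inbound events travel head -> tail and visit only inbound handlers.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Handler handler : handlers) {
            if (handler.inbound) {
                path.add(handler.name);
            }
        }
        return path;
    }

    // Outbound operations travel tail -> head and visit only outbound handlers.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            if (!handlers.get(i).inbound) {
                path.add(handlers.get(i).name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        PipelineModel pipeline = new PipelineModel()
                .addLast("decoder", true)
                .addLast("frameEncoder", false)
                .addLast("objectEncoder", false)
                .addLast("businessHandler", true);
        System.out.println("inbound:  " + pipeline.inboundPath());
        System.out.println("outbound: " + pipeline.outboundPath());
    }
}
```

With the handlers added in this order, an inbound event visits decoder and then businessHandler, while an outbound write visits objectEncoder and then frameEncoder. This is why the order of addLast() calls matters separately for each direction.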
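2.7 ByteBuf (Byte Buffer)
All network communication is ultimately carried as a byte stream. Netty uses its own buffer API, rather than NIO's ByteBuffer, to store sequences of bytes, and this approach has clear advantages over ByteBuffer. Netty's buffer type, ByteBuf, was designed to solve ByteBuffer's problems at the root and to meet the everyday needs of network application development. Its notable features include:
- Custom buffer types can be defined if needed.
- Transparent zero copy is built into the composite buffer type.
- A dynamic buffer type is available out of the box, growing on demand like StringBuffer.
- There is no longer any need to call flip().
- Under normal conditions it responds faster than ByteBuffer.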
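To make the flip() point concrete, the sketch below uses the JDK's java.nio.ByteBuffer, which shares a single position between writing and reading and therefore has to be flipped before the written bytes can be read back. Netty's ByteBuf keeps separate reader and writer indexes, so this step disappears. FlipDemo is a hypothetical class name used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: FlipDemo is a hypothetical class that uses the JDK's ByteBuffer
// (not Netty's ByteBuf) to show why flip() is needed there.
final class FlipDemo {

    // Writes the text into a buffer and reads it back.
    static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes); // after writing, position == bytes.length
        buffer.flip();     // limit = position, position = 0: switch from writing to reading
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    // Number of bytes that can be read right after writing if flip() is forgotten.
    static int readableWithoutFlip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        return buffer.remaining(); // position is already at the limit
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Frank"));
        System.out.println(readableWithoutFlip("Frank"));
    }
}
```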
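2.8 Encoders and Decoders
Whenever we send (outbound) or receive (inbound) a message through Netty, a data conversion takes place, because data always travels over the network as bytes. When data comes in, Netty decodes it, converting it from bytes into another format (usually a Java object); when data goes out, Netty encodes it, converting it from its current format into bytes.
Netty provides several kinds of abstractions for encoders and decoders, all of which are ChannelHandler implementations. The most common base classes are ByteToMessageDecoder and MessageToByteEncoder.
For inbound data, a decoder overrides the ChannelHandler read event (channelRead) and then calls its own decode method. For outbound data, an encoder overrides the ChannelHandler write event and then calls its own encode method.
2.9 ChannelFuture (Operation Result)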
public interface ChannelFuture extends Future<Void> {
Channel channel();
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
......
ChannelFuture sync() throws InterruptedException;
}
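Netty's I/O operations are asynchronous, so we cannot know right away whether an operation succeeded. Instead, we can register a ChannelFutureListener through the addListener() method of ChannelFuture; when the operation succeeds or fails, the listener is triggered automatically with the result.
ChannelFuture f = b.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("Connected successfully!");
} else {
System.err.println("Connection failed!");
}
}).sync();
You can also obtain the Channel associated with the connection through the channel() method of ChannelFuture.
Channel channel = f.channel();
In addition, the sync() method of ChannelFuture turns an asynchronous operation into a synchronous one.
// bind() is asynchronous, but `sync()` makes the call block until it completes.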
ChannelFuture f = b.bind(port).sync();
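The difference between the listener style and the blocking style can also be tried without a network. The sketch below is an analogy only: it uses the JDK's CompletableFuture rather than Netty's ChannelFuture, and FutureStyles and asyncOperation are hypothetical names standing in for a call such as connect() or bind().

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture is a JDK class, not Netty's ChannelFuture.
// FutureStyles and asyncOperation are hypothetical names.
final class FutureStyles {

    // Stand-in for an asynchronous call such as connect() or bind().
    static CompletableFuture<String> asyncOperation() {
        return CompletableFuture.supplyAsync(() -> "connected");
    }

    // Listener style: the callback runs when the operation completes,
    // similar in spirit to ChannelFuture.addListener(...).
    static String listenerStyle() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> withCallback = asyncOperation().whenComplete((result, error) ->
                log.append(error == null ? "success: " + result : "failure: " + error));
        withCallback.join(); // wait here only so the demo can return what the callback logged
        return log.toString();
    }

    // Blocking style: wait for the result, similar in spirit to ChannelFuture.sync().
    static String blockingStyle() {
        return asyncOperation().join();
    }

    public static void main(String[] args) {
        System.out.println(listenerStyle());
        System.out.println(blockingStyle());
    }
}
```

In a Netty handler the listener style is usually the safer choice, because blocking inside an event loop thread stalls every Channel served by that thread.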
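3 Examples
3.1 Client-Server: Simple String Communication
This example implements simple communication: the client sends a string to the server, and the server replies with a string after receiving it.
Server: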
package com.qupeng.java.demo.netty4.official.demo;
import java.util.ArrayList;
import java.util.List;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public final class NettyServer {
// Port where chat server will listen for connections.
static final int PORT = 8007;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // Set boss & worker groups
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new ServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
System.out.println("Netty Server started.");
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
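@Sharable
class ServerHandler extends SimpleChannelInboundHandler<String> {
    // Shared by every connection, so the list must tolerate concurrent access from several event loop threads.
    static final List<Channel> channels = new java.util.concurrent.CopyOnWriteArrayList<Channel>();
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        System.out.println("Client joined - " + ctx);
        channels.add(ctx.channel());
    }
    @Override
    public void channelInactive(final ChannelHandlerContext ctx) {
        // Forget closed connections so later broadcasts do not write to them.
        channels.remove(ctx.channel());
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Message received: " + msg);
        // Broadcast the greeting to every connected client.
        for (Channel c : channels) {
            c.writeAndFlush("Hello " + msg + '\n');
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        System.out.println("Closing connection for client - " + ctx);
        ctx.close();
    }
}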
Client:
package com.qupeng.java.demo.netty4.official.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public final class NettyClient {
static final String HOST = "127.0.0.1";
static final int PORT = 8007;
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group) // Set the EventLoopGroup that handles all events for the client.
.channel(NioSocketChannel.class) // Use the NIO-based channel type for the client connection.
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
// This is our custom client handler which will have logic for chat.
p.addLast(new ClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
String input = "Frank";
Channel channel = f.channel(); // connect() was already awaited with sync() above
channel.writeAndFlush(input); // writeAndFlush() already flushes, so no extra flush() is needed
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Message from Server: " + msg);
}
}
Reference: http://www.mastertheboss.com/jboss-frameworks/netty/jboss-netty-tutorial
3.2 Implementing a Servlet Server with Netty
This example implements a simple Tomcat-like server that supports a single servlet mapping, /hello-world.
Server:
package com.qupeng.java.demo.netty4.servlet;
import com.qupeng.java.demo.netty4.servlet.handler.TomcatHandler;
import com.qupeng.java.demo.netty4.servlet.service.MyServlet;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
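// Netty is a network communication framework that supports multiple protocols at the same time
public class MyTomcatServer {
// Tip: open the Tomcat source code and search globally for ServerSocket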
private int port = 8080;
private Map<String, MyServlet> servletMapping = new HashMap<>();
private Properties webxml = new Properties();
private TomcatHandler tomcatHandler;
private void init(){
try{
String WEB_INF = this.getClass().getResource("/").getPath();
FileInputStream fis = new FileInputStream(WEB_INF + "web.xml");
SAXReader saxReader = new SAXReader();
Document document = saxReader.read(fis);
Element root = document.getRootElement();
List<Element> servlets = root.elements("servlet");
Map<String, String> servletClassMap = servlets.stream().collect(Collectors.toMap(element -> element.element("servlet-name").getText(), element -> element.element("servlet-class").getText()));
List<Element> servletMappings = root.elements("servlet-mapping");
servletMapping = servletMappings.stream().collect(Collectors.toMap(element -> element.element("url-pattern").getText(), element -> {
String classPath = servletClassMap.get(element.element("servlet-name").getText());
MyServlet obj = null;
try {
obj = (MyServlet)Class.forName(classPath).newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return obj;
}));
tomcatHandler = new TomcatHandler(servletMapping);
}catch(Exception e){
e.printStackTrace();
}
}
public void start(){
init();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
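// Channel class for the boss (acceptor) side; passing a Class like this means it is instantiated via reflection
.channel(NioServerSocketChannel.class)
// Handler for the worker side
.childHandler(new ChannelInitializer<SocketChannel>() {
// Initialize the pipeline of each client connection
@Override
protected void initChannel(SocketChannel client) throws Exception {
// Lock-free, serialized processing
// Netty's HTTP codec handlers; their order matters
// HttpResponseEncoder: the encoder
// Chain of responsibility, implemented as a doubly linked list of inbound and outbound handlers
client.pipeline().addLast(new HttpResponseEncoder());
// HttpRequestDecoder: the decoder
client.pipeline().addLast(new HttpRequestDecoder());
// Business logic
client.pipeline().addLast(new TomcatHandler(servletMapping));
}
})
// Boss-side option: maximum length of the queue of pending connections (128); this is not a thread count
.option(ChannelOption.SO_BACKLOG, 128)
// Worker-side option: keep connections alive (TCP keep-alive)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 3. Start the server
ChannelFuture f = server.bind(port).sync();
System.out.println("My Tomcat started, listening on port: " + port);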
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
// Shut down the thread pools
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new MyTomcatServer().start();
}
}
package com.qupeng.java.demo.netty4.servlet.handler;
import com.qupeng.java.demo.netty4.servlet.service.MyServlet;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import java.util.Map;
public class TomcatHandler extends ChannelInboundHandlerAdapter {
private Map<String, MyServlet> servletMapping;
public TomcatHandler(Map<String, MyServlet> servletMapping) {
this.servletMapping = servletMapping;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest){
HttpRequest req = (HttpRequest) msg;
String uri = req.uri();
HttpServletRequest servletRequest = new HttpServletRequest(req);
HttpServletResponse servletResponse = new HttpServletResponse(ctx);
if(servletMapping.containsKey(uri)){
servletMapping.get(uri).service(servletRequest, servletResponse);
}else{
servletResponse.setStatus(404);
servletResponse.setContentType("text/html");
servletResponse.getPrintWriter().write("404 - Not Found");
}
}
}
@Override
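public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Do not swallow errors silently: log the cause and close the connection.
cause.printStackTrace();
ctx.close();
}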
}
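One detail of the lookup above is worth noting: the handler uses req.uri() as the map key, and that value normally still contains the query string, so a request for /hello-world?name=Frank would not match the /hello-world mapping. The sketch below shows one way to reduce a URI to its path before the lookup. UriRouting, pathOf, and resolve are hypothetical helpers written in plain Java and are not part of the example project.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: UriRouting is a hypothetical helper, not part of the example project.
final class UriRouting {

    // Drops the "?query" part so that "/hello-world?name=x" becomes "/hello-world".
    static String pathOf(String uri) {
        int queryStart = uri.indexOf('?');
        return queryStart < 0 ? uri : uri.substring(0, queryStart);
    }

    // Returns the servlet name mapped to the URI's path, or a 404 message if none matches.
    static String resolve(Map<String, String> mapping, String uri) {
        return mapping.getOrDefault(pathOf(uri), "404 - Not Found");
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("/hello-world", "hello-world");
        System.out.println(resolve(mapping, "/hello-world?name=Frank"));
        System.out.println(resolve(mapping, "/missing"));
    }
}
```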
Servlet API:
package com.qupeng.java.demo.netty4.servlet.service;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;
public abstract class MyServlet {
public void service(HttpServletRequest request, HttpServletResponse response) throws Exception{
if("GET".equalsIgnoreCase(request.getMethod())){
doGet(request, response);
}else{
doPost(request, response);
}
}
public abstract void doGet(HttpServletRequest request, HttpServletResponse response) throws Exception;
public abstract void doPost(HttpServletRequest request, HttpServletResponse response) throws Exception;
}
package com.qupeng.java.demo.netty4.servlet.service;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;
public class HelloWorldServlet extends MyServlet {
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws Exception {
this.doPost(request, response);
}
@Override
public void doPost(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.getPrintWriter().write("Hello world!");
}
}
package com.qupeng.java.demo.netty4.servlet.http;
import io.netty.handler.codec.http.HttpRequest;
public class HttpServletRequest {
private HttpRequest req;
public HttpServletRequest(HttpRequest req) {
this.req = req;
}
public String getUrl() {
return req.uri();
}
public String getMethod() {
return req.method().name();
}
}
package com.qupeng.java.demo.netty4.servlet.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
public class HttpServletResponse {
private PrintWriter printWriter;
private FullHttpResponse httpResponse;
public HttpServletResponse(ChannelHandlerContext ctx) {
this.printWriter = new PrintWriter(ctx, this);
httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
}
public PrintWriter getPrintWriter() {
return printWriter;
}
public void setStatus(int sc) {
httpResponse.setStatus(HttpResponseStatus.valueOf(sc));
}
public void setHeader(String name,String value) {
httpResponse.headers().set(name, value);
}
public void setContentType(String type) {
httpResponse.headers().set("Content-Type", type);
}
public FullHttpResponse getHttpResponse() {
return httpResponse;
}
}
package com.qupeng.java.demo.netty4.servlet.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
public class PrintWriter {
private ChannelHandlerContext ctx;
private HttpServletResponse servletResponse;
public PrintWriter(ChannelHandlerContext ctx, HttpServletResponse servletResponse) {
this.ctx = ctx;
this.servletResponse = servletResponse;
}
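public void write(String str) {
servletResponse.setContentType("text/html");
// Encode with an explicit charset instead of the platform default.
ctx.write(servletResponse.getHttpResponse().replace(Unpooled.wrappedBuffer(str.getBytes(java.nio.charset.StandardCharsets.UTF_8))));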
ctx.flush();
ctx.close();
}
}
web.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
version="4.0"
metadata-complete="true">
<servlet>
<servlet-name>hello-world</servlet-name>
<servlet-class>com.qupeng.java.demo.netty4.servlet.service.HelloWorldServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>hello-world</servlet-name>
<url-pattern>/hello-world</url-pattern>
</servlet-mapping>
</web-app>
pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qupeng.java.demos</groupId>
<artifactId>netty4</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>
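3.3 Implementing RPC (Remote Procedure Call) with Netty
This example comes from the book 《Netty 4核心原理与手写RPC框架实战》 and lets a client remotely invoke numeric calculations on the server.
Server: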
package com.qupeng.java.demo.netty4.rpc.registry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class RpcRegistry {
private int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
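// Custom protocol decoder
/** The constructor takes five arguments:
maxFrameLength: the maximum length of a frame. If a frame is longer than this, a TooLongFrameException is thrown.
lengthFieldOffset: the offset of the length field, i.e. where the length field sits within the message.
lengthFieldLength: the size of the length field in bytes. For an int it is 4 (for a long it would be 8).
lengthAdjustment: the compensation value to add to the value of the length field.
initialBytesToStrip: the number of leading bytes to strip from the decoded frame.
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// Custom protocol encoder
pipeline.addLast(new LengthFieldPrepender(4));
// Encoder for object arguments
pipeline.addLast("encoder",new ObjectEncoder());
// Decoder for object arguments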
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new RegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync();
System.out.println("GP RPC Registry start listen at " + port );
future.channel().closeFuture().sync();
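} catch (Exception e) {
e.printStackTrace();
} finally {
// Always release the event loop threads, not only when an exception occurs.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}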
}
public static void main(String[] args) throws Exception {
new RpcRegistry(8080).start();
}
}
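The framing set up in the pipeline above (a 4-byte length prefix that is stripped again on the receiving side) can be illustrated without Netty. The sketch below is a plain-Java approximation of what the pair LengthFieldPrepender(4) and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) achieves. LengthFieldFraming and its methods are hypothetical names, payloads are simplified to UTF-8 strings, and the code is not how Netty implements these classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation of length-prefixed framing; hypothetical helper, not Netty code.
final class LengthFieldFraming {

    // Prepends a 4-byte big-endian length to the payload (the role of a length prepender).
    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Splits a byte stream into payloads: the length field is at offset 0, is 4 bytes long,
    // and is stripped from the result. An incomplete trailing frame is left undecoded.
    static List<String> decode(byte[] stream) {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<String> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0 || in.remaining() < length) {
                in.reset(); // not enough bytes yet: a real decoder would wait for more data
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        return frames;
    }

    // Joins two byte arrays, simulating two messages arriving back to back.
    static byte[] concat(byte[] first, byte[] second) {
        byte[] joined = new byte[first.length + second.length];
        System.arraycopy(first, 0, joined, 0, first.length);
        System.arraycopy(second, 0, joined, first.length, second.length);
        return joined;
    }

    public static void main(String[] args) {
        byte[] stream = concat(encode("add"), encode("sub"));
        System.out.println(decode(stream));
    }
}
```

Because TCP is a byte stream, two messages can arrive glued together or one message can arrive in pieces. The length prefix is what allows the receiver to recover the original message boundaries.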
package com.qupeng.java.demo.netty4.rpc.registry;
import com.qupeng.java.demo.netty4.rpc.protocol.InvokerProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class RegistryHandler extends ChannelInboundHandlerAdapter {
// Holds all available services
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();
// Holds the names of all service classes
private List<String> classNames = new ArrayList<String>();
public RegistryHandler(){
// Scan the provider package recursively
scannerClass("com.qupeng.java.demo.netty4.rpc.provider");
doRegister();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
InvokerProtocol request = (InvokerProtocol)msg;
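// When a client connects, read the custom protocol object to find the requested service and its arguments
// Invoke the service through reflection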
if(registryMap.containsKey(request.getClassName())){
Object clazz = registryMap.get(request.getClassName());
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
result = method.invoke(clazz, request.getValues());
}
ctx.write(result);
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/*
* Recursive scan
*/
private void scannerClass(String packageName){
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
for (File file : dir.listFiles()) {
// If it is a directory, recurse into it
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName());
}else{
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
/**
* Perform the registration
*/
private void doRegister(){
if(classNames.size() == 0){ return; }
for (String className : classNames) {
try {
Class<?> clazz = Class.forName(className);
Class<?> i = clazz.getInterfaces()[0];
registryMap.put(i.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.qupeng.java.demo.netty4.rpc.protocol;
import lombok.Data;
import java.io.Serializable;
/**
* Custom transport protocol
*/
@Data
public class InvokerProtocol implements Serializable {
private String className; // class name
private String methodName; // method name
private Class<?>[] parames; // parameter types
private Object[] values; // argument values
}
Remote service interface definitions:
package com.qupeng.java.demo.netty4.rpc.api;
public interface IRpcHelloService {
String hello(String name);
}
package com.qupeng.java.demo.netty4.rpc.api;
public interface IRpcService {
/** Addition */
public int add(int a, int b);
/** Subtraction */
public int sub(int a, int b);
/** Multiplication */
public int mult(int a, int b);
/** Division */
public int div(int a, int b);
}
package com.qupeng.java.demo.netty4.rpc.provider;
import com.qupeng.java.demo.netty4.rpc.api.IRpcHelloService;
public class RpcHelloServiceImpl implements IRpcHelloService {
public String hello(String name) {
return "Hello " + name + "!";
}
}
package com.qupeng.java.demo.netty4.rpc.provider;
import com.qupeng.java.demo.netty4.rpc.api.IRpcService;
public class RpcServiceImpl implements IRpcService {
public int add(int a, int b) {
return a + b;
}
public int sub(int a, int b) {
return a - b;
}
public int mult(int a, int b) {
return a * b;
}
public int div(int a, int b) {
return a / b;
}
}
Client implementation:
package com.qupeng.java.demo.netty4.rpc.consumer.proxy;
import com.qupeng.java.demo.netty4.rpc.protocol.InvokerProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class RpcProxy {
public static <T> T create(Class<?> clazz){
// The clazz passed in is itself an interface
MethodProxy proxy = new MethodProxy(clazz);
Class<?> [] interfaces = clazz.isInterface() ?
new Class[]{clazz} :
clazz.getInterfaces();
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz){
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
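// Methods declared by Object (e.g. toString) are handled locally; this demo does not cover passing in a concrete class
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
// Methods declared by the service interface go through RPC (the core path)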
} else {
return rpcInvoke(proxy,method, args);
}
return null;
}
/**
* Core method that carries out the interface call remotely
* @param method
* @param args
* @return
*/
public Object rpcInvoke(Object proxy,Method method,Object[] args){
// Wrap the call in the transport protocol object
InvokerProtocol msg = new InvokerProtocol();
msg.setClassName(this.clazz.getName());
msg.setMethodName(method.getName());
msg.setValues(args);
msg.setParames(method.getParameterTypes());
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
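// Custom protocol decoder
/** The constructor takes five arguments:
maxFrameLength: the maximum length of a frame. If a frame is longer than this, a TooLongFrameException is thrown.
lengthFieldOffset: the offset of the length field, i.e. where the length field sits within the message.
lengthFieldLength: the size of the length field in bytes. For an int it is 4 (for a long it would be 8).
lengthAdjustment: the compensation value to add to the value of the length field.
initialBytesToStrip: the number of leading bytes to strip from the decoded frame.
*/
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// Custom protocol encoder
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
// Encoder for object arguments
pipeline.addLast("encoder", new ObjectEncoder());
// Decoder for object arguments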
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();
future.channel().closeFuture().sync();
} catch(Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}
package com.qupeng.java.demo.netty4.rpc.consumer.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
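System.out.println("client exception: " + cause.getMessage());
// Close the channel so that a caller waiting on closeFuture() is not blocked forever
ctx.close();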
}
}
package com.qupeng.java.demo.netty4.rpc.consumer;
import com.qupeng.java.demo.netty4.rpc.api.IRpcHelloService;
import com.qupeng.java.demo.netty4.rpc.api.IRpcService;
import com.qupeng.java.demo.netty4.rpc.consumer.proxy.RpcProxy;
public class RpcConsumer {
public static void main(String [] args){
IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);
System.out.println(rpcHello.hello("world"));
IRpcService service = RpcProxy.create(IRpcService.class);
System.out.println("8 + 2 = " + service.add(8, 2));
System.out.println("8 - 2 = " + service.sub(8, 2));
System.out.println("8 * 2 = " + service.mult(8, 2));
System.out.println("8 / 2 = " + service.div(8, 2));
}
}
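The consumer only ever talks to an interface, and every call ends up as a message carrying a class name, a method name, parameter types and argument values. A rough sketch of that idea with a JDK dynamic proxy is shown below. It is an assumption-laden illustration, not the `RpcProxy` from this example: `RpcProxySketch`, `HelloService`, `Invocation` and the `transport` function are hypothetical stand-ins, and the network round trip is replaced by a plain function call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

final class RpcProxySketch {

    /** Hypothetical service interface, standing in for IRpcHelloService. */
    interface HelloService {
        String hello(String name);
    }

    /** Hypothetical stand-in for the data an InvokerProtocol message carries. */
    static final class Invocation {
        final String className;
        final String methodName;
        final Class<?>[] paramTypes;
        final Object[] values;

        Invocation(String className, String methodName, Class<?>[] paramTypes, Object[] values) {
            this.className = className;
            this.methodName = methodName;
            this.paramTypes = paramTypes;
            this.values = values;
        }
    }

    /** Creates a proxy that turns each interface call into an Invocation handed to transport. */
    static <T> T create(Class<T> api, Function<Invocation, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object are answered locally, not sent anywhere.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "proxy for " + api.getName();
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return proxy == args[0]; // equals
                }
            }
            return transport.apply(new Invocation(
                    api.getName(), method.getName(), method.getParameterTypes(), args));
        };
        Object instance = Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
        return api.cast(instance);
    }

    public static void main(String[] args) {
        // The "transport" here just echoes what it was asked to invoke.
        HelloService service = create(HelloService.class,
                call -> call.methodName + "(" + call.values[0] + ") on " + call.className);
        System.out.println(service.hello("world"));
    }
}
```

In a real consumer the `transport` step would serialize the invocation, send it over the channel and wait for the provider's reply.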
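3.4 Implementing a WebSocket online chat with Netty
This example implements a simple online chat. The browser opens a WebSocket connection to the server and sends text messages over it, and the server pushes each message to all clients.
Page: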
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket Chat Room</title>
<style type="text/css">
.border{
border: 1px solid #73AD21;
}
.container{
height: 700px;
display: flex;
flex-direction: column;
align-items:center;
}
.banner{
background:#73AD21;
height: 28px;
width: 402px;
font-size:12px;
}
.online-msg{
background:#EFEFF4;
font-size:12px;
color:#666;
height: 300px;
width: 380px;
padding: 10px;
}
.tool-box{
background-color:#e7eff8;
height:35px;
width: 402px;
}
.input-text{
font-size:12px;
color:#666;
height: 150px;
width: 380px;
padding: 10px;
}
.input-action{
display: flex;
justify-content:flex-end;
}
</style>
</head>
<body>
<br>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8080/websocket");
socket.onmessage = function (event) {
var ta = document.getElementById('online-msg');
ta.append(document.createElement('br'));
ta.append(event.data);
};
socket.onopen = function (event) {
var ta = document.getElementById('online-msg');
ta.append("WebSocket connection opened; this browser supports WebSocket.");
};
socket.onclose = function (event) {
var ta = document.getElementById('online-msg');
ta.append("WebSocket closed.");
};
}
else {
alert("Sorry, your browser does not support the WebSocket protocol.");
}
function send() {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
var ta = document.getElementById('send-message');
socket.send(ta.innerText);
ta.innerText = '';
}
else {
alert("The WebSocket connection has not been established.");
}
}
</script>
<div class="container">
<div class="banner">
<div style="line-height: 28px;color:#fff;">
<span style="text-align:left;margin-left:10px;">Netty WebSocket Chat Room</span>
</div>
</div>
<div class="online-msg border" id="online-msg">
</div>
<form onsubmit="return false;">
<div class="tool-box">
</div>
<div class="input-text border" contenteditable="true" id="send-message">
</div>
<div class="input-action">
<input class="button" type="button" id="mjr_send" onclick="send()" value="Send"/>
</div>
</form>
</div>
</body>
</html>
Server:
package com.qupeng.java.demo.netty4.websocket.simple.chat;
import com.qupeng.java.demo.netty4.websocket.simple.chat.handler.HttpServerHandler;
import com.qupeng.java.demo.netty4.websocket.simple.chat.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SimpleWebSocketChatServer {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new SimpleWebSocketChatServer().run(port);
}
public void run(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("http-server-handler", new HttpServerHandler());
pipeline.addLast("websocket-server-handler", new WebSocketServerHandler());
}
});
Channel ch = b.bind(port).sync().channel();
System.out.println("Web socket server started at port " + port
+ '.');
System.out.println("Open your browser and navigate to http://localhost:"+ port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.qupeng.java.demo.netty4.websocket.simple.chat.handler;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.URL;
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String webroot = "webroot";
// Location of this class on the classpath
private URL baseURL = HttpServerHandler.class.getResource("");
private WebSocketServerHandshaker handshaker;
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (handleWebsocketConnectionRequest(ctx, request)) {
return;
}
String uri = request.uri();
RandomAccessFile file = null;
try {
String page = uri.equals("/") ? "simple-chat.html" : uri;
file = new RandomAccessFile(getResource(page), "r");
} catch (Exception e) {
ctx.fireChannelRead(request.retain());
return;
}
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
String contextType = "text/html;";
response.headers().set(HttpHeaderNames.CONTENT_TYPE, contextType + "charset=utf-8;");
boolean keepAlive = HttpUtil.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
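// Do not close the file here: the write may still be pending, and DefaultFileRegion
// closes the underlying FileChannel itself once the region is released.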
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel client = ctx.channel();
log.info("Client:" + client.remoteAddress() + " exception");
// Close the connection whenever an exception occurs
cause.printStackTrace();
ctx.close();
}
private boolean handleWebsocketConnectionRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
// If HTTP decoding failed or this is not a WebSocket upgrade request, let the caller handle it as plain HTTP
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
return false;
}
// Build and send the handshake response (URL hard-coded for local testing)
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:8080/websocket", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
return true;
}
private File getResource(String fileName) throws Exception {
String basePath = baseURL.toURI().toString();
int start = basePath.indexOf("classes/");
basePath = (basePath.substring(0, start) + "/" + "classes/").replaceAll("/+", "/");
String path = basePath + webroot + "/" + fileName;
log.info("BaseURL:" + basePath);
path = !path.contains("file:") ? path : path.substring(5);
path = path.replaceAll("//", "/");
return new File(path);
}
}
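As background for the upgrade request handled above: for RFC 6455 clients, the server's handshake response has to echo a value derived from the request's `Sec-WebSocket-Key` header in `Sec-WebSocket-Accept`. The handshaker takes care of this, so the following is only a plain-JDK sketch of that one step, with hypothetical names (`WebSocketAcceptSketch`, `acceptFor`), not Netty code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class WebSocketAcceptSketch {

    /** Fixed GUID defined by RFC 6455 for the opening handshake. */
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Sec-WebSocket-Accept = Base64(SHA-1(Sec-WebSocket-Key + GUID)). */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455, section 1.3
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

The browser checks this value before it fires `onopen`, which is why a plain HTTP 200 response is not enough to open a WebSocket.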
package com.qupeng.java.demo.netty4.websocket.simple.chat.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupException;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
handleWebSocketFrame(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handleWebSocketFrame(ChannelHandlerContext ctx,
WebSocketFrame frame) {
// Close frame: the handshaker is created in HttpServerHandler and is not available here,
// so echo the close frame back and close the channel once it has been written
if (frame instanceof CloseWebSocketFrame) {
ctx.writeAndFlush(frame.retain())
.addListener(io.netty.channel.ChannelFutureListener.CLOSE);
return;
}
// Ping frame: answer with a Pong carrying the same payload
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(
new PongWebSocketFrame(frame.content().retain()));
return;
}
// This example supports text frames only, not binary frames
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format(
"%s frame types not supported", frame.getClass().getName()));
}
onlineUsers.add(ctx.channel());
try {
String request = ((TextWebSocketFrame) frame).text();
onlineUsers.writeAndFlush(new TextWebSocketFrame(new java.util.Date().toString() + " " + request));
} catch (ChannelGroupException ex) {
log.error("Failed to push the broadcast message.", ex);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qupeng.java.demos</groupId>
<artifactId>netty4</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
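3.5 Implementing a better online chat with Netty
This example is adapted from the book 《Netty4核心原理与手写RPC框架实战》 and implements a complete online chat. Besides chatting from the browser, it supports pushing system messages from the console to all browsers. On top of the original, I added the ability to leave the chat room.
Server: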
package com.qupeng.java.demo.netty4.websocket.chat.server;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.HttpServerHandler;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.TerminalServerHandler;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WebSocketChatServer {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new WebSocketChatServer().run(port);
}
public void run(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/** Parse the custom protocol */
pipeline.addLast(new IMDecoder()); //Inbound
pipeline.addLast(new IMEncoder()); //Outbound
pipeline.addLast(new TerminalServerHandler()); //Inbound
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("http-server-handler", new HttpServerHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/im"));
pipeline.addLast("websocket-server-handler", new WebSocketServerHandler());
}
});
Channel ch = b.bind(port).sync().channel();
System.out.println("Web socket server started at port " + port
+ '.');
System.out.println("Open your browser and navigate to http://localhost:"+ port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TerminalServerHandler extends SimpleChannelInboundHandler<IMMessage> {
private MsgProcessor processor = new MsgProcessor();
@Override
protected void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws Exception {
processor.sendMsg(ctx.channel(), msg);
}
/**
* Exception handling
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("Socket Client: disconnected from client: " + cause.getMessage());
cause.printStackTrace();
ctx.close();
}
}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String webroot = "webroot";
// Location of this class on the classpath
private URL baseURL = HttpServerHandler.class.getResource("");
private MsgProcessor processor = new MsgProcessor();
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
String uri = request.uri();
if (uri.startsWith("/logout")) {
String nickName = getGetParamsFromChannel(request).get("nickname").toString();
processor.logout(ctx.channel(), nickName);
String data = "logout";
ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes());
FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK, buf);
if(buf != null){
response.headers().set("Content-Type","text/plain;charset=UTF-8");
response.headers().set("Content-Length",response.content().readableBytes());
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
return;
}
RandomAccessFile file = null;
try {
String page = uri.equals("/") ? "chat.html" : uri;
file = new RandomAccessFile(getResource(page), "r");
} catch (Exception e) {
ctx.fireChannelRead(request.retain());
return;
}
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
String contextType = "text/html;";
if (uri.endsWith(".css")) {
contextType = "text/css;";
} else if (uri.endsWith(".js")) {
contextType = "text/javascript;";
} else if (uri.toLowerCase().matches(".*\\.(jpg|png|gif)$")) {
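// Take the extension without the leading dot, so "a.png" yields "image/png;"
String ext = uri.substring(uri.lastIndexOf(".") + 1).toLowerCase();
contextType = "image/" + ("jpg".equals(ext) ? "jpeg" : ext) + ";";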
}
response.headers().set(HttpHeaderNames.CONTENT_TYPE, contextType + "charset=utf-8;");
boolean keepAlive = HttpUtil.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
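// Do not close the file here: the write may still be pending, and DefaultFileRegion
// closes the underlying FileChannel itself once the region is released.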
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel client = ctx.channel();
log.info("Client:" + client.remoteAddress() + " exception");
// Close the connection whenever an exception occurs
cause.printStackTrace();
ctx.close();
}
private File getResource(String fileName) throws Exception {
String basePath = baseURL.toURI().toString();
int start = basePath.indexOf("classes/");
basePath = (basePath.substring(0, start) + "/" + "classes/").replaceAll("/+", "/");
String path = basePath + webroot + "/" + fileName;
log.info("BaseURL:" + basePath);
path = !path.contains("file:") ? path : path.substring(5);
path = path.replaceAll("//", "/");
return new File(path);
}
private Map<String, Object> getGetParamsFromChannel(FullHttpRequest fullHttpRequest) {
Map<String, Object> params = new HashMap<>();
if(fullHttpRequest.method() == HttpMethod.GET){
QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
Map<String, List<String>> paramList = decoder.parameters();
for(Map.Entry<String, List<String>> entry : paramList.entrySet()){
params.put(entry.getKey(),entry.getValue().get(0));
}
return params;
}
return params;
}
}
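The handler above picks the `Content-Type` from the file extension with a chain of `if` statements. As a sketch of one alternative, the mapping can live in a small lookup table. Everything here is hypothetical (`ContentTypeSketch`, `contentTypeFor`, the chosen set of extensions); like the handler, it falls back to `text/html` when the extension is unknown.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class ContentTypeSketch {

    private static final String DEFAULT_TYPE = "text/html; charset=utf-8";
    private static final Map<String, String> TYPES = new HashMap<>();

    static {
        TYPES.put("html", DEFAULT_TYPE);
        TYPES.put("css", "text/css; charset=utf-8");
        TYPES.put("js", "text/javascript; charset=utf-8");
        TYPES.put("jpg", "image/jpeg");
        TYPES.put("png", "image/png");
        TYPES.put("gif", "image/gif");
    }

    /** Derives a Content-Type from the extension of the request path, ignoring any query string. */
    static String contentTypeFor(String uri) {
        String path = uri;
        int query = path.indexOf('?');
        if (query >= 0) {
            path = path.substring(0, query);
        }
        int dot = path.lastIndexOf('.');
        int slash = path.lastIndexOf('/');
        if (dot <= slash) {
            return DEFAULT_TYPE; // the last path segment has no extension
        }
        String ext = path.substring(dot + 1).toLowerCase(Locale.ROOT);
        return TYPES.getOrDefault(ext, DEFAULT_TYPE);
    }

    public static void main(String[] args) {
        System.out.println(contentTypeFor("/images/a.png"));
        System.out.println(contentTypeFor("/css/site.css?v=2"));
        System.out.println(contentTypeFor("/"));
    }
}
```

Image types carry no `charset` parameter here, since a character set only makes sense for text responses.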
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private MsgProcessor processor = new MsgProcessor();
@Override
protected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
processor.sendMsg(ctx.channel(), msg.text());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel client = ctx.channel();
String addr = processor.getAddress(client);
log.info("WebSocket Client:" + addr + " exception");
// Close the connection whenever an exception occurs
cause.printStackTrace();
ctx.close();
}
}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;
import com.alibaba.fastjson.JSONObject;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMP;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* Handles the business logic of the custom protocol
*
*/
public class MsgProcessor {
// Tracks online users
private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// Extension attributes
public static final AttributeKey<String> NICK_NAME = AttributeKey.valueOf("nickName");
public static final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
public static final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");
public static final AttributeKey<String> FROM = AttributeKey.valueOf("from");
// Custom decoder
private IMDecoder decoder = new IMDecoder();
// Custom encoder
private IMEncoder encoder = new IMEncoder();
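/**
* Returns the user's nickname
* @param client the user's channel
* @return the nickname, or null if none has been set
*/
public String getNickName(Channel client){
return client.attr(NICK_NAME).get();
}
/**
* Returns the user's remote IP address
* @param client the user's channel
* @return the remote address without the leading slash
*/
public String getAddress(Channel client){
return client.remoteAddress().toString().replaceFirst("/","");
}
/**
* Returns the extension attributes
* @param client the user's channel
* @return the attributes, or null if none have been set
*/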
public JSONObject getAttrs(Channel client){
try{
return client.attr(ATTRS).get();
}catch(Exception e){
return null;
}
}
/**
* Sets an extension attribute
* @param client the user's channel
* @param key attribute name
* @param value attribute value
*/
private void setAttrs(Channel client, String key, Object value){
JSONObject json = client.attr(ATTRS).get();
// Create the attribute container on first use instead of relying on a caught exception
if(json == null){
json = new JSONObject();
}
json.put(key, value);
client.attr(ATTRS).set(json);
}
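/**
* Logout notification
* @param client the channel that issued the logout
* @param nickname nickname of the user who is leaving
*/
public void logout(Channel client, String nickname){
// If nickName is null, the connection did not follow the chat protocol and is treated as an illegal login
// if(getNickName(client) == null){ return; }
for (Channel channel : onlineUsers) {
IMMessage request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), nickname + " has left");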
String content = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(content));
}
onlineUsers.remove(client);
}
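/**
* Sends a message
* @param client the sending channel
* @param msg the message object
*/
public void sendMsg(Channel client, IMMessage msg){
sendMsg(client,encoder.encode(msg));
}
/**
* Sends a message
* @param client the sending channel
* @param msg the message in the text form of the IM protocol
*/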
public void sendMsg(Channel client, String msg){
IMMessage request = decoder.decode(msg);
if(null == request){ return; }
String addr = getAddress(client);
if(request.getCmd().equals(IMP.LOGIN.getName())){
client.attr(NICK_NAME).getAndSet(request.getSender());
client.attr(IP_ADDR).getAndSet(addr);
client.attr(FROM).getAndSet(request.getTerminal());
// System.out.println(client.attr(FROM).get());
onlineUsers.add(client);
for (Channel channel : onlineUsers) {
boolean isself = (channel == client);
if(!isself){
request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), getNickName(client) + " has joined");
}else{
request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), "Connected to the server!");
}
if("Console".equals(channel.attr(FROM).get())){
channel.writeAndFlush(request);
continue;
}
String content = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(content));
}
}else if(request.getCmd().equals(IMP.CHAT.getName())){
for (Channel channel : onlineUsers) {
boolean isself = (channel == client);
if (isself) {
request.setSender("you");
}else{
request.setSender(getNickName(client));
}
request.setTime(sysTime());
if("Console".equals(channel.attr(FROM).get()) && !isself){
channel.writeAndFlush(request);
continue;
}
String content = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(content));
}
}else if(request.getCmd().equals(IMP.FLOWER.getName())){
JSONObject attrs = getAttrs(client);
long currTime = sysTime();
if(null != attrs){
long lastTime = attrs.getLongValue("lastFlowerTime");
// Flowers may not be sent again within 10 seconds
int seconds = 10;
long sub = currTime - lastTime;
if(sub < 1000 * seconds){
request.setSender("you");
request.setCmd(IMP.SYSTEM.getName());
request.setContent("You are sending flowers too often, try again in " + (seconds - sub / 1000) + " seconds");
String content = encoder.encode(request);
client.writeAndFlush(new TextWebSocketFrame(content));
return;
}
}
// Normal case: send the flowers
for (Channel channel : onlineUsers) {
if (channel == client) {
request.setSender("you");
request.setContent("You sent everyone a shower of flowers");
setAttrs(client, "lastFlowerTime", currTime);
}else{
request.setSender(getNickName(client));
request.setContent(getNickName(client) + " sent a shower of flowers");
}
request.setTime(sysTime());
String content = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(content));
}
}
}
/**
* Returns the current system time
* @return the current time in milliseconds
*/
private Long sysTime(){
return System.currentTimeMillis();
}
}
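The FLOWER branch above rejects a second request that arrives too soon after the last accepted one and tells the user how long to wait. Isolated from the channel handling, that throttling rule could look roughly like the sketch below. The class `FlowerThrottleSketch` and its method are hypothetical, and the clock is passed in as a parameter so the rule can be exercised without waiting.

```java
/** Illustration only: a per-user "at most once per window" rule. */
final class FlowerThrottleSketch {

    private final long windowMillis;
    private Long lastAcceptedMillis; // null until the first request has been accepted

    FlowerThrottleSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Returns 0 if the request is accepted (and remembers its time), otherwise the
     * number of seconds the caller still has to wait, rounded up.
     */
    long tryAcquire(long nowMillis) {
        if (lastAcceptedMillis != null) {
            long elapsed = nowMillis - lastAcceptedMillis;
            if (elapsed < windowMillis) {
                return (windowMillis - elapsed + 999) / 1000;
            }
        }
        lastAcceptedMillis = nowMillis;
        return 0;
    }

    public static void main(String[] args) {
        FlowerThrottleSketch throttle = new FlowerThrottleSketch(10_000);
        System.out.println(throttle.tryAcquire(0));      // accepted
        System.out.println(throttle.tryAcquire(2_500));  // rejected, seconds left
        System.out.println(throttle.tryAcquire(10_000)); // accepted again
    }
}
```

In the chat server one such state would be kept per channel, which is what the `lastFlowerTime` entry in the channel attributes does.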
Console client:
package com.qupeng.java.demo.netty4.websocket.chat.client;
import com.qupeng.java.demo.netty4.websocket.chat.client.handler.ChatClientHandler;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
/**
* Client
* @author Tom
*
*/
public class ChatClient {
private ChatClientHandler clientHandler;
private String host;
private int port;
public ChatClient(String nickName){
this.clientHandler = new ChatClientHandler(nickName);
}
public void connect(String host,int port){
this.host = host;
this.port = port;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IMDecoder());
ch.pipeline().addLast(new IMEncoder());
ch.pipeline().addLast(clientHandler);
}
});
ChannelFuture f = b.connect(this.host, this.port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws IOException{
new ChatClient("Cover").connect("127.0.0.1",8080);
}
}
package com.qupeng.java.demo.netty4.websocket.chat.client.handler;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMP;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Chat client logic
* @author Tom
*
*/
@Slf4j
public class ChatClientHandler extends SimpleChannelInboundHandler<IMMessage> {
private ChannelHandlerContext ctx;
private String nickName;
public ChatClientHandler(String nickName){
this.nickName = nickName;
}
/** Starts the console input loop of the client */
private void session() throws IOException {
new Thread(){
public void run(){
System.out.println(nickName + ", hello! Type your messages in the console.");
Scanner scanner = new Scanner(System.in);
boolean keepGoing = true;
// Stop after "exit" or when standard input is exhausted; never send a null message
while (keepGoing && scanner.hasNextLine()) {
String input = scanner.nextLine();
IMMessage message;
if("exit".equals(input)){
message = new IMMessage(IMP.LOGOUT.getName(),"Console",System.currentTimeMillis(),nickName);
}else{
message = new IMMessage(IMP.CHAT.getName(),System.currentTimeMillis(),nickName,input);
}
keepGoing = sendMsg(message);
}
scanner.close();
}
}.start();
}
/**
* Called once the TCP connection has been established
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
IMMessage message = new IMMessage(IMP.LOGIN.getName(),"Console",System.currentTimeMillis(),this.nickName);
sendMsg(message);
log.info("Connected to the server; login message sent");
session();
}
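/**
* Sends a message
* @param msg the message to send
* @return false once a LOGOUT message has been sent, true otherwise
*/
private boolean sendMsg(IMMessage msg){
ctx.channel().writeAndFlush(msg);
System.out.println("Keep typing to continue the conversation...");
// getCmd() returns a String, so compare it with the enum's name rather than with the enum itself
return !IMP.LOGOUT.getName().equals(msg.getCmd());
}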
/**
* Called when a message is received
* @throws IOException
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws IOException {
IMMessage m = (IMMessage)msg;
System.out.println((null == m.getSender() ? "" : (m.getSender() + ":")) + removeHtmlTag(m.getContent()));
}
public static String removeHtmlTag(String htmlStr){
String regEx_script="<script[^>]*?>[\\s\\S]*?<\\/script>"; // regex for script elements
String regEx_style="<style[^>]*?>[\\s\\S]*?<\\/style>"; // regex for style elements
String regEx_html="<[^>]+>"; // regex for HTML tags
Pattern p_script=Pattern.compile(regEx_script,Pattern.CASE_INSENSITIVE);
Matcher m_script=p_script.matcher(htmlStr);
htmlStr=m_script.replaceAll(""); // strip script elements
Pattern p_style=Pattern.compile(regEx_style,Pattern.CASE_INSENSITIVE);
Matcher m_style=p_style.matcher(htmlStr);
htmlStr=m_style.replaceAll(""); // strip style elements
Pattern p_html=Pattern.compile(regEx_html,Pattern.CASE_INSENSITIVE);
Matcher m_html=p_html.matcher(htmlStr);
htmlStr=m_html.replaceAll(""); // strip the remaining HTML tags
return htmlStr.trim(); // return the plain text
}
/**
* Called when an exception occurs
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("Disconnected from the server: "+cause.getMessage());
ctx.close();
}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;
/**
* Custom IM protocol (Instant Messaging Protocol)
*
*/
public enum IMP {
/** System message */
SYSTEM("SYSTEM"),
/** Login command */
LOGIN("LOGIN"),
/** Logout command */
LOGOUT("LOGOUT"),
/** Chat message */
CHAT("CHAT"),
/** Send flowers */
FLOWER("FLOWER");
private String name;
public static boolean isIMP(String content){
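// matches() must cover the whole string, so allow anything after the command header; FLOWER is a command too
return content.matches("(?s)^\\[(SYSTEM|LOGIN|LOGOUT|CHAT|FLOWER)\\].*");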
}
IMP(String name){
this.name = name;
}
public String getName(){
return this.name;
}
public String toString(){
return this.name;
}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;
import lombok.Data;
import org.msgpack.annotation.Message;
/**
* Custom message entity
*
*/
@Message
@Data
public class IMMessage{
private String addr; // IP address and port
private String cmd; // command type, e.g. [LOGIN], [SYSTEM] or [LOGOUT]
private long time; // time the command was sent
private int online; // number of users currently online
private String sender; // sender
private String receiver; // receiver
private String content; // message content
private String terminal; // terminal type
public IMMessage(){}
public IMMessage(String cmd,long time,int online,String content){
this.cmd = cmd;
this.time = time;
this.online = online;
this.content = content;
}
public IMMessage(String cmd,String terminal,long time,String sender){
this.cmd = cmd;
this.time = time;
this.sender = sender;
this.terminal = terminal;
}
public IMMessage(String cmd,long time,String sender,String content){
this.cmd = cmd;
this.time = time;
this.sender = sender;
this.content = content;
}
@Override
public String toString() {
return "IMMessage{" +
"addr='" + addr + '\'' +
", cmd='" + cmd + '\'' +
", time=" + time +
", online=" + online +
", sender='" + sender + '\'' +
", receiver='" + receiver + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.MessageTypeException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Decoder for the custom IM protocol
*/
public class IMDecoder extends ByteToMessageDecoder {
// Regex for parsing the request content of the IM protocol
private Pattern pattern = Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try{
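// Number of readable bytes
final int length = in.readableBytes();
final byte[] array = new byte[length];
// Note: array has not been filled from the buffer yet, so content holds only NUL characters
// and trims to an empty string. The protocol check below is therefore skipped, and non-IM
// traffic is expected to be caught by the MessageTypeException handler instead.
String content = new String(array,in.readerIndex(),length);
// Empty content is not checked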
if(!(null == content || "".equals(content.trim()))){
if(!IMP.isIMP(content)){
ctx.channel().pipeline().remove(this);
return;
}
}
in.getBytes(in.readerIndex(), array, 0, length);
out.add(new MessagePack().read(array,IMMessage.class));
in.clear();
}catch(MessageTypeException e){
ctx.channel().pipeline().remove(this);
}
}
/**
* Parses a string into a message of the custom IM protocol
* @param msg the raw protocol string
* @return the parsed message, or null if msg is empty or cannot be parsed
*/
public IMMessage decode(String msg){
if(null == msg || "".equals(msg.trim())){ return null; }
try{
Matcher m = pattern.matcher(msg);
String header = "";
String content = "";
if(m.matches()){
header = m.group(1);
content = m.group(3);
}
String [] headers = header.split("\\]\\[");
long time = 0;
// Fall back to time = 0 when the timestamp cannot be parsed
try{ time = Long.parseLong(headers[1]); } catch(Exception e){}
String nickName = headers[2];
// Nicknames are limited to ten characters
nickName = nickName.length() <= 10 ? nickName : nickName.substring(0, 10);
if(msg.startsWith("[" + IMP.LOGIN.getName() + "]")){
return new IMMessage(headers[0],headers[3],time,nickName);
}else if(msg.startsWith("[" + IMP.CHAT.getName() + "]")){
return new IMMessage(headers[0],time,nickName,content);
}else if(msg.startsWith("[" + IMP.FLOWER.getName() + "]")){
return new IMMessage(headers[0],headers[3],time,nickName);
}else{
return null;
}
}catch(Exception e){
e.printStackTrace();
return null;
}
}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
* Encoder for the custom IM protocol
*/
public class IMEncoder extends MessageToByteEncoder<IMMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
throws Exception {
out.writeBytes(new MessagePack().write(msg));
}
public String encode(IMMessage msg){
if(null == msg){ return ""; }
String prefix = "[" + msg.getCmd() + "]" + "[" + msg.getTime() + "]";
if(IMP.LOGIN.getName().equals(msg.getCmd()) ||
IMP.FLOWER.getName().equals(msg.getCmd())){
prefix += ("[" + msg.getSender() + "][" + msg.getTerminal() + "]");
}else if(IMP.CHAT.getName().equals(msg.getCmd())){
prefix += ("[" + msg.getSender() + "]");
}else if(IMP.SYSTEM.getName().equals(msg.getCmd())){
prefix += ("[" + msg.getOnline() + "]");
}
if(!(null == msg.getContent() || "".equals(msg.getContent()))){
prefix += (" - " + msg.getContent());
}
return prefix;
}
}
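The text form produced by `IMEncoder.encode(IMMessage)` and parsed by `IMDecoder.decode(String)` is a run of bracketed header fields, optionally followed by ` - ` and the message body. The stand-alone sketch below reuses the same regular expression as `IMDecoder` to show how such a line is split. The class name `ImTextFormatSketch`, its helper methods, and the sample values (including the command names `CHAT` and `LOGIN` and the terminal name `WebSocket`) are assumptions made for illustration and are not part of the project.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; not part of the chat project.
public class ImTextFormatSketch {
    // Same expression as the pattern field in IMDecoder
    private static final Pattern PATTERN =
            Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");

    // Returns the bracketed header fields, or an empty array if msg does not match
    public static String[] splitHeaders(String msg) {
        Matcher m = PATTERN.matcher(msg);
        if (!m.matches()) {
            return new String[0];
        }
        // group(1) is everything between the first "[" and the last header "]"
        return m.group(1).split("\\]\\[");
    }

    // Returns the text after " - ", or null if there is no body or msg does not match
    public static String body(String msg) {
        Matcher m = PATTERN.matcher(msg);
        return m.matches() ? m.group(3) : null;
    }

    public static void main(String[] args) {
        // Sample lines are assumptions made up for this sketch
        String chat = "[CHAT][1600000000000][Tom] - hello";
        String login = "[LOGIN][1600000000000][Tom][WebSocket]";
        System.out.println(Arrays.toString(splitHeaders(chat)));
        System.out.println(body(chat));
        System.out.println(Arrays.toString(splitHeaders(login)));
        System.out.println(body(login));
    }
}
```

In this sketch the chat line yields three header fields (command, timestamp, sender) plus a body, while the login line yields four header fields and no body, which mirrors the `headers[0]` to `headers[3]` indexing used in `IMDecoder.decode(String)`.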
Page:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta content="width=device-width, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0, user-scalable=0" name="viewport">
<title>Online Chat Room</title>
<link rel="stylesheet" type="text/css" href="css/style.css" />
<script type="text/javascript" src="/js/lib/jquery.min.js"></script>
<script type="text/javascript" src="/js/lib/jquery.snowfall.js"></script>
<script type="text/javascript" src="/js/chat.util.js"></script>
</head>
<body>
<div id="loginbox">
<div style="width:300px;margin:200px auto;">
Welcome to the WebSocket chat room
<br/>
<br/>
<input type="text" style="width:180px;" placeholder="Enter a nickname before joining" id="nickname" name="nickname" />
<input type="button" style="width:50px;" value="Enter" onclick="CHAT.login();" />
<div id="error-msg" style="color:red;"></div>
</div>
</div>
<div id="chatbox" style="display: none;">
<div style="background:#3d3d3d;height: 28px; width: 100%;font-size:12px;position: fixed;top: 0px;z-index: 999;">
<div style="line-height: 28px;color:#fff;">
<span style="text-align:left;margin-left:10px;">Netty WebSocket chat room</span>
<span style="float:right; margin-right:10px;">
<span>Online now: <span id="onlinecount">0</span></span> |
<span id="shownikcname">Anonymous</span> |
<a href="javascript:;" onclick="CHAT.logout()" style="color:#fff;">Log out</a>
</span>
</div>
</div>
<div id="doc">
<div id="chat">
<div id="message" class="message">
<div id="onlinemsg" style="background:#EFEFF4; font-size:12px; margin-top:40px; margin-left:10px; color:#666;">
</div>
</div>
<form onsubmit="return false;">
<div class="tool-box">
<div class="face-box" id="face-box"></div>
<span class="face" onclick="CHAT.openFace()" title="Choose an emoji"></span>
<!--
<span class="img" id="tool-img-btn" title="Send image"></span>
<span class="file" id="tool-file-btn" title="Upload file"></span>
-->
<span class="flower" onclick="CHAT.sendFlower()" title="Send flowers"></span>
</div>
<div class="input-box">
<div class="input" contenteditable="true" id="send-message"></div>
<div class="action">
<input class="button" type="button" id="mjr_send" onclick="CHAT.sendText()" value="Send"/>
</div>
</div>
</form>
</div>
</div>
</div>
</body>
</html>
pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qupeng.java.demos</groupId>
<artifactId>netty4</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>