Java I/O(五)NIO应用之Netty

news2025/1/13 3:36:26

Netty

目录

  • Netty
    • 1 Netty概览
    • 2 Netty核心组件
      • 2.1 Bootstrap和ServerBootStrap(启动引导类)
      • 2.2 Channel(网络操作抽象类)
      • 2.3 EventLoop(事件循环)
      • 2.4 EventLoopGroup(事件循环组)
      • 2.7 ChannelHandler(通道处理器)
      • 2.5 ChannelHandlerContext(通道处理器上下文)
      • 2.6 ChannelPipeline(通道流水线)
      • 2.7 Bytebuf(字节缓冲区)
      • 2.8 编码器、解码器
      • 2.9 ChannelFuture(操作执行结果)
    • 3 实例
      • 3.1 客户端-服务器,简单的字符串通信
      • 3.2 用Netty实现Servlet服务器
      • 3.3 用Netty实现RPC(远程过程调用)
      • 3.4 用Netty实现Websock在线聊天
      • 3.5 用Netty实现更好的在线聊天

1 Netty概览

Netty是一个客户端/服务器框架,它在Java的NIO网络组件上提供了一个简化的封装层。这使得它成为创建低级别非阻塞网络应用程序的一个不错的候选。
让我们看看Netty框架的主要亮点:
易用性:Netty比普通Java NIO更易于使用,并且有一套涵盖大多数用例的广泛示例
最小依赖性:我们将在一分钟内看到,您只需一个依赖性即可获得整个框架
性能:Netty比核心Java API具有更好的吞吐量和更低的延迟。由于其内部资源池,它还具有可扩展性。
安全性:完全支持SSL/TLS和StartTLS。

2 Netty核心组件

2.1 Bootstrap和ServerBootStrap(启动引导类)

Bootstrap 是客户端的启动引导类/辅助类,不管程序使用哪种协议,无论是创建一个客户端还是服务器都需要使用“引导”。
Bootstrap 通常使用 connet() 方法连接到远程的主机和端口,作为一个 Netty TCP 协议通信中的客户端。另外,Bootstrap 也可以通过 bind() 方法绑定本地的一个端口,作为 UDP 协议通信中的一端。具体使用方法如下:

public final class NettyClient {

    static final String HOST = "127.0.0.1";
    static final int PORT = 8007;
 
 
    public static void main(String[] args) throws Exception { 
     
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // Set EventLoopGroup to handle all eventsf for client.
                    .channel(NioSocketChannel.class)// Use NIO to accept new connections.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();

                            p.addLast(new StringDecoder());
                            p.addLast(new StringEncoder());
 
                            // This is our custom client handler which will have logic for chat.
                            p.addLast(new  ClientHandler());
 
                        }
                    });
 
            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
 
             
                String input = "Frank";
                Channel channel = f.sync().channel();
                channel.writeAndFlush(input);
                channel.flush();
             
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

ServerBootstrap 服务端的启动引导类/辅助类,通常使用 bind() 方法绑定本地的端口上,然后等待客户端的连接。
具体使用方法如下:

public final class NettyServer {
    // Port where chat server will listen for connections.
    static final int PORT = 8007;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // Set boss & worker groups
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();

                            p.addLast(new StringDecoder());
                            p.addLast(new StringEncoder());

                            p.addLast(new ServerHandler());
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            System.out.println("Netty Server started.");

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Bootstrap 只需要配置一个线程组 EventLoopGroup , 而 ServerBootstrap 需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的 IO 处理。

2.2 Channel(网络操作抽象类)

可以简单的理解为客户端连接。
在我们使用某种语言,如 c/c++,java,go 等,进行网络编程的时候,我们通常会使用到 Socket, Socket 是对底层操作系统网络 IO 操作(如 read,write,bind,connect等)的封装, 因此我们必须去学习 Socket 才能完成网络编程,而 Socket 的操作其实是比较复杂的,想要使用好它有一定难度, 所以 Netty 提供了Channel(注意是 io.netty.Channel,而非 Java NIO 的 Channel),更加方便我们处理 IO 事件。
Channel 接口是 Netty 对网络操作抽象类。通过 Channel 我们可以进行 I/O 操作。Channel 为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  • 网络连接的配置参数 (例如接收缓冲区大小)
  • 提供异步的网络 I/O 操作 (如建立连接,读写,绑定端口),异步调用意味着任何 I/O调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。调用后立即返回一个 ChannelFuture 实例,通过注册监听器到ChannelFuture 上,可以在 I/O操作成功、失败或取消时回调通知调用方。
  • 支持关联 I/O 操作与对应的处理程序

一旦客户端成功连接服务端,就会新建一个 Channel 同该用户端进行绑定。示例代码如下:

// 通过 Bootstrap 的 connect 方法连接到服务端
public Channel doConnect(InetSocketAddress inetSocketAddress) {
    CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
    bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
            completableFuture.complete(future.channel());
        } else {
            throw new IllegalStateException();
        }
    });
    return completableFuture.get();
}

比较常用的Channel接口实现类是 :

  • NioServerSocketChannel(服务端)
  • NioSocketChannel(客户端)

这两个 Channel 可以和 BIO 编程模型中的 ServerSocket以及Socket两个概念对应上。

2.3 EventLoop(事件循环)

事件循环可以理解为事件线程,专用于处理客户端的连接、读写事件。
Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 的 I/O 操作,两者配合进行 I/O 操作。
并且,EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从而保证线程安全。

2.4 EventLoopGroup(事件循环组)

EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含一个线程),它管理着所有的 EventLoop 的生命周期。
本质上是一个线程池,主要负责接收I/O请求,并分配线程执行处理请求。
ServerBootStrap可以设置两个EventLoopGroup,一个负责拿到客户端连接,一个负责每一个连接的实际的I/O。对应于多路复用的selector和IO处理线程。

2.7 ChannelHandler(通道处理器)

我们知道 Netty 是一个款基于事件驱动的网络框架,当特定事件触发时,我们能够按照自定义的逻辑去处理数据。 ChannelHandler 则正是用于处理入站(接收)和出站(发送)数据钩子,它可以处理几乎所有类型的动作,所以 ChannelHandler 会是 我们开发者更为关注的一个接口。

通俗来说,ChannelHandler 是消息的具体处理器,主要负责处理客户端/服务端接收和发送的数据。

ChannelHandler 主要分为处理入站数据的 ChannelInboundHandler 和出站数据的 ChannelOutboundHandler 接口。

Netty 以适配器的形式提供了大量默认的 ChannelHandler 实现,主要目的是为了简化程序开发的过程,我们只需要 重写我们关注的事件和方法就可以了。 通常我们会以继承的方式使用以下适配器和抽象:

  • ChannelHandlerAdapter
  • ChannelInboundHandlerAdapter
  • ChannelDuplexHandler
  • ChannelOutboundHandlerAdapter

2.5 ChannelHandlerContext(通道处理器上下文)

用来保存ChannelHandler的上下文信息。
首先ChannelHandlerContext是一个AttributeMap,可以用来存储多个数据。
然后ChannelHandlerContext继承了ChannelInboundInvoker和ChannelOutboundInvoker,可以触发inbound和outbound的一些方法。
除了继承来的一些方法之外,ChannelHandlerContext还可以作为channel,handler和pipline的沟通桥梁,因为可以从ChannelHandlerContext中获取到对应的channel,handler和pipline:

Channel channel();
ChannelHandler handler();
ChannelPipeline pipeline();

还要注意的是ChannelHandlerContext还返回一个EventExecutor,用来执行特定的任务:

2.6 ChannelPipeline(通道流水线)

当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。 一个 Channel 包含一个 ChannelPipeline。 ChannelPipeline 为 ChannelHandler 的链,一个 pipeline 上可以有多个 ChannelHandler。

我们可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler (一个数据或者事件可能会被多个 Handler 处理) 。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler :

	b.group(eventLoopGroup)
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
            ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
            ch.pipeline().addLast(new KryoClientHandler());
        }
    });

入站事件和出站事件的流向
从服务端角度来看,如果一个事件的运动方向是从客户端到服务端,那么这个事件是入站的,如果事件运动的方向 是从服务端到客户端,那么这个事件是出站的。

上图是 Netty 事件入站和出站的大致流向,入站和出站的 ChannelHandler 可以被安装到一个ChannelPipeline中, 如果一个消息或其他的入站事件被[读取],那么它会从ChannelPipeline的头部开始流动,并传递给第一个ChannelInboundHandler ,这个ChannelHandler的行为取决于它的具体功能,不一定会修改消息。 在经历过第一个ChannelInboundHandler之后, 消息会被传递给这条ChannelHandler链的下一个ChannelHandler,最终消息会到达ChannelPipeline尾端,消息的读取也就结束了。

数据的出站 (发送) 流程与入站是相似的,在出站过程中,消息从ChannelOutboundHandler链的尾端开始流动, 直到到达它的头部为止,在这之后,消息会到达网络传输层进行后续传输。

鉴于入站操作和出站操作是不同的,可能有同学会疑惑:为什么入站 ChannelHandler和出站 ChannelHandler的数据 不会窜流呢(为什么 入站 的数据不会到出站 ChannelHandler 链中)?

因为Netty可以区分ChannelInboundHandler和 ChannelOutboundHandler的实现,并确保数据只在两个相同类型的ChannelHandler直接传递,即数据要么在 ChannelInboundHandler链之间流动,要么在ChannelOutboundHandler链之间流动。

当ChannelHandler被添加到ChannelPipeline中后,它会被分配一个ChannelHandlerContext, 它代表了ChannelHandler和ChannelPipeline之间的绑定。 ChannelPipeline 通过 ChannelHandlerContext来间接管理 ChannelHandler 。

2.7 Bytebuf(字节缓冲区)

网络通信最终都是通过字节流进行传输的。 Netty 使用自建的 buffer API,而不是使用 NIO 的 ByteBuffer 来存储连续的字节序列。与 ByteBuffer 相比这种方式拥有明显的优势。Netty 使用新的 buffer 类型 ByteBuf,被设计为一个可从底层解决 ByteBuffer 问题,并可满足日常网络应用开发需要的缓冲类型。这些很酷的特性包括:

  • 如果需要,允许使用自定义的缓冲类型。
  • 复合缓冲类型中内置的透明的零拷贝实现。
  • 开箱即用的动态缓冲类型,具有像 StringBuffer 一样的动态缓冲能力。
  • 不再需要调用的 flip() 方法。
  • 正常情况下具有比 ByteBuffer 更快的响应速度。

2.8 编码器、解码器

当我们通过 Netty 发送(出站)或接收(入站)一个消息时,就会发生一次数据的转换,因为数据在网络中总是通过字节传输的, 所以当数据入站时,Netty 会解码数据,即把数据从字节转为为另一种格式 (通常是一个 Java 对象), 当数据出站时,Netty 会编码数据,即把数据从它当前格式转为为字节。

Netty 为编码器和解码器提供了不同类型的抽象,这些编码器和解码器其实都是ChannelHandler的实现, 它们的名称通常是 ByteToMessageDecoder 和 MessageToByteEncoder。

对于入站数据来说,解码其实是解码器通过重写 ChannelHanler 的read事件(channelRead),然后调用它们自己的 decode方法完成的。 对于出站数据来说,编码则是编码器通过重写ChannelHanler的write事件,然后调用它们自己的 encode方法完成的。

2.9 ChannelFuture(操作执行结果)

public interface ChannelFuture extends Future<Void> {
    Channel channel();

    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
     ......

    ChannelFuture sync() throws InterruptedException;
}

Netty 是非阻塞的,因此,我们不能立刻得到操作是否执行成功,但是,你可以通过 ChannelFuture 接口的 addListener() 方法注册一个监听 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。

ChannelFuture f = b.connect(host, port).addListener(future -> {
  if (future.isSuccess()) {
    System.out.println("连接成功!");
  } else {
    System.err.println("连接失败!");
  }
}).sync();

并且,你还可以通过ChannelFuture 的 channel() 方法获取连接相关联的Channel 。

Channel channel = f.channel();

另外,我们还可以通过 ChannelFuture 接口的 sync()方法让异步的操作编程同步的。

// bind()是异步的,但是,你可以通过 `sync()`方法将其变为同步。
ChannelFuture f = b.bind(port).sync();

3 实例

3.1 客户端-服务器,简单的字符串通信

这个例子实现了简单的通信,客户端发送字符串到服务端,服务端收到后返回一个字符串。
服务端:

package com.qupeng.java.demo.netty4.official.demo;

import java.util.ArrayList;
import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public final class NettyServer {
    // Port where chat server will listen for connections.
    static final int PORT = 8007;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // Set boss & worker groups
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();

                            p.addLast(new StringDecoder());
                            p.addLast(new StringEncoder());

                            p.addLast(new ServerHandler());
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            System.out.println("Netty Server started.");

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

@Sharable
class ServerHandler extends SimpleChannelInboundHandler<String> {

    static final List<Channel> channels = new ArrayList<Channel>();

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        System.out.println("Client joined - " + ctx);
        channels.add(ctx.channel());
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Message received: " + msg);
        for (Channel c : channels) {
            c.writeAndFlush("Hello " + msg + '\n');
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("Closing connection for client - " + ctx);
        ctx.close();
    }
}

客户端:

package com.qupeng.java.demo.netty4.official.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


 
public final class NettyClient {

    static final String HOST = "127.0.0.1";
    static final int PORT = 8007;
 
 
    public static void main(String[] args) throws Exception { 
     
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // Set EventLoopGroup to handle all eventsf for client.
                    .channel(NioSocketChannel.class)// Use NIO to accept new connections.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();

                            p.addLast(new StringDecoder());
                            p.addLast(new StringEncoder());
 
                            // This is our custom client handler which will have logic for chat.
                            p.addLast(new  ClientHandler());
 
                        }
                    });
 
            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
 
             
                String input = "Frank";
                Channel channel = f.sync().channel();
                channel.writeAndFlush(input);
                channel.flush();
             
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
  class  ClientHandler extends SimpleChannelInboundHandler<String> {  
 
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("Message from Server: " + msg);
     
        }
     
    }

参考链接: http://www.mastertheboss.com/jboss-frameworks/netty/jboss-netty-tutorial

3.2 用Netty实现Servlet服务器

这个例子实现了一个简单的Tomcat服务器,支持了一个/hello-world的servlet mappin.
服务端:

package com.qupeng.java.demo.netty4.servlet;

import com.qupeng.java.demo.netty4.servlet.handler.TomcatHandler;
import com.qupeng.java.demo.netty4.servlet.service.MyServlet;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;


//Netty就是一个同时支持多协议的网络通信框架
public class MyTomcatServer {
	//打开Tomcat源码,全局搜索ServerSocket

	private int port = 8080;

	private Map<String, MyServlet> servletMapping = new HashMap<>();

	private Properties webxml = new Properties();

	private TomcatHandler tomcatHandler;

	private void init(){

		try{
			String WEB_INF = this.getClass().getResource("/").getPath();
			FileInputStream fis = new FileInputStream(WEB_INF + "web.xml");
			SAXReader saxReader = new SAXReader();
			Document document = saxReader.read(fis);
			Element root = document.getRootElement();

			List<Element> servlets = root.elements("servlet");
			Map<String, String> servletClassMap = servlets.stream().collect(Collectors.toMap(element -> element.element("servlet-name").getText(), element -> element.element("servlet-class").getText()));

			List<Element> servletMappings = root.elements("servlet-mapping");
			servletMapping = servletMappings.stream().collect(Collectors.toMap(element -> element.element("url-pattern").getText(), element -> {
				String classPath = servletClassMap.get(element.element("servlet-name").getText());
				MyServlet obj = null;
				try {
					obj = (MyServlet)Class.forName(classPath).newInstance();
				} catch (InstantiationException e) {
					e.printStackTrace();
				} catch (IllegalAccessException e) {
					e.printStackTrace();
				} catch (ClassNotFoundException e) {
					e.printStackTrace();
				}
				return obj;
			}));
			tomcatHandler = new TomcatHandler(servletMapping);
		}catch(Exception e){
			e.printStackTrace();
		}
	}

	public void start(){

		init();
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap server = new ServerBootstrap();

			server.group(bossGroup, workerGroup)
					// 主线程处理类,看到这样的写法,底层就是用反射
					.channel(NioServerSocketChannel.class)
					// 子线程处理类 , Handler
					.childHandler(new ChannelInitializer<SocketChannel>() {
						// 客户端初始化处理
						protected void initChannel(SocketChannel client) throws Exception {
							// 无锁化串行编程
							//Netty对HTTP协议的封装,顺序有要求
							// HttpResponseEncoder 编码器
							// 责任链模式,双向链表Inbound OutBound
							client.pipeline().addLast(new HttpResponseEncoder());
							// HttpRequestDecoder 解码器
							client.pipeline().addLast(new HttpRequestDecoder());
							// 业务逻辑处理
							client.pipeline().addLast(new TomcatHandler(servletMapping));
						}
					})
					// 针对主线程的配置 分配线程最大数量 128
					.option(ChannelOption.SO_BACKLOG, 128)
					// 针对子线程的配置 保持长连接
					.childOption(ChannelOption.SO_KEEPALIVE, true);

			//3、启动服务器
			ChannelFuture f = server.bind(port).sync();
			System.out.println("My Tomcat 已启动,监听的端口是:" + port);
			f.channel().closeFuture().sync();
		}catch (Exception e){
			e.printStackTrace();
		}finally {
			// 关闭线程池
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) {
		new MyTomcatServer().start();
	}

}
package com.qupeng.java.demo.netty4.servlet.handler;

import com.qupeng.java.demo.netty4.servlet.service.MyServlet;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;

import java.util.Map;

public class TomcatHandler extends ChannelInboundHandlerAdapter {

	private Map<String, MyServlet> servletMapping;

	public TomcatHandler(Map<String, MyServlet> servletMapping) {
		this.servletMapping = servletMapping;
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof HttpRequest){
			HttpRequest req = (HttpRequest) msg;
			String uri = req.uri();

			HttpServletRequest servletRequest = new HttpServletRequest(req);
			HttpServletResponse servletResponse = new HttpServletResponse(ctx);
			if(servletMapping.containsKey(uri)){
				servletMapping.get(uri).service(servletRequest, servletResponse);
			}else{
				servletResponse.setStatus(404);
				servletResponse.setContentType("text/html");
				servletResponse.getPrintWriter().write("404 - Not Found");
			}

		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

	}
}

Servlet API:

package com.qupeng.java.demo.netty4.servlet.service;

import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;

public abstract class MyServlet {
	
	public void service(HttpServletRequest request, HttpServletResponse response) throws Exception{
		if("GET".equalsIgnoreCase(request.getMethod())){
			doGet(request, response);
		}else{
			doPost(request, response);
		}
	}
	
	public abstract void doGet(HttpServletRequest request, HttpServletResponse response) throws Exception;
	
	public abstract void doPost(HttpServletRequest request, HttpServletResponse response) throws Exception;
	
}
package com.qupeng.java.demo.netty4.servlet.service;

import com.qupeng.java.demo.netty4.servlet.http.HttpServletRequest;
import com.qupeng.java.demo.netty4.servlet.http.HttpServletResponse;

public class HelloWorldServlet extends MyServlet {
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws Exception {
        this.doPost(request, response);
    }

    @Override
    public void doPost(HttpServletRequest request, HttpServletResponse response) throws Exception {
        response.getPrintWriter().write("Hello world!");
    }
}
package com.qupeng.java.demo.netty4.servlet.http;

import io.netty.handler.codec.http.HttpRequest;

public class HttpServletRequest {
    private HttpRequest req;

    public HttpServletRequest(HttpRequest req) {
        this.req = req;
    }

    public String getUrl() {
        return req.uri();
    }

    public String getMethod() {
        return req.method().name();
    }
}
package com.qupeng.java.demo.netty4.servlet.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class HttpServletResponse {
    private PrintWriter printWriter;

    private FullHttpResponse httpResponse;

    public HttpServletResponse(ChannelHandlerContext ctx) {
        this.printWriter = new PrintWriter(ctx, this);
        httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    }

    public PrintWriter getPrintWriter() {
        return printWriter;
    }

    public void setStatus(int sc) {
        httpResponse.setStatus(HttpResponseStatus.valueOf(sc));
    }

    public void setHeader(String name,String value) {
        httpResponse.headers().set(name, value);
    }

    public void setContentType(String type) {
        httpResponse.headers().set("Content-Type", type);
    }

    public FullHttpResponse getHttpResponse() {
        return httpResponse;
    }
}
package com.qupeng.java.demo.netty4.servlet.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;

public class PrintWriter {

    private ChannelHandlerContext ctx;

    private HttpServletResponse servletResponse;

    public PrintWriter(ChannelHandlerContext ctx, HttpServletResponse servletResponse) {
        this.ctx = ctx;
        this.servletResponse = servletResponse;
    }

    public void write(String str) { ;
        servletResponse.setContentType("text/html");
        ctx.write(servletResponse.getHttpResponse().replace(Unpooled.wrappedBuffer(str.getBytes())));
        ctx.flush();
        ctx.close();
    }
}

web.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                             http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
         version="4.0"
         metadata-complete="true">
    <servlet>
        <servlet-name>hello-world</servlet-name>
        <servlet-class>com.qupeng.java.demo.netty4.servlet.service.HelloWorldServlet</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>hello-world</servlet-name>
        <url-pattern>/hello-world</url-pattern>
    </servlet-mapping>
</web-app>

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qupeng.java.demos</groupId>
    <artifactId>netty4</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>

3.3 用Netty实现RPC(远程过程调用)

这个例子来自《Netty4核心原理与手写RCP框架实战》一书,实现了客户端远程调用服务端的数值计算功能。
服务端:

package com.qupeng.java.demo.netty4.rpc.registry;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class RpcRegistry {  
    private int port;  
    public RpcRegistry(int port){  
        this.port = port;  
    }  
    public void start(){  
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
          
        try {  
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            		.channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //自定义协议解码器
                            /** 入参有5个,分别解释如下
                             maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                             lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                             lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                             lengthAdjustment:要添加到长度字段值的补偿值
                             initialBytesToStrip:从解码帧中去除的第一个字节数
                             */
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            //自定义协议编码器
                            pipeline.addLast(new LengthFieldPrepender(4));
                            //对象参数类型编码器
                            pipeline.addLast("encoder",new ObjectEncoder());
                            //对象参数类型解码器
                            pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RegistryHandler());
                        }  
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = b.bind(port).sync();
            System.out.println("GP RPC Registry start listen at " + port );
            future.channel().closeFuture().sync();    
        } catch (Exception e) {  
             bossGroup.shutdownGracefully();    
             workerGroup.shutdownGracefully();  
        }  
    }
    
    
    public static void main(String[] args) throws Exception {    
        new RpcRegistry(8080).start();    
    }    
}  
package com.qupeng.java.demo.netty4.rpc.registry;

import com.qupeng.java.demo.netty4.rpc.protocol.InvokerProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class RegistryHandler  extends ChannelInboundHandlerAdapter {

	//用保存所有可用的服务
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();

    //保存所有相关的服务类
    private List<String> classNames = new ArrayList<String>();
    
    public RegistryHandler(){
    	//完成递归扫描
    	scannerClass("com.qupeng.java.demo.netty4.rpc.provider");
    	doRegister();
    }
    
    
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	Object result = new Object();
        InvokerProtocol request = (InvokerProtocol)msg;

        //当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
		//使用反射调用
        if(registryMap.containsKey(request.getClassName())){ 
        	Object clazz = registryMap.get(request.getClassName());
        	Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());    
        	result = method.invoke(clazz, request.getValues());   
        }
        ctx.write(result);  
        ctx.flush();    
        ctx.close();  
    }
    
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();    
         ctx.close();    
    }
    

    /*
     * 递归扫描
     */
	private void scannerClass(String packageName){
		URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
		File dir = new File(url.getFile());
		for (File file : dir.listFiles()) {
			//如果是一个文件夹,继续递归
			if(file.isDirectory()){
				scannerClass(packageName + "." + file.getName());
			}else{
				classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
			}
		}
	}

	/**
	 * 完成注册
	 */
	private void doRegister(){
		if(classNames.size() == 0){ return; }
		for (String className : classNames) {
			try {
				Class<?> clazz = Class.forName(className);
				Class<?> i = clazz.getInterfaces()[0];
				registryMap.put(i.getName(), clazz.newInstance());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
  
}
package com.qupeng.java.demo.netty4.rpc.protocol;

import lombok.Data;

import java.io.Serializable;

/**
 * 自定义传输协议
 */
@Data
public class InvokerProtocol implements Serializable {

    private String className;//类名
    private String methodName;//函数名称 
    private Class<?>[] parames;//形参列表
    private Object[] values;//实参列表

}

远程服务接口定义:

package com.qupeng.java.demo.netty4.rpc.api;

public interface IRpcHelloService {
    String hello(String name);  
}  
package com.qupeng.java.demo.netty4.rpc.api;

public interface IRpcService {

	/** 加 */
	public int add(int a, int b);

	/** 减 */
	public int sub(int a, int b);

	/** 乘 */
	public int mult(int a, int b);

	/** 除 */
	public int div(int a, int b);

}
package com.qupeng.java.demo.netty4.rpc.provider;

import com.qupeng.java.demo.netty4.rpc.api.IRpcHelloService;

public class RpcHelloServiceImpl implements IRpcHelloService {

    public String hello(String name) {  
        return "Hello " + name + "!";  
    }  
  
}  
package com.qupeng.java.demo.netty4.rpc.provider;

import com.qupeng.java.demo.netty4.rpc.api.IRpcService;

public class RpcServiceImpl implements IRpcService {

	public int add(int a, int b) {
		return a + b;
	}

	public int sub(int a, int b) {
		return a - b;
	}

	public int mult(int a, int b) {
		return a * b;
	}

	public int div(int a, int b) {
		return a / b;
	}

}

客户端实现:

package com.qupeng.java.demo.netty4.rpc.consumer.proxy;

import com.qupeng.java.demo.netty4.rpc.protocol.InvokerProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcProxy {  
	
	public static <T> T create(Class<?> clazz){
        //clazz传进来本身就是interface
        MethodProxy proxy = new MethodProxy(clazz);
        Class<?> [] interfaces = clazz.isInterface() ?
                                new Class[]{clazz} :
                                clazz.getInterfaces();
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
        return result;
    }

	private static class MethodProxy implements InvocationHandler {
		private Class<?> clazz;
		public MethodProxy(Class<?> clazz){
			this.clazz = clazz;
		}

		public Object invoke(Object proxy, Method method, Object[] args)  throws Throwable {
			//如果传进来是一个已实现的具体类(本次演示略过此逻辑)
			if (Object.class.equals(method.getDeclaringClass())) {
				try {
					return method.invoke(this, args);
				} catch (Throwable t) {
					t.printStackTrace();
				}
				//如果传进来的是一个接口(核心)
			} else {
				return rpcInvoke(proxy,method, args);
			}
			return null;
		}


		/**
		 * 实现接口的核心方法
		 * @param method
		 * @param args
		 * @return
		 */
		public Object rpcInvoke(Object proxy,Method method,Object[] args){

			//传输协议封装
			InvokerProtocol msg = new InvokerProtocol();
			msg.setClassName(this.clazz.getName());
			msg.setMethodName(method.getName());
			msg.setValues(args);
			msg.setParames(method.getParameterTypes());

			final RpcProxyHandler consumerHandler = new RpcProxyHandler();
			EventLoopGroup group = new NioEventLoopGroup();
			try {
				Bootstrap b = new Bootstrap();
				b.group(group)
						.channel(NioSocketChannel.class)
						.option(ChannelOption.TCP_NODELAY, true)
						.handler(new ChannelInitializer<SocketChannel>() {
							@Override
							public void initChannel(SocketChannel ch) throws Exception {
								ChannelPipeline pipeline = ch.pipeline();
								//自定义协议解码器
								/** 入参有5个,分别解释如下
								 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
								 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
								 lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
								 lengthAdjustment:要添加到长度字段值的补偿值
								 initialBytesToStrip:从解码帧中去除的第一个字节数
								 */
								pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
								//自定义协议编码器
								pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
								//对象参数类型编码器
								pipeline.addLast("encoder", new ObjectEncoder());
								//对象参数类型解码器
								pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
								pipeline.addLast("handler",consumerHandler);
							}
						});

				ChannelFuture future = b.connect("localhost", 8080).sync();
				future.channel().writeAndFlush(msg).sync();
				future.channel().closeFuture().sync();
			} catch(Exception e){
				e.printStackTrace();
			}finally {
				group.shutdownGracefully();
			}
			return consumerHandler.getResponse();
		}

	}
}
package com.qupeng.java.demo.netty4.rpc.consumer.proxy;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class
RpcProxyHandler extends ChannelInboundHandlerAdapter {
	  
    private Object response;    
      
    public Object getResponse() {    
	    return response;    
	}    
  
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
    }    
        
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception is general");    
    }    
} 
package com.qupeng.java.demo.netty4.rpc.consumer;


import com.qupeng.java.demo.netty4.rpc.api.IRpcHelloService;
import com.qupeng.java.demo.netty4.rpc.api.IRpcService;
import com.qupeng.java.demo.netty4.rpc.consumer.proxy.RpcProxy;

public class RpcConsumer {
	
    public static void main(String [] args){  
        IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);
        
        System.out.println(rpcHello.hello("world"));

        IRpcService service = RpcProxy.create(IRpcService.class);
        
        System.out.println("8 + 2 = " + service.add(8, 2));
        System.out.println("8 - 2 = " + service.sub(8, 2));
        System.out.println("8 * 2 = " + service.mult(8, 2));
        System.out.println("8 / 2 = " + service.div(8, 2));
    }
    
}

3.4 用Netty实现Websock在线聊天

这个例子实现了一个简单的在线聊天,浏览器与服务端建立websocket链接,可以发送文本消息到服务端,服务端推送消息到所有的客户端。
在这里插入图片描述
页面:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Netty WebSocket Chat Room</title>
    <style type="text/css">
        .border{
            border: 1px solid #73AD21;
        }
        .container{
            height: 700px;
            display: flex;
            flex-direction: column;
            align-items:center;
        }
        .banner{
            background:#73AD21;
            height: 28px;
            width: 402px;
            font-size:12px;
        }
        .online-msg{
            background:#EFEFF4;
            font-size:12px;
            color:#666;
            height: 300px;
            width: 380px;
		    padding: 10px;
        }
        .tool-box{
             background-color:#e7eff8;
             height:35px;
             width: 402px;
        }
        .input-text{
            font-size:12px;
            color:#666;
            height: 150px;
            width: 380px;
    		padding: 10px;
        }
        .input-action{
            display: flex;
            justify-content:flex-end;
        }
    </style>
</head>
<br>
<body>
<br>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8080/websocket");
        socket.onmessage = function (event) {
            var ta = document.getElementById('online-msg');
            ta.append(document.createElement('br'));
            ta.append(event.data);
        };
        socket.onopen = function (event) {
            var ta = document.getElementById('online-msg');
            ta.append("打开WebSocket服务正常,浏览器支持WebSocket!");
        };
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.append("WebSocket 关闭!");
        };
    }
    else {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
    }

    function send() {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            var ta = document.getElementById('send-message');
            socket.send(ta.innerText);
            ta.innerText = '';
        }
        else {
            alert("WebSocket连接没有建立成功!");
        }
    }
</script>
<div class="container">
    <div class="banner">
        <div style="line-height: 28px;color:#fff;">
            <span style="text-align:left;margin-left:10px;">Netty Websocket聊天室</span>
        </div>
    </div>
    <div class="online-msg border" id="online-msg">
    </div>
    <form onsubmit="return false;">
        <div class="tool-box">
        </div>
        <div class="input-text border" contenteditable="true" id="send-message">
        </div>
        <div class="input-action">
            <input class="button" type="button" id="mjr_send" onclick="send()" value="发送"/>
        </div>
    </form>
</div>
</body>
</html>

服务端:

package com.qupeng.java.demo.netty4.websocket.simple.chat;

import com.qupeng.java.demo.netty4.websocket.simple.chat.handler.HttpServerHandler;
import com.qupeng.java.demo.netty4.websocket.simple.chat.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SimpleWebSocketChatServer {
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new SimpleWebSocketChatServer().run(port);
    }

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            pipeline.addLast("http-server-handler", new HttpServerHandler());
                            pipeline.addLast("websocket-server-handler", new WebSocketServerHandler());
                        }
                    });

            Channel ch = b.bind(port).sync().channel();
            System.out.println("Web socket server started at port " + port
                    + '.');
            System.out.println("Open your browser and navigate to http://localhost:"+ port + '/');
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.qupeng.java.demo.netty4.websocket.simple.chat.handler;

import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URL;


@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String webroot = "webroot";
    
    //获取class路径
    private URL baseURL = HttpServerHandler.class.getResource("");

    private WebSocketServerHandshaker handshaker;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (handleWebsocketConnectionRequest(ctx, request)) {
            return;
        }

        String uri = request.uri();

        RandomAccessFile file = null;
        try {
            String page = uri.equals("/") ? "simple-chat.html" : uri;
            file = new RandomAccessFile(getResource(page), "r");
        } catch (Exception e) {
            ctx.fireChannelRead(request.retain());
            return;
        }

        HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
        String contextType = "text/html;";
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, contextType + "charset=utf-8;");

        boolean keepAlive = HttpUtil.isKeepAlive(request);

        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.write(response);

        ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));

        ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }

        file.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel client = ctx.channel();
        log.info("Client:" + client.remoteAddress() + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    private boolean handleWebsocketConnectionRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {

        // 如果HTTP解码失败,返回HHTP异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            return false;
        }

        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://localhost:8080/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }

        return true;
    }

    private File getResource(String fileName) throws Exception {
        String basePath = baseURL.toURI().toString();
        int start = basePath.indexOf("classes/");
        basePath = (basePath.substring(0, start) + "/" + "classes/").replaceAll("/+", "/");

        String path = basePath + webroot + "/" + fileName;
        log.info("BaseURL:" + basePath);
        path = !path.contains("file:") ? path : path.substring(5);
        path = path.replaceAll("//", "/");
        return new File(path);
    }
}
package com.qupeng.java.demo.netty4.websocket.simple.chat.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupException;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private WebSocketServerHandshaker handshaker;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        handleWebSocketFrame(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx,
                                      WebSocketFrame frame) {

        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(),
                    (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(
                    new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format(
                    "%s frame types not supported", frame.getClass().getName()));
        }

        onlineUsers.add(ctx.channel());

        try {
            String request = ((TextWebSocketFrame) frame).text();
            onlineUsers.writeAndFlush(new TextWebSocketFrame(new java.util.Date().toString() + " " + request));
        } catch (ChannelGroupException ex) {
            log.error("推送组播消息失败。", ex);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qupeng.java.demos</groupId>
    <artifactId>netty4</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

</project>

3.5 用Netty实现更好的在线聊天

这个例子改编自《Netty4核心原理与手写RCP框架实战》一书,实现了一个完整的在线聊天功能,除了浏览器聊天,还支持在控制台推送系统消息给所有浏览器啊,我在原有基础上添加了退出聊天室的功能。
在这里插入图片描述

服务端:

package com.qupeng.java.demo.netty4.websocket.chat.server;

import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.HttpServerHandler;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.TerminalServerHandler;
import com.qupeng.java.demo.netty4.websocket.chat.server.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WebSocketChatServer {
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new WebSocketChatServer().run(port);
    }

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            /** 解析自定义协议 */
                            pipeline.addLast(new IMDecoder());  //Inbound
                            pipeline.addLast(new IMEncoder());  //Outbound
                            pipeline.addLast(new TerminalServerHandler());  //Inbound
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            pipeline.addLast("http-server-handler", new HttpServerHandler());
                            pipeline.addLast(new WebSocketServerProtocolHandler("/im"));
                            pipeline.addLast("websocket-server-handler", new WebSocketServerHandler());
                        }
                    });

            Channel ch = b.bind(port).sync().channel();
            System.out.println("Web socket server started at port " + port
                    + '.');
            System.out.println("Open your browser and navigate to http://localhost:"+ port + '/');
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;

import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TerminalServerHandler extends SimpleChannelInboundHandler<IMMessage> {

	private MsgProcessor processor = new MsgProcessor();
	
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws Exception {
	    processor.sendMsg(ctx.channel(), msg);
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("Socket Client: 与客户端断开连接:" + cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }

}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String webroot = "webroot";
    
    //获取class路径
    private URL baseURL = HttpServerHandler.class.getResource("");

    private MsgProcessor processor = new MsgProcessor();

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String uri = request.uri();
        if (uri.startsWith("/logout")) {
            String nickName = getGetParamsFromChannel(request).get("nickname").toString();
            processor.logout(ctx.channel(), nickName);
            String data = "logout";
            ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes());
            FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK, buf);
            if(buf != null){
                response.headers().set("Content-Type","text/plain;charset=UTF-8");
                response.headers().set("Content-Length",response.content().readableBytes());
            }
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            return;
        }

        RandomAccessFile file = null;
        try {
            String page = uri.equals("/") ? "chat.html" : uri;
            file = new RandomAccessFile(getResource(page), "r");
        } catch (Exception e) {
            ctx.fireChannelRead(request.retain());
            return;
        }

        HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
        String contextType = "text/html;";
        if (uri.endsWith(".css")) {
            contextType = "text/css;";
        } else if (uri.endsWith(".js")) {
            contextType = "text/javascript;";
        } else if (uri.toLowerCase().matches(".*\\.(jpg|png|gif)$")) {
            String ext = uri.substring(uri.lastIndexOf("."));
            contextType = "image/" + ext;
        }
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, contextType + "charset=utf-8;");

        boolean keepAlive = HttpUtil.isKeepAlive(request);

        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.write(response);

        ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));

        ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }

        file.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel client = ctx.channel();
        log.info("Client:" + client.remoteAddress() + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    private File getResource(String fileName) throws Exception {
        String basePath = baseURL.toURI().toString();
        int start = basePath.indexOf("classes/");
        basePath = (basePath.substring(0, start) + "/" + "classes/").replaceAll("/+", "/");

        String path = basePath + webroot + "/" + fileName;
        log.info("BaseURL:" + basePath);
        path = !path.contains("file:") ? path : path.substring(5);
        path = path.replaceAll("//", "/");
        return new File(path);
    }

    private Map<String, Object> getGetParamsFromChannel(FullHttpRequest fullHttpRequest) {
        Map<String, Object> params = new HashMap<>();
        if(fullHttpRequest.method() == HttpMethod.GET){
            QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
            Map<String, List<String>> paramList = decoder.parameters();

            for(Map.Entry<String, List<String>> entry : paramList.entrySet()){
                params.put(entry.getKey(),entry.getValue().get(0));
            }
            return params;
        }
        return params;
    }

}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;

import lombok.extern.slf4j.Slf4j;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private MsgProcessor processor = new MsgProcessor();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
        processor.sendMsg(ctx.channel(), msg.text());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel client = ctx.channel();
        String addr = processor.getAddress(client);
        log.info("WebSocket Client:" + addr + "异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

}
package com.qupeng.java.demo.netty4.websocket.chat.server.handler;

import com.alibaba.fastjson.JSONObject;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMP;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 主要用于自定义协议内容的逻辑处理
 *
 */
public class MsgProcessor {

	//记录在线用户
	private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	
	//定义一些扩展属性
	public static final AttributeKey<String> NICK_NAME = AttributeKey.valueOf("nickName");
	public static final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
	public static final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");
	public static final AttributeKey<String> FROM = AttributeKey.valueOf("from");
	
	//自定义解码器
	private IMDecoder decoder = new IMDecoder();
	//自定义编码器
	private IMEncoder encoder = new IMEncoder();
	
	/**
	 * 获取用户昵称
	 * @param client
	 * @return
	 */
	public String getNickName(Channel client){
		return client.attr(NICK_NAME).get();
	}
	/**
	 * 获取用户远程IP地址
	 * @param client
	 * @return
	 */
	public String getAddress(Channel client){
		return client.remoteAddress().toString().replaceFirst("/","");
	}
	
	/**
	 * 获取扩展属性
	 * @param client
	 * @return
	 */
	public JSONObject getAttrs(Channel client){
		try{
			return client.attr(ATTRS).get();
		}catch(Exception e){
			return null;
		}
	}
	
	/**
	 * 获取扩展属性
	 * @param client
	 * @return
	 */
	private void setAttrs(Channel client, String key, Object value){
		try{
			JSONObject json = client.attr(ATTRS).get();
			json.put(key, value);
			client.attr(ATTRS).set(json);
		}catch(Exception e){
			JSONObject json = new JSONObject();
			json.put(key, value);
			client.attr(ATTRS).set(json);
		}
	}
	
	/**
	 * 登出通知
	 * @param client
	 */
	public void logout(Channel client, String nickname){
		//如果nickName为null,没有遵从聊天协议的连接,表示未非法登录
//		if(getNickName(client) == null){ return; }
		for (Channel channel : onlineUsers) {
			IMMessage request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), nickname + "离开");
			String content = encoder.encode(request);
			channel.writeAndFlush(new TextWebSocketFrame(content));
		}
		onlineUsers.remove(client);
	}
	
	/**
	 * 发送消息
	 * @param client
	 * @param msg
	 */
	public void sendMsg(Channel client, IMMessage msg){

		sendMsg(client,encoder.encode(msg));
	}
	
	/**
	 * 发送消息
	 * @param client
	 * @param msg
	 */
	public void sendMsg(Channel client, String msg){

		IMMessage request = decoder.decode(msg);
		if(null == request){ return; }
		
		String addr = getAddress(client);
		
		if(request.getCmd().equals(IMP.LOGIN.getName())){
			client.attr(NICK_NAME).getAndSet(request.getSender());
			client.attr(IP_ADDR).getAndSet(addr);
			client.attr(FROM).getAndSet(request.getTerminal());
//			System.out.println(client.attr(FROM).get());
			onlineUsers.add(client);
			
			for (Channel channel : onlineUsers) {
				boolean isself = (channel == client);
				if(!isself){
					request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), getNickName(client) + "加入");
				}else{
					request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), "已与服务器建立连接!");
				}

				if("Console".equals(channel.attr(FROM).get())){
					channel.writeAndFlush(request);
					continue;
				}
				String content = encoder.encode(request);
				channel.writeAndFlush(new TextWebSocketFrame(content));
			}
		}else if(request.getCmd().equals(IMP.CHAT.getName())){
			for (Channel channel : onlineUsers) {
				boolean isself = (channel == client);
				if (isself) {
					request.setSender("you");
				}else{
					request.setSender(getNickName(client));
				}
				request.setTime(sysTime());

				if("Console".equals(channel.attr(FROM).get()) & !isself){
					channel.writeAndFlush(request);
					continue;
				}
				String content = encoder.encode(request);
				channel.writeAndFlush(new TextWebSocketFrame(content));
			}
		}else if(request.getCmd().equals(IMP.FLOWER.getName())){
			JSONObject attrs = getAttrs(client);
			long currTime = sysTime();
			if(null != attrs){
				long lastTime = attrs.getLongValue("lastFlowerTime");
				//60秒之内不允许重复刷鲜花
				int secends = 10;
				long sub = currTime - lastTime;
				if(sub < 1000 * secends){
					request.setSender("you");
					request.setCmd(IMP.SYSTEM.getName());
					request.setContent("您送鲜花太频繁," + (secends - Math.round(sub / 1000)) + "秒后再试");

					String content = encoder.encode(request);
					client.writeAndFlush(new TextWebSocketFrame(content));
					return;
				}
			}
			
			//正常送花
			for (Channel channel : onlineUsers) {
				if (channel == client) {
					request.setSender("you");
					request.setContent("你给大家送了一波鲜花雨");
					setAttrs(client, "lastFlowerTime", currTime);
				}else{
					request.setSender(getNickName(client));
					request.setContent(getNickName(client) + "送来一波鲜花雨");
				}
				request.setTime(sysTime());
				
				String content = encoder.encode(request);
				channel.writeAndFlush(new TextWebSocketFrame(content));
			}
		}
	}
	
	/**
	 * 获取系统时间
	 * @return
	 */
	private Long sysTime(){
		return System.currentTimeMillis();
	}
	
}

控制台客户端:

package com.qupeng.java.demo.netty4.websocket.chat.client;

import com.qupeng.java.demo.netty4.websocket.chat.client.handler.ChatClientHandler;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMDecoder;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.IOException;

/**
 * 客户端
 * @author Tom
 *
 */
public class ChatClient  {
    
    private ChatClientHandler clientHandler;
    private String host;
    private int port;
    
    public ChatClient(String nickName){
        this.clientHandler = new ChatClientHandler(nickName);
    }
    
    public void connect(String host,int port){
    		this.host = host;
    		this.port = port;

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new IMDecoder());
                    ch.pipeline().addLast(new IMEncoder());
                    ch.pipeline().addLast(clientHandler);
                }
            });
            ChannelFuture f = b.connect(this.host, this.port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    
    
    public static void main(String[] args) throws IOException{
		new ChatClient("Cover").connect("127.0.0.1",8080);
    	
    	String url = "http://localhost:8080/images/a.png";
    	System.out.println(url.toLowerCase().matches(".*\\.(gif|png|jpg)$"));
		
    }
    
}
package com.qupeng.java.demo.netty4.websocket.chat.client.handler;

import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMMessage;
import com.qupeng.java.demo.netty4.websocket.chat.protocol.IMP;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 聊天客户端逻辑实现
 * @author Tom
 *
 */
@Slf4j
public class ChatClientHandler extends SimpleChannelInboundHandler<IMMessage> {

	private ChannelHandlerContext ctx;
	private String nickName;
	public ChatClientHandler(String nickName){
		this.nickName = nickName;
	}
	
	/**启动客户端控制台*/
    private void session() throws IOException {
    		new Thread(){
    			public void run(){
					System.out.println(nickName + ",你好,请在控制台输入对话内容");
    				IMMessage message = null;
    		        Scanner scanner = new Scanner(System.in);
    		        do{
    			        	if(scanner.hasNext()){
    			        		String input = scanner.nextLine();
    			        		if("exit".equals(input)){
    			        			message = new IMMessage(IMP.LOGOUT.getName(),"Console",System.currentTimeMillis(),nickName);
    			        		}else{
    			        			message = new IMMessage(IMP.CHAT.getName(),System.currentTimeMillis(),nickName,input);
    			        		}
    			        	}
    		        }
    		        while (sendMsg(message));
    		        scanner.close();
    			}
    		}.start();
    }
	
    /**
     * tcp链路建立成功后调用
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		this.ctx = ctx;
        IMMessage message = new IMMessage(IMP.LOGIN.getName(),"Console",System.currentTimeMillis(),this.nickName);
        sendMsg(message);
        log.info("成功连接服务器,已执行登录动作");
        session();
    }
    /**
     * 发送消息
     * @param msg
     * @return
     * @throws IOException 
     */
    private boolean sendMsg(IMMessage msg){
        ctx.channel().writeAndFlush(msg);
		System.out.println("继续输入开始对话...");
        return msg.getCmd().equals(IMP.LOGOUT) ? false : true;
    }
    /**
     * 收到消息后调用
     * @throws IOException 
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, IMMessage msg) throws IOException {
    	IMMessage m = (IMMessage)msg;
		System.out.println((null == m.getSender() ? "" : (m.getSender() + ":")) + removeHtmlTag(m.getContent()));
    }


	public static String removeHtmlTag(String htmlStr){
		String regEx_script="<script[^>]*?>[\\s\\S]*?<\\/script>"; //定义script的正则表达式
		String regEx_style="<style[^>]*?>[\\s\\S]*?<\\/style>"; //定义style的正则表达式
		String regEx_html="<[^>]+>"; //定义HTML标签的正则表达式

		Pattern p_script=Pattern.compile(regEx_script,Pattern.CASE_INSENSITIVE);
		Matcher m_script=p_script.matcher(htmlStr);
		htmlStr=m_script.replaceAll(""); //过滤script标签

		Pattern p_style=Pattern.compile(regEx_style,Pattern.CASE_INSENSITIVE);
		Matcher m_style=p_style.matcher(htmlStr);
		htmlStr=m_style.replaceAll(""); //过滤style标签

		Pattern p_html=Pattern.compile(regEx_html,Pattern.CASE_INSENSITIVE);
		Matcher m_html=p_html.matcher(htmlStr);
		htmlStr=m_html.replaceAll(""); //过滤html标签

		return htmlStr.trim(); //返回文本字符串
	}

    /**
     * 发生异常时调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    	log.info("与服务器断开连接:"+cause.getMessage());
        ctx.close();
    }
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;


/**
 * 自定义IM协议,Instant Messaging Protocol即时通信协议
 *
 */
public enum IMP {
	/** 系统消息 */
	SYSTEM("SYSTEM"),
	/** 登录指令 */
	LOGIN("LOGIN"),
	/** 登出指令 */
	LOGOUT("LOGOUT"),
	/** 聊天消息 */
	CHAT("CHAT"),
	/** 送鲜花 */
	FLOWER("FLOWER");

	private String name;
	
	public static boolean isIMP(String content){
		return content.matches("^\\[(SYSTEM|LOGIN|LOGOUT|CHAT)\\]");
	}
	
	IMP(String name){
		this.name = name;
	}
	
	public String getName(){
		return this.name;
	}
	
	public String toString(){
		return this.name;
	}
	
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;

import lombok.Data;
import org.msgpack.annotation.Message;


/**
 * 自定义消息实体类
 *
 */
@Message
@Data
public class IMMessage{
	
	private String addr;		//IP地址及端口
	private String cmd;		//命令类型[LOGIN]或者[SYSTEM]或者[LOGOUT]
	private long time;		//命令发送时间
	private int online;		//当前在线人数
	private String sender;  //发送人
	private String receiver;	//接收人
	private String content;		//消息内容
	private String terminal; 	//终端
	
	public IMMessage(){}
	
	public IMMessage(String cmd,long time,int online,String content){
		this.cmd = cmd;
		this.time = time;
		this.online = online;
		this.content = content;
		this.terminal = terminal;
	}

	public IMMessage(String cmd,String terminal,long time,String sender){
		this.cmd = cmd;
		this.time = time;
		this.sender = sender;
		this.terminal = terminal;
	}


	public IMMessage(String cmd,long time,String sender,String content){
		this.cmd = cmd;
		this.time = time;
		this.sender = sender;
		this.content = content;
		this.terminal = terminal;
	}

	@Override
	public String toString() {
		return "IMMessage{" +
				"addr='" + addr + '\'' +
				", cmd='" + cmd + '\'' +
				", time=" + time +
				", online=" + online +
				", sender='" + sender + '\'' +
				", receiver='" + receiver + '\'' +
				", content='" + content + '\'' +
				'}';
	}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.MessageTypeException;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 自定义IM协议的编码器
 */
public class IMDecoder extends ByteToMessageDecoder {

	//解析IM写一下请求内容的正则
	private Pattern pattern = Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");
	
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		try{
			//先获取可读字节数
	        final int length = in.readableBytes();
	        final byte[] array = new byte[length];
	        String content = new String(array,in.readerIndex(),length);
	        
	        //空消息不解析
	        if(!(null == content || "".equals(content.trim()))){
	        	if(!IMP.isIMP(content)){
	        		ctx.channel().pipeline().remove(this);
	        		return;
	        	}
	        }
	        
	        in.getBytes(in.readerIndex(), array, 0, length);
	        out.add(new MessagePack().read(array,IMMessage.class));
	        in.clear();
		}catch(MessageTypeException e){
			ctx.channel().pipeline().remove(this);
		}
	}
	
	/**
	 * 字符串解析成自定义即时通信协议
	 * @param msg
	 * @return
	 */
	public IMMessage decode(String msg){
		if(null == msg || "".equals(msg.trim())){ return null; }
		try{
			Matcher m = pattern.matcher(msg);
			String header = "";
			String content = "";
			if(m.matches()){
				header = m.group(1);
				content = m.group(3);
			}
			
			String [] heards = header.split("\\]\\[");
			long time = 0;
			try{ time = Long.parseLong(heards[1]); } catch(Exception e){}
			String nickName = heards[2];
			//昵称最多十个字
			nickName = nickName.length() < 10 ? nickName : nickName.substring(0, 9);
			
			if(msg.startsWith("[" + IMP.LOGIN.getName() + "]")){
				return new IMMessage(heards[0],heards[3],time,nickName);
			}else if(msg.startsWith("[" + IMP.CHAT.getName() + "]")){
				return new IMMessage(heards[0],time,nickName,content);
			}else if(msg.startsWith("[" + IMP.FLOWER.getName() + "]")){
				return new IMMessage(heards[0],heards[3],time,nickName);
			}else{
				return null;
			}
		}catch(Exception e){
			e.printStackTrace();
			return null;
		}
	}
}
package com.qupeng.java.demo.netty4.websocket.chat.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

/**
 * 自定义IM协议的编码器
 */
public class IMEncoder extends MessageToByteEncoder<IMMessage> {

	@Override
	protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
			throws Exception {
		out.writeBytes(new MessagePack().write(msg));
	}
	
	public String encode(IMMessage msg){
		if(null == msg){ return ""; }
		String prex = "[" + msg.getCmd() + "]" + "[" + msg.getTime() + "]";
		if(IMP.LOGIN.getName().equals(msg.getCmd()) ||
			IMP.FLOWER.getName().equals(msg.getCmd())){
			prex += ("[" + msg.getSender() + "][" + msg.getTerminal() + "]");
		}else if(IMP.CHAT.getName().equals(msg.getCmd())){
			prex += ("[" + msg.getSender() + "]");
		}else if(IMP.SYSTEM.getName().equals(msg.getCmd())){
			prex += ("[" + msg.getOnline() + "]");
		}
		if(!(null == msg.getContent() || "".equals(msg.getContent()))){
			prex += (" - " + msg.getContent());
		}
		return prex;
	}

}

页面:

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta content="width=device-width, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0, user-scalable=0" name="viewport">
    <title>在线聊天室</title>
    <link rel="stylesheet" type="text/css" href="css/style.css" />
    <script type="text/javascript" src="/js/lib/jquery.min.js"></script>
	<script type="text/javascript" src="/js/lib/jquery.snowfall.js"></script>
	<script type="text/javascript" src="/js/chat.util.js"></script>
</head>

<body>
	<div id="loginbox">
        <div style="width:300px;margin:200px auto;">
           	 欢迎进入WebSocket聊天室
            <br/>
            <br/>
            <input type="text" style="width:180px;" placeholder="进入前,请先输入昵称" id="nickname" name="nickname" />
            <input type="button" style="width:50px;" value="进入" onclick="CHAT.login();" />
            <div id="error-msg" style="color:red;"></div>
        </div>
    </div>
    <div id="chatbox" style="display: none;">
        <div style="background:#3d3d3d;height: 28px; width: 100%;font-size:12px;position: fixed;top: 0px;z-index: 999;">
            <div style="line-height: 28px;color:#fff;">
                <span style="text-align:left;margin-left:10px;">Netty Websocket聊天室</span>
                <span style="float:right; margin-right:10px;">
                	<span>当前在线<span id="onlinecount">0</span></span> |
	                <span id="shownikcname">匿名</span> |
	                <a href="javascript:;" onclick="CHAT.logout()" style="color:#fff;">退出</a>
                </span>
            </div>
        </div>
        <div id="doc">
            <div id="chat">
                <div id="message" class="message">
                    <div id="onlinemsg" style="background:#EFEFF4; font-size:12px; margin-top:40px; margin-left:10px; color:#666;">
                    </div>
                </div>
                <form onsubmit="return false;">
                	<div class="tool-box">
                		<div class="face-box" id="face-box"></div>
                		<span class="face" onclick="CHAT.openFace()" title="选择表情"></span>
                		<!--
	                		<span class="img" id="tool-img-btn" title="发送图片"></span>
	                		<span class="file" id="tool-file-btn" title="上传文件"></span>
                		 -->
                		<span class="flower" onclick="CHAT.sendFlower()" title="送鲜花"></span>
                	</div>
                	<div class="input-box">
		                <div class="input" contenteditable="true" id="send-message"></div>
		                <div class="action">
		                    <input class="button" type="button" id="mjr_send" onclick="CHAT.sendText()" value="发送"/>
		                </div>
            		</div>
               </form>
           </div>
       </div>
    </div>
</body>

</html>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qupeng.java.demos</groupId>
    <artifactId>netty4</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.msgpack</groupId>
            <artifactId>msgpack</artifactId>
            <version>0.6.12</version>
        </dependency>

        <dependency>
            <groupId>org.dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/117873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Vue】创建 Vue 实例与对象配置、容器与实例的关系、插值延伸和 Vue 开发工具的初步使用

创建 Vue 实例 引入 Vue 注意在 Head 中 <script type"text/javascript" src"./vue.js"></script>另一个 javascript 中创建 Vue 实例&#xff0c;注意在 Body 尾部 <script type"text/javascript">const x new Vue() <…

12. 目前常用的四种信道复用方式:()、()、()和() ---- 计算机网络

目前常用的四种信道服用方式&#xff1a;&#xff08;频分复用&#xff09;、&#xff08;时分复用&#xff09;、&#xff08;码分复用&#xff09;和&#xff08;波分复用&#xff09; 知识点 复用&#xff08;multiplexing&#xff09;&#xff1a;就是在一个信道上传输多路…

java SE阶段面试题

目录 1、Java 的数据类型有哪些&#xff1f; 2、变量的三要素是什么&#xff1f;变量使用有什么要求&#xff1f; 3、基本数据类型变量和引用数据类型变量有什么区别&#xff1f; 4、Java 的运算符有几种意思&#xff1f; 5、Java 的自增、自减运算符在自增变量前后有什么区…

《计算机网络》——第三章知识点

第三章思维导图 链路层的信道类型 一对一:点对点信道 —对多:广播信道 链路层要解决的问题 封装成帧 透明传输 差错检测密封&#xff0c;透气性差 封装成帧就是在一段数据的前后部分添加首部和尾部&#xff0c;这样就构成了一个帧。接收端在收到物理层上交的比特流后&#xff…

Pandas.to_csv()函数及全部参数使用方法一文详解+实例代码

目录 前言 一、基础语法与功能 二、参数说明和代码演示 1.path_or_buf 选择文件/文件路径写入 2.sep 指定分隔符 3.na_rep 指定缺少数据表示 4.float_format 指定浮点型字符串输出格式 5. columns 指定要写入的列 6.header 是否需要写入列名 7.index 是否写入行名称&am…

【实时数仓】Sugar拉取数据展示、品牌销售排行接口、品类销售占比接口和热门商品SPU排名接口的实现

文章目录一 Sugar拉取数据展示1 内网穿透&#xff08;1&#xff09;作用&#xff08;2&#xff09;工具&#xff08;3&#xff09;本机ip地址&#xff08;4&#xff09;花生壳配置2 配置组件二 品牌销售排行接口1 Sugar配置&#xff08;1&#xff09;图表配置&#xff08;2&…

2022《粤语好声音-乐队风暴》全国总决赛圆满收官!

2022年12月17日&#xff0c;由广东珠江、盛娱星汇海选联合主办的2022《粤语好声音-乐队风暴》全国总决赛在广州增城1978电影小镇正式拉开帷幕。从海选到全国总决赛&#xff0c;2022《粤语好声音-乐队风暴》在21座城市中&#xff0c;通过线上线下双模式开展&#xff0c;历时6个月…

OpManager 虚拟化管理

什么是虚拟化 虚拟化是创建计算资源的虚拟形式&#xff0c;如计算机、服务器或其他硬件组件&#xff0c;或基于软件的资源&#xff08;如操作系统&#xff09;。虚拟化最常见的示例是在操作系统安装期间对硬盘进行分区&#xff0c;其中物理硬盘驱动器被拆分为多个逻辑磁盘以提…

重点 |中级软件设计师易混淆知识点 (1)

本文章总结了软件设计师考试易混淆知识点&#xff01;&#xff01;&#xff01; 帮助大家更好的复习&#xff0c;希望能对大家有所帮助 比较长&#xff0c;放了部分&#xff0c;需要可私信&#xff01;&#xff01; 易混淆点1&#xff1a;原、反、补码的运算 1、原码&#x…

Технокубок 2021 - Финал C. Basic Diplomacy

翻译&#xff1a; Aleksey有&#x1d45b;个朋友。他现在也在度假&#xff0c;所以他有&#x1d45a;天来玩这款新的病毒式合作游戏!但由于它是合作的&#xff0c;阿列克谢将需要一个队友在每个&#x1d45a;天。 在这些日子里&#xff0c;每天都有一些朋友可以玩&#xff0c…

Spring 依赖注入

文章目录流程图依赖注入的方式手动注入自动注入XML的autowire自动注入autowire BY_NAME 与 BY_TYPE(已过时)执行时机&#xff1a;AUTOWIRE_BY_NAMEAUTOWIRE_BY_TYPEAutowired注解的处理(含Value&#xff0c;Inject)AutowiredAnnotationBeanPostProcessorresolveDependencyfindA…

机器学习——线性模型学习

线性回归 主要目标确定 如何确定w和b呢&#xff1f;关键在于如何衡量f(x)与y的差别 此种衡量误差的方法称为均方误差也称为欧式距离 求解w和b使上述方程最小化的过程称为线性回归模型的最小二乘”参数估计“ 多元线性回归 针对多个属性的数据集D&#xff0c;此时试图学得 …

DHCP学习

目录 DHCP基本认识和原理 场景一、同网段DHCP 场景二、不同段DHCP&#xff08;中继DHCP&#xff09; DHCP基本认识和原理 DHCP&#xff08;Dynamic Host Configuration Protocol动态主机协议&#xff09;。 作用&#xff1a;为局域网络中主机动态分发地址&#xff0c;以及…

INMP441麦克风芯片--支持I2S、ESP32

1.简介 INMP441是InvenSense公司推出的一款具有底部端口的高信噪比、低功耗、数字输出的全向MEMS麦克风&#xff0c;信噪比高达61dB&#xff0c;使其成为近场应用的绝佳选择。INMP441的电路结构如图所示&#xff0c;包括MEMS声音传感器、模数转换器&#xff08;ADC&#xff09…

人工智能的2022:技术的价值在于生产力

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…

孤独的照片(思维)

Farmer John 最近购入了 NN 头新的奶牛&#xff0c;每头奶牛的品种是更赛牛&#xff08;Guernsey&#xff09;或荷斯坦牛&#xff08;Holstein&#xff09;之一。 奶牛目前排成一排&#xff0c;Farmer John 想要为每个连续不少于三头奶牛的序列拍摄一张照片。 然而&#xff0…

初识Kubernetes:(2)Kubernetes环境搭建

初识Kubernetes&#xff1a;&#xff08;2&#xff09;Kubernetes环境搭建1 环境规划1.1 集群类型1.2 安装方式2 环境搭建2.1 minikube安装2.2 启动集群3 服务部署1 环境规划 1.1 集群类型 Kubernetes集群大致分为两类&#xff1a;一主多从和多主多从。 一主多从&#xff1a…

UE4 shader编程 基础学习笔记 --- 熟悉各个节点

Texture Sample&#xff1a;用所需要的纹理覆盖到Mesh上&#xff0c;展示效果&#xff0c;Mesh上存在漫反射 高光 环境变量的和制造出了左上角的纹理光照效果 该节点只能设置其黑白效果 0为黑&#xff0c;1为白 该节点可以调节R、G两个参数 该节点可以调节RGB三个参数 该节…

C#---第十八课:Debug调试技巧--Debug类、pin 、add watch、拖动断点、修改变量、两个断点的联动

文章目录1. 在output窗口中输出变量的内容----Debug类2. Pin变量 / add watch 实时监控变量的变化(1) pin to sources(2) add watch3. 自由拖动黄色箭头图标4. 直接修改变量的值5. 两个断点的联动&#xff08;当一个断点触发&#xff0c;另一个断点才会触发debug&#xff09;1.…

变革:区块链上的政府和企业应用

发表时间&#xff1a;2022年5月12日 信息来源&#xff1a;coingeek.com 区块链只与数字货币和金融交易挂钩的时代已经过去了。这项技术在过去十年中不断发展&#xff0c;今天&#xff0c;它为游戏、社交媒体应用、医疗保健以及供应链管理等多个领域提供着支持。相比其它领域&am…