Netty网络编程实战:基于Netty的Http服务器开发

news2025/1/12 9:59:09

Netty网络编程实战:基于Netty的Http服务器开发

文章目录

  • Netty网络编程实战:基于Netty的Http服务器开发
    • 介绍
    • 功能需求
      • 服务端代码实现
    • 基于Netty的WebSocket开发网页版聊天室
      • WebSocket简介
      • WebSocket和HTTP的区别
      • 基础环境准备
      • 服务端开发
    • Netty中粘包和拆包的解决方案
      • 粘包和拆包简介
      • 粘包和拆包代码演示
        • 粘包
      • 粘包和拆包的解决方法

介绍

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应
用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

在这里插入图片描述

功能需求

  1. Netty 服务器在 8080 端口监听
  2. 浏览器发出请求 "http://localhost:8080/ "
  3. 服务器可以回复消息给客户端 "Hello! 我是Netty服务器 " ,并对特定请求资源进行过滤.

服务端代码实现

NettyHttpServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 聊天室服务端
 */
public class NettyHttpServer {
    //端口号
    private int port;

    public NettyHttpServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        //1. 创建bossGroup线程组: 处理网络事件--连接事件
        EventLoopGroup bossGroup = null;
        //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
        EventLoopGroup workerGroup = null;
        try {
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();
            //3. 创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //4. 设置bossGroup线程组和workerGroup线程组
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                    .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
                    .childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //8. 向pipeline中添加自定义业务处理handler
                            //添加编解码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            // 自定义业务处理类
                            ch.pipeline().addLast(new NettyHttpServerHandler());
                        }
                    });
            //9. 启动服务端并绑定端口,同时将异步改为同步
            ChannelFuture future = serverBootstrap.bind(port);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("端口绑定成功!");
                    } else {
                        System.out.println("端口绑定失败!");
                    }
                }
            });
            System.out.println("http服务端启动成功.");
            //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyHttpServer(8080).run();
    }
}

NettyHttpServerHandle

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

/**
 * http服务器处理类
 */
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * 读取就绪事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        //1.判断请求是不是HTTP请求
        if (msg instanceof HttpRequest) {
            DefaultHttpRequest request = (DefaultHttpRequest) msg;
            System.out.println("浏览器请求路径:" + request.uri());
            if ("/favicon.ico".equals(request.uri())) {
                System.out.println("图标不响应");
                return;
            }
            //2.给浏览器进行响应
            ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器 ", CharsetUtil.UTF_8);
            DefaultFullHttpResponse response =
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                            HttpResponseStatus.OK, byteBuf);
            //2.1 设置响应头
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,
                    "text/html;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH,
                    byteBuf.readableBytes());
            ctx.writeAndFlush(response);
        }
    }
}

基于Netty的WebSocket开发网页版聊天室

WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

应用场景十分广泛:

  1. 社交订阅
  2. 协同编辑/编程
  3. 股票基金报价
  4. 体育实况更新
  5. 多媒体聊天
  6. 在线教育

WebSocket和HTTP的区别

http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端.

WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端,而且信息当中不必在带有head的部分信息了与http的长链接通信来说。这种方式,不仅能降低服务器的压力,而且信息当中也减少了部分多余的信息。

基础环境准备

  1. 在idea中创建Springboot工程
    在这里插入图片描述

  2. 相关依赖
    pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--整合web模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--整合模板引擎 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!--引入netty依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

  1. 静态资源
    在这里插入图片描述
    如果有想实际使用测试的需要,这些资源都可以私发消息找我获取,包括项目源代码!
  2. yam配置
server:
	port: 8080
resources:
	static-locations:
	- classpath:/static/
spring:
	thymeleaf:
		cache: false
		checktemplatelocation: true
		enabled: true
		encoding: UTF-8
		mode: HTML5
		prefix: classpath:/templates/
		suffix: .html

服务端开发

  1. 添加Netty依赖
<!--引入netty依赖 -->
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
</dependency>
  1. Netty相关配置
netty:
	port: 8081
	path: /chat
  1. Netty配置类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyConfig {
	private int port;//netty监听的端口
    
	private String path;//websocket访问路径
}
  1. NettyWebSocketServer开发
import com.lagou.config.NettyConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

/**
 * Netty服务器
 */
@Component
public class NettyWebSocketServer implements Runnable {

    @Autowired
    NettyConfig nettyConfig;

    @Autowired
    WebSocketChannelInit webSocketChannelInit;


    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * 资源关闭--在容器销毁是关闭
     */
    @PreDestroy
    public void close() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    @Override
    public void run() {
        try {
            //1.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //2.设置线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //3.设置参数
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(webSocketChannelInit);
            //4.启动
            ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync();
            System.out.println("--Netty服务端启动成功---");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 通道初始化对象
import com.lagou.config.NettyConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 通道初始化对象
 */
@Component
public class WebSocketChannelInit extends ChannelInitializer {

    @Autowired
    NettyConfig nettyConfig;

    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8000));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
        // 自定义处理handler
        pipeline.addLast(webSocketHandler);

    }
}
  1. 处理对象
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义处理类
 * TextWebSocketFrame: websocket数据是帧的形式处理
 */
@Component
@ChannelHandler.Sharable //设置通道共享
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static List<Channel> channelList = new ArrayList<>();

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //当有新的客户端连接的时候, 将通道放入集合
        channelList.add(channel);
        System.out.println("有新的连接.");
    }


    /**
     * 通道未就绪--channel下线
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //当有客户端断开连接的时候,就移除对应的通道
        channelList.remove(channel);
    }

    /**
     * 读就绪事件
     *
     * @param ctx
     * @param textWebSocketFrame
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        String msg = textWebSocketFrame.text();
        System.out.println("msg:" + msg);
        //当前发送消息的通道, 当前发送的客户端连接
        Channel channel = ctx.channel();
        for (Channel channel1 : channelList) {
            //排除自身通道
            if (channel != channel1) {
                channel1.writeAndFlush(new TextWebSocketFrame(msg));
            }
        }
    }


    /**
     * 异常处理事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //移除集合
        channelList.remove(channel);
    }
}
  1. 启动类
import com.lagou.netty.NettyWebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NettySpringbootApplication implements CommandLineRunner {

    @Autowired
    NettyWebSocketServer nettyWebSocketServer;

    public static void main(String[] args) {
        SpringApplication.run(NettySpringbootApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(nettyWebSocketServer).start();
    }
}
  1. 前端js开发
$(function () {
    //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
    var username = "";
    while (true) {
        //弹出一个输入框,输入一段文字,可以提交
        username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
        if (username.trim() === "")//如果返回的有内容
        {
            alert("名称不能输入空")
        } else {
            $("#username").text(username);
            break;
        }
    }

    var ws = new WebSocket("ws://localhost:8081/chat");
    ws.onopen = function () {
        console.log("连接成功.")
    }
    ws.onmessage = function (evt) {
        showMessage(evt.data);
    }
    ws.onclose = function (){
        console.log("连接关闭")
    }

    ws.onerror = function (){
        console.log("连接异常")
    }

    function showMessage(message) {
        // 张三:你好
        var str = message.split(":");
        $("#msg_list").append(`<li class="active"}>
                                  <div class="main">
                                    <img class="avatar" width="30" height="30" src="/img/user.png">
                                    <div>
                                        <div class="user_name">${str[0]}</div>
                                        <div class="text">${str[1]}</div>
                                    </div>                       
                                   </div>
                              </li>`);
        // 置底
        setBottom();
    }

    $('#my_test').bind({
        focus: function (event) {
            event.stopPropagation()
            $('#my_test').val('');
            $('.arrow_box').hide()
        },
        keydown: function (event) {
            event.stopPropagation()
            if (event.keyCode === 13) {
                if ($('#my_test').val().trim() === '') {
                    this.blur()
                    $('.arrow_box').show()
                    setTimeout(() => {
                        this.focus()
                    }, 1000)
                } else {
                    $('.arrow_box').hide()
                    //发送消息
                    sendMsg();
                    this.blur()
                    setTimeout(() => {
                        this.focus()
                    })
                }
            }
        }
    });
    $('#send').on('click', function (event) {
        event.stopPropagation()
        if ($('#my_test').val().trim() === '') {
            $('.arrow_box').show()
        } else {
            sendMsg();
        }
    })

    function sendMsg() {
        var message = $("#my_test").val();
        $("#msg_list").append(`<li class="active"}>
                                  <div class="main self">
                                      <div class="text">` + message + `</div>
                                  </div>
                              </li>`);
        $("#my_test").val('');

        //发送消息
        message = username + ":" + message;
        ws.send(message);
        // 置底
        setBottom();
    }

    // 置底
    function setBottom() {
        // 发送消息后滚动到底部
        const container = $('.m-message')
        const scroll = $('#msg_list')
        container.animate({
            scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100
        });
    }
});

Netty中粘包和拆包的解决方案

粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

在这里插入图片描述

  1. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;

在这里插入图片描述

  1. 如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包

在这里插入图片描述

  1. 如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包

在这里插入图片描述

TCP粘包和拆包产生的原因:

数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

粘包和拆包代码演示

粘包

客户端

NettyClient

/**
 * 客户端
 */
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //2. 创建客户端启动助手
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(group)
                .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
                .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //添加解码器
                        ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
                        //6. 向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        //7. 启动客户端,等待连接服务端,同时将异步改为同步
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
        //8. 关闭通道和关闭连接池
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

NettyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 客户端处理类
 */
public class NettyClientHandler implements ChannelInboundHandler {

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端"+i+"$",
                    CharsetUtil.UTF_8));
        }
    }

    /**
     * 通道读就绪事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

    }
}

服务端

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.nio.charset.StandardCharsets;

/**
 * Netty服务端
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建bossGroup线程组: 处理网络事件--连接事件
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3. 创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //4. 设置bossGroup线程组和workerGroup线程组
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
                .childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //添加解码器
                        //ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
                        ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
                        //8. 向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });
        //9. 启动服务端并绑定端口,同时将异步改为同步
        ChannelFuture future = serverBootstrap.bind(9999);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });
        System.out.println("服务端启动成功.");
        //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

NettyServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 自定义处理Handler
 */
public class NettyServerHandler implements ChannelInboundHandler {
    public int count = 0;

    /**
     * 通道读取事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("读取次数:"+(++count));
    }


    /**
     * 通道读取完毕事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端",
                CharsetUtil.UTF_8));//消息出站
    }

    /**
     * 通道异常事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }
}

运行结果

在这里插入图片描述

服务端一次读取了客户端发送过来的消息,应该读取10次. 因此发生粘包.

  1. 拆包

客户端

NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * 客户端
 */
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //2. 创建客户端启动助手
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(group)
                .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
                .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //6. 向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        //7. 启动客户端,等待连接服务端,同时将异步改为同步
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
        //8. 关闭通道和关闭连接池
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

NettyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 客户端处理类
 */
public class NettyClientHandler implements ChannelInboundHandler {

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端"+i+"$",
                    CharsetUtil.UTF_8));
        }
    }

    /**
     * 通道读就绪事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

    }
}

服务端

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.nio.charset.StandardCharsets;

/**
 * Netty服务端
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建bossGroup线程组: 处理网络事件--连接事件
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3. 创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //4. 设置bossGroup线程组和workerGroup线程组
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO
                .option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置
                .childHandler(new ChannelInitializer<SocketChannel>() { //7. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //添加解码器
                        //ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
                        ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
                        //8. 向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });
        //9. 启动服务端并绑定端口,同时将异步改为同步
        ChannelFuture future = serverBootstrap.bind(9999);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });
        System.out.println("服务端启动成功.");
        //10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

NettyServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 自定义处理Handler
 */
public class NettyServerHandler implements ChannelInboundHandler {
    public int count = 0;

    /**
     * 通道读取事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("读取次数:"+(++count));
    }


    /**
     * 通道读取完毕事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端",
                CharsetUtil.UTF_8));//消息出站
    }

    /**
     * 通道异常事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }
}

运行结果

在这里插入图片描述

当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.

粘包和拆包的解决方法

  1. 业内解决方案

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

  • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  • 将换行符作为消息结束符
  • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  • 通过在消息头中定义长度字段来标识消息的总长度
  1. Netty中的粘包和拆包解决方案

Netty提供了4种解码器来解决,分别如下:

  • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度
  1. 代码实现
  • LineBasedFrameDecoder解码器

    ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
    
    ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n",
    CharsetUtil.UTF_8));
    
  • DelimiterBasedFrameDecoder解码器

ByteBuf byteBuf =
Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$",
CharsetUtil.UTF_8));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/365871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于PHP的webshell免杀小结

0X00普通的一句话木马&#xff08;适用于CTF和小站&#xff09; <?php eval($_POST[a]); ?> //函数的相似替换 <?php assert($_POST[a]); ?><?php eval($_POST[110]);?>与第一个一句话木马相比多了一个"“字符&#xff0c;我们发现这个字符的含义…

【科研】测试速通:python不同文件夹下同名图像拼接

论文必备图像拼接笔记 速通结果&#xff1a; 现有&#xff1a;测试样本相同&#xff08;名命相同&#xff09;&#xff0c;测试模型不同&#xff0c;测试结果分别保存至不同文件夹 目标&#xff1a;结果显示在同一张图像上 目录 论文必备图像拼接笔记 1.如果图像格式不一致…

怎么维护Linux VPS 服务器?简单7个步骤

维护VPS的目的是为了确保服务器网络始终畅通无阻。请注意&#xff0c;此列表中的任务并不是服务器维护所需完成的唯一任务。以下是 Linux VPS 服务器所有者可以做些什么来维护他们的服务器。 1.监控磁盘空间 服务器是个人服务器还是具有多个用户帐户的服务器并不重要&#xff0…

精简版SDL落地实践

一、前言一般安全都属于运维部下面&#xff0c;和上家公司的运维总监聊过几次一些日常安全工作能不能融入到DevOps中&#xff0c;没多久因为各种原因离职。18年入职5月一家第三方支付公司&#xff0c;前半年在各种检查中度过&#xff0c;监管形势严峻加上大领导对安全的重视(主…

【数据结构】二叉树(C语言实现)

文章目录一、树的概念及结构1.树的概念2.树的相关概念名词3.树的表示4.树在实际中的运用二、二叉树概念及结构1.二叉树的概念2.特殊的二叉树3.二叉树的性质4.二叉树的存储结构三、二叉树链式结构的实现1.结构的定义2.构建二叉树3.二叉树前序遍历4.二叉树中序遍历5.二叉树后序遍…

QWebEngineView 类 详细使用说明

文章目录 一、前言 二、详述 三、属性 四、公共函数 五、重新实现的公共函数 六、公共槽函数 七、信号 八、保护函数 九、重新实现的受保护函数 10、总结 一、前言 原文链接 QWebEngineView类提供了一个小部件&#xff0c;用于查看和编辑Web文档。 Header: #include < …

【Web安全-MSF记录篇章一】

文章目录前言msfvenom生成远控木马基本系统命令webcam 摄像头命令常用的信息收集脚本注册表设置nc后门开启 rdp&添加用户获取哈希mimikatz抓取密码前言 最近打站&#xff0c;可以感觉到之前的学的渗透知识忘记很多。。。。。多用多看多练&#xff0c;简单回顾一下 msfven…

2023年了,零基础小白转行IT学习Java还有前途吗?

“2023年了&#xff0c;转行IT学习Java是不是已经听过看过很多次了&#xff0c;Java从出现到现在有多少年了呢&#xff1f;掐指一算&#xff0c;Java是1995年由Sun公司推出的一款高级编程语言……距今已有28年了&#xff01; Sun公司都被收购了&#xff0c;莫不是Java也要垮台了…

Android实现Dribbble上动感的Gallery App Icon

先来看看原Dribbble上动感的Gallery App Icon效果图思路拆解一下&#xff0c;还是比较简单&#xff0c;需要绘制的有&#xff1a;圆形背景太阳(圆形)山(三角形)云朵(圆角矩形 三个圆)需要进行的动画&#xff1a;太阳 - 旋转动画山 - 上下平移动画云朵 - 左右平移动画不必绘制圆…

随想录二刷 (双指针法) leetcode 27 26 283 844

双指针法的原理 双指针法相对于暴力解法的优点有以下几点 暴力遍历的时间复杂度会比较高双指针法利用两个指针进行遍历完成双层循环所做的事情 双指针一般有两种方法 同向指针&#xff0c;双向指针 第一题 leetcode 27 移除元素 题目描述 题目分析 采用暴力遍历可以得出结…

vector的基本使用

目录 介绍&#xff1a; vector iterator 的使用 增删查改 增&#xff08;push_back insert&#xff09;&#xff1a; 删(pop_back erase)&#xff1a; swap&#xff1a; vector的容量和扩容&#xff1a; 排序&#xff08;sort&#xff09;&#xff1a; 介绍&#xff…

SpringBoot入门(二)

这里写目录标题一、SpringBoot整合Junit1.1 搭建SpringBoot工程1.2 引入starter-test起步依赖1.3 编写类1.4 测试二、SpringBoot整合mybatis2.1 搭建SpringBoot工程2.2 引入mybatis起步依赖&#xff0c;添加驱动2.3 编写DataSource和MyBatis相关配置2.4 定义表和实体类2.5 编写…

100%BIM学员的疑惑:不会CAD可以学Revit吗?

在新一轮科技创新和产业变革中&#xff0c;信息化与建筑业的融合发展已成为建筑业发展的方向&#xff0c;将对建筑业发展带来战略性和全局性的影响。 建筑业是传统产业&#xff0c;推动建筑业科技创新&#xff0c;加快推进信息化发展&#xff0c;激发创新活力&#xff0c;培育…

apk中代码执行adb指令实现

背景&#xff1a;想要在android apk中直接使用adb指令&#xff0c;从而不需要把手机通过数据线方式连接到电脑&#xff0c;在电脑端执行adb指令。 一、权限相关 想要在apk代码中执行adb命令&#xff0c;涉及到执行权限。 首先手机需要有root权限。其次就算手机已经root了&…

yolov5/6/7系列模型训练日志结果数据对比分析可视化

早在之前使用yolov3和yolov4这类项目的时候可视化分析大都是自己去做的&#xff0c;到了yolov5的时候&#xff0c;变成了一个工具包了&#xff0c;作者全部集成进去了&#xff0c;这里我们以一个具体的结果为例&#xff0c;如下&#xff1a;整个训练过程产生的指标等数据都会自…

11.3 基于Django4的可重用、用户注册和登录系统搭建(优化)

文章目录邮件注册发送邮件功能测试基本的邮件注册功能实现完成注册表单完成注册的业务逻辑密码加密功能实现邮件注册确认创建模型修改视图测试处理邮件确认请求修改登录规则测试邮件注册 根据官方文档进行&#xff1a;https://docs.djangoproject.com/zh-hans/4.1/topics/emai…

什么是智慧实验室?

智慧实验室是利用现代信息技术和先进设备将实验室实现智能化和智慧化的概念。通过将各种数据、信息和资源整合在一起&#xff0c;实现实验室设备的互联互通&#xff0c;数据的实时采集、传输、处理和分析&#xff0c;从而提高实验室的效率、精度和可靠性。一、智慧实验室包含多…

Java~对于代码块与内部类的理解

目录 代码块 普通代码块 构造代码块 静态代码块 内部类 成员内部类 普通内部类 静态内部类 局部内部类 代码块 使用“{}”定义的一段代码成为代码块&#xff0c;代码块分为普通代码块、构造代码块、匿名代码块、同步代码块。 普通代码块 定义在方法中的代码&#x…

【go语言之thrift协议一】

go语言之thrift协议thrift文件shared.thriftSharedStructSharedServiceSharedServiceProcessorSharedServiceGetStructArgsSharedServiceGetStructResulttutorial.thrift基本数据类型引入其他thrift文件自定义类型定义常量enum继承thrift 相对于grpc而言&#xff0c;可能用的不…

逆向-还原代码之max 再画堆栈图 (Interl 64)

// source code #include <stdio.h> void max(int * a, int * b) { if (*a < *b) *a *b; } int main() { int a 5, b 6; max(&a, &b); printf("a, b max %d\n", a); return 0; } // 再画堆栈图 下周一&#xff08;2.27…