基于Netty实现TCP通信

news2024/11/19 3:22:24

创建一个Maven项目添加下面依赖

    <dependencies>
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.84.Final</version>
        </dependency>
    </dependencies>

编码解码器

package com.example.nettydemo.coder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;


public class NettyEncoder extends MessageToByteEncoder<String> {
    public NettyEncoder() {
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] byteMsg = msg.getBytes(StandardCharsets.UTF_8);
        int msgLength = byteMsg.length;
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 + byteMsg.length);
        buf.writeInt(msgLength);
        buf.writeBytes(byteMsg, 0, msgLength);
        out.writeBytes(buf);
        buf.release();
    }
}

package com.example.nettydemo.coder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.List;


@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int beginReader = in.readerIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.readerIndex(beginReader);
        } else {
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            String str = new String(data, 0, dataLength, StandardCharsets.UTF_8);
            out.add(str);
        }
    }
}

服务端

package com.example.nettydemo.server;

import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;


@Slf4j
public class TcpServer {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap server;
    private ChannelFuture channelFuture;
    private Integer port;

    public TcpServer(Integer port) {
        this.port = port;

        // nio连接处理池
        this.bossGroup = new NioEventLoopGroup();
        // 处理事件池
        this.workerGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 自定义处理类
                        ch.pipeline().addLast(new NettyDecoder());
                        ch.pipeline().addLast(new NettyEncoder());
                        ch.pipeline().addLast(new TcpServerHandler());
                    }
                });
        server.option(ChannelOption.SO_BACKLOG, 128);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    public synchronized void startListen() {
        try {
            // 绑定到指定端口
            channelFuture = server.bind(port).sync();
            log.info("netty服务器在[{}]端口启动监听", port);
        } catch (Exception e) {
            log.error("netty服务器在[{}]端口启动监听失败", port);
            e.printStackTrace();
        }
    }

    public void sendMessageToClient(String clientIp, Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        Channel channel = channelMap.get(clientIp);
        String sendStr;
        try {
            sendStr = OBJECT_MAPPER.writeValueAsString(msg);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);
            channel.writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);
            throw new RuntimeException(var4);
        }
    }

    public void pushMessageToClients(Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        if (channelMap != null && !channelMap.isEmpty()) {
            channelMap.forEach((k, v) -> sendMessageToClient(k, msg));
        }
    }
}

package com.example.nettydemo.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;


@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 用跳表存储连接channel
     */
    public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("应用程序的监听通道异常!");
        cause.printStackTrace();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        // 本地端口做键
        int localPort = localSocket.getPort();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        if (channelMap == null || channelMap.isEmpty()) {
            channelMap = new HashMap<>(4);
        }
        channelMap.put(clientIp, channel);
        channelSkipMap.put(localPort, channelMap);
        log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 获取每个用户端连接的ip
        Channel channel = ctx.channel();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        int localPort = localSocket.getPort();
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        channelMap.remove(clientIp);
        log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);
    }
}

package com.example.nettydemo.server;


public class ServerTest {
    public static void main(String[] args) {
        TcpServer tcpServer = new TcpServer(40001);
        tcpServer.startListen();
        while (true) {
            try {
                // 每5秒向客户端发送一次 "test-朱上林123"
                Thread.sleep(5000);
                tcpServer.pushMessageToClients("test-朱上林123");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

package com.example.nettydemo.client;

import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;


@Slf4j
public class TcpClient {
    private EventLoopGroup group;
    private ChannelFuture channelFuture;
    private final String ip;
    private final Integer port;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public TcpClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * 建立连接
     *
     */
    public synchronized void connectServer() {
        log.info("开始建立连接,ip:{}, port:{}", ip, port);
        // 生命nio连接池
        this.group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            // 配置解码器以及消息处理类
            b.group(this.group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyEncoder());
                            pipeline.addLast(new NettyDecoder());
                            pipeline.addLast(new TcpClientHandler());
                        }
                    });

            // 开始连接
            this.channelFuture = b.connect(ip, port).sync();
        } catch (Exception var4) {
            log.error("连接建立失败,ip:{}, port:{}", ip, port);
            this.group.shutdownGracefully();
            var4.printStackTrace();
        }
    }

    /**
     * 关闭连接
     */
    public void close() {
        this.group.shutdownGracefully();
    }

    /**
     * 发送消息
     *
     * @param msg
     */
    public synchronized void sendCommonMsg(Object msg) {
        String sendStr;
        if (!getConnectStatus()) {
            connectServer();
        }
        try {
            sendStr = objectMapper.writeValueAsString(msg);
        } catch (JsonMappingException e) {
            throw new RuntimeException(e);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            log.info("发送消息内容:{}", sendStr);
            this.channelFuture.channel().writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("发送消息失败,消息内容:{}", sendStr);
            throw new RuntimeException(var4);
        }
    }

    /**
     * 获取当前连接状态
     */
    public Boolean getConnectStatus() {
        return group != null && !group.isShutdown() && !group.isShuttingDown();
    }
}

package com.example.nettydemo.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 读取事件
     *
     * @param channelHandlerContext
     * @param msg
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
        log.info("服务返回消息 :{}", msg);
    }

    /**
     * 发生异常
     *
     * @param channelHandlerContext
     * @param throwable
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {
        log.error("通信发生异常:" + throwable.getMessage());
        channelHandlerContext.close();
    }
}

package com.example.nettydemo.client;


public class TcpClientTest {
    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("127.0.0.1", 40001);
        // 客户端连接到服务器后,向服务器发送一条消息:
        tcpClient.connectServer();
        tcpClient.sendCommonMsg("我是Client,刚刚是我连接到你的!");
    }
}

启动服务端和客户端实现通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下课!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1269981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于PaddleOCR银行卡识别实现(三)

前言 基于PaddleOCR银行卡识别实现&#xff08;一&#xff09; 基于PaddleOCR银行卡识别实现&#xff08;二&#xff09; 前两篇文章讲了检测模型和识别模型的实现&#xff0c;这一篇文章姗姗来迟&#xff0c;将讲解下两个模型的串联应用和PaddleOCR的源码精简&#xff0c;下面…

AI - FlowField(流场寻路)

FlowField流场寻路&#xff0c;利用网格存储每个点对目标点的推力&#xff0c;网格上的单位根据对于推力进行移动。用于大量单位进行寻路对于同一目的地的寻路&#xff0c;常用于rts游戏等。 对应一张网格地图(图中黑块是不可行走区域) 生成热度图 计算所有网格对于目标点(…

蓝桥杯第199题 扫地机器人 暴力优化 二分法 简单题 C++

题目 扫地机器人 - 蓝桥云课 (lanqiao.cn)https://www.lanqiao.cn/problems/199/learning/?page1&first_category_id1&name%E6%89%AB%E5%9C%B0%E6%9C%BA%E5%99%A8%E4%BA%BA 思路和解题方法 首先&#xff0c;通过cin语句输入了终点位置n和障碍物数量k。使用一个数组a来…

零基础也可以学编程,分享中文编程工具开发软件

零基础也可以学编程&#xff0c;分享中文编程工具开发软件 给大家分享一款中文编程工具&#xff0c;零基础轻松学编程&#xff0c;不需英语基础&#xff0c;编程工具可下载。 这款工具不但可以连接部分硬件&#xff0c;而且可以开发大型的软件&#xff0c;象如图这个实例就是用…

MySQL 插入数据报错 Incorrect string value

当在sys_dict_data表中执行插入语句&#xff1b; insert into sys_dict_data values(1, 1, 男, 0, sys_user_sex, , , Y, 0, admin, sysdate(), , null, 性别男);报错信息如下&#xff1a; insert into sys_dict_data values(1, 1, 男, …

记录一次现网问题排查(分享查域名是否封禁小程序)

背景&#xff1a; 收到工单反馈说现网业务一个功能有异常&#xff0c;具体现象是tc.hb.cn域名无法访问&#xff0c;客户地区是河南省&#xff0c;这里记录下排查过程和思路。 首先梳理链路 客户端域名 tc.hb.cn cname—> domainparking-dnspod.cn(新加坡clb)—> snat—&…

Mysql DDL语句建表及空字符串查询出0问题

DDL语句建表 语法&#xff1a; create table 指定要建立库的库名.新建表名 &#xff08;... 新建表的字段以及类型等 ...&#xff09;comment 表的作用注释 charset 表编译格式 row_format DYNAMIC create table dev_dxtiot.sys_url_permission (id integer …

深度学习毕设项目 深度学习 python opencv 动物识别与检测

文章目录 0 前言1 深度学习实现动物识别与检测2 卷积神经网络2.1卷积层2.2 池化层2.3 激活函数2.4 全连接层2.5 使用tensorflow中keras模块实现卷积神经网络 3 YOLOV53.1 网络架构图3.2 输入端3.3 基准网络3.4 Neck网络3.5 Head输出层 4 数据集准备4.1 数据标注简介4.2 数据保存…

基于深度学习的表情动作单元识别综述

论文标题&#xff1a;基于深度学习的表情动作单元识别综述 作者&#xff1a;邵志文1&#xff0c;2&#xff0c;周 勇1&#xff0c;2&#xff0c;谭 鑫3&#xff0c;马利庄3&#xff0c;4&#xff0c;刘 兵1&#xff0c;2&#xff0c;姚 睿1&#xff0c;2 发表日期&#xff1a…

python爬虫AES案例:某招聘网站

声明&#xff1a; 该文章为学习使用&#xff0c;严禁用于商业用途和非法用途&#xff0c;违者后果自负&#xff0c;由此产生的一切后果均与作者无关 一、找出需要加密的参数 js运行 atob(‘aHR0cHM6Ly93d3cua2Fuemh1bi5jb20vc2VhcmNoLz9xdWVyeT1weXRob24mdHlwZT0w’) 拿到网址…

sqlserver12 数据库的安装步骤

点击独立安装或向现有安装添加功能 点击下一步&#xff1a; 点击我接受许可条款&#xff0c;然后点击下一步&#xff1a; 点击包含SQL server产品更新&#xff0c;然后点击下一步&#xff1a; 继续点击下一步&#xff1a; 点击SQL server&#xff08;功能安装&#xff09; 最后…

Thrift RPC Java、Go、PHP使用例子

文章目录 1、Thrift RPC介绍1.1、Protocol 支持的数据传输协议1.2、Transport 支持的数据传输方式1.3、Server 支持的服务模型1.4、IDL语法数据类型1.5、开发步骤 2、接口定义文件2.1、创建接口定义文件2.2、生成对应平台语言代码2.2.1、下载生成工具2.2.2、生成各平台语言代码…

Spring-事务支持

目录 一、事务概述 二、引入事务场景 三、Spring对事务的支持 Spring实现事务的两种方式 声明式事务之注解实现方式 1.在Spring配置文件中配置事务管理器 2. 在Spring配置文件引入tx命名空间 3. 在Spring配置文件中配置“事务注解驱动器”&#xff0c;通过注解的方式控…

一文了解什么是GIS

地理信息系统&#xff08;GIS&#xff09;是捕捉、存储、分析和呈现空间数据的强大工具。通过将地理信息与其他数据源相结合&#xff0c;GIS为有效决策至关重要的模式、关系和趋势提供了有价值的见解。 一、GIS的关键概念 1.空间数据&#xff1a;GIS依赖于空间数据&#xff0c…

RabbitMQ消息模型之Routing-Direct

Routing Direct 在Fanout模式中&#xff0c;一条消息&#xff0c;会被所有订阅的队列都消费。但是在某些场景下&#xff0c;我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下&#xff1a; 队列与交换机的绑定&#xff0c;不能是任意…

记录创建粒子的轻量级JavaScript库——particles.js(可用于登录等背景显示)

文章目录 前言一、下载particles.js二、引入particles.js并使用三、配置数据说明如有启发&#xff0c;可点赞收藏哟~ 前言 本文记录使用创建粒子的轻量级JavaScript库 particles.js 可用于登录等背景显示 一、下载particles.js 先下载particles.js库&#xff0c;放在项目libs…

504. 七进制数

这篇文章会收录到 : 算法通关第十三关-青铜挑战数学基础问题-CSDN博客 七进制数 描述 : 给定一个整数 num&#xff0c;将其转化为 7 进制&#xff0c;并以字符串形式输出。 题目 : LeetCode 504. 七进制数 : 504. 七进制数 分析 : 我们先通过二进制想一下7进制数的变化特…

剑指 Offer(第2版)面试题 9:用两个栈实现队列

剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xff1a;用两个栈实现队列 剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xff1a;用两个栈实现队列解法1&#xff1a;模拟拓展&#xff1a;用队列模拟栈 剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xf…

Flutter 控件查阅清单

为了方便记录和使用Flutter中的各种控件&#xff0c;特写此博客以记之&#xff0c;好记性不如烂笔头嘛&#xff1a;&#xff09; 通过控件的首字母进行查找&#xff0c;本文会持续更新 控件目录 AAppBar BCContainerColumn &#xff08;列&#xff09; DDivider (分割线) EElev…

数据结构day4作业

1.单链表任意位置删除 datetype pos;printf("please input pos");scanf("%d",&pos);headdelete_all(head,pos);Output(head);Linklist delete_all(Linklist head,datetype pos) {if(pos<1||pos>length(head)||headNULL)return head;if(head->…