C# 使用Pipelines处理Socket数据包

news2024/11/18 13:25:39

写在前面

在上一篇中对Pipelines进行简单的了解,同时也留下了未解的问题,如何将Pipelines类库运用到Socket通讯过程中来解决粘包和分包。链接地址如下: 初识System.IO.Pipelines icon-default.png?t=N7T8https://rjcql.blog.csdn.net/article/details/135211047

这一篇做了一个完整的demo,使用Pipelines接收和处理来自多个客户端发出的消息;相对于以往在报文包头放包体长度再结合结束符来判断的方式,确实要简洁了许多。

代码实现

服务端实现

using System.Net.Sockets;
using System.Net;
using System.Text;

class Program
{
    static async Task Main()
    {
        SocketServerForPiplines();
    }

    static async void SocketServerForPiplines()
    {
        Console.WriteLine("Socket Server");

        // 创建服务端Socket对象
        var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        serverSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9090));
        serverSocket.ReceiveTimeout = 1000;
        serverSocket.SendTimeout = 1000;
        serverSocket.Listen(1000);
        Console.WriteLine("服务端启动监听");

        while (true)
        {
            var clientSocket = serverSocket.Accept();

            Console.WriteLine("有客户端连上了");

            var handler = new PiplinesHandler(clientSocket);
            await handler.StartReceiveAsync();
        }

        Console.ReadLine();
    }
}

 PiplinesHandler 类:

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace PipelinesTester
{
    public class PiplinesHandler
    {
        private const int _minimumBufferSize = 512;
        private Socket _socket;
        private Pipe _pipe;

        public PiplinesHandler(Socket socket)
        {
            _socket = socket;
            var options = new PipeOptions(pauseWriterThreshold: 4096, resumeWriterThreshold: 1024);
            _pipe = new Pipe(options);
        }

        public async Task StartReceiveAsync()
        {
            Task receiveTask = ReceiveMessageAsync();
            Task processTask = ProcessMessageAsync();

            await Task.WhenAll(receiveTask, processTask);
        }


        private async Task ReceiveMessageAsync()
        {
            PipeWriter writer = _pipe.Writer;

            while (true)
            {
                try
                {
                    //从writer申请缓冲区
                    Memory<byte> memory = writer.GetMemory(_minimumBufferSize);
                    //从socket读取数据,直接写入到缓冲区中,即直接写入了PipeWriter中        
                    int bytesRead = await _socket.ReceiveAsync(memory, SocketFlags.None);
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    //前移写标志位
                    writer.Advance(bytesRead);
                    //通知Reader,可以读取了
                    var result = await writer.FlushAsync();

                    if (result.IsCompleted)
                        break;
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    break;
                }

            }

            await writer.CompleteAsync();

            try
            {
                _socket.Shutdown(SocketShutdown.Both);
                _socket.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }

        private async Task ProcessMessageAsync()
        {
            PipeReader _pipeReader = _pipe.Reader;

            while (true)
            {
                //读取消息
                var result = await _pipeReader.ReadAsync();
                var buffer = result.Buffer;
                //查找结束符            
                SequencePosition? position = buffer.PositionOf((byte)'\n');

                if (position == null)
                {
                    continue;
                }

                // 处理消息
                var line = buffer.Slice(0, position.Value);
                string msg = Encoding.UTF8.GetString(line);
                Console.WriteLine(msg);

                // 前移PipeReader
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                _pipeReader.AdvanceTo(buffer.Start, buffer.End);

                // Stop reading if there's no more data coming.
                if (result.IsCompleted)
                {
                    break;
                }
            }

            await _pipeReader.CompleteAsync();
        }
    }
}

客户端实现

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        TcpClientTest();
    }

    static void TcpClientTest()
    {
        Console.WriteLine("TcpClient");

        var msg = $"这是来自客户端的消息{DateTime.Now.ToString("yyyy-MM-dd:HH:mm:ss")}\n";
        var client = new TcpClient("127.0.0.1", 9090);
        var sendStream = client.GetStream();
        var sendBytes = Encoding.Default.GetBytes(msg);
        sendStream.Write(sendBytes, 0, sendBytes.Length);
        sendStream.Flush();
        sendStream.Close();//关闭网络流  
        client.Close();//关闭客户端  

        Console.WriteLine(msg);

        Console.ReadLine();
    }
}

调用示例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

嵌入式单片机的存储区域与堆和栈

一、单片机存储区域 如图所示位STM32F103ZET6的参数&#xff1a; 单片机的ROM&#xff08;内部FLASH&#xff09;&#xff1a;512KB&#xff0c;用来存放程序代码的空间。 单片机的RAM&#xff1a;64KB&#xff0c;一般都被分配为堆、栈、变量等的空间。 二、堆和栈的概念 …

深入探索Spring Boot的核心功能:快速构建原生程序响应式处理数据(文末送书)

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 写在前面参与规则 ✅参与方式&#xff1a;关注博主、点赞、收藏、评论&#xff0c;任意评论&#xff08;每人最多评论…

【linux】Linux管道的原理与使用场景

Linux管道是Linux命令行界面中一种强大的工具&#xff0c;它允许用户将多个命令链接起来&#xff0c;使得一个命令的输出可以作为另一个命令的输入。这种机制使得我们可以创建复杂的命令链&#xff0c;并在处理数据时提供了极大的灵活性。在本文中&#xff0c;我们将详细介绍Li…

什么是焊点保护胶?它的作用是什么

焊点保护胶是一种用于电子元件焊点和连接处的保护的特殊胶水。它主要作用是提供以下几点的保护和增强功能&#xff1a; 防腐蚀保护 电子元件的焊点容易受到环境中的湿度、化学物质和其他腐蚀性因素的影响。焊点保护胶能够形成一层防护膜&#xff0c;减少腐蚀的风险&#xff0c…

苏州科技大学计算机817程序设计(java) 学习笔记

之前备考苏州科技大学计算机&#xff08;专业课&#xff1a;817程序设计&#xff08;java&#xff09;&#xff09;。 学习Java和算法相关内容&#xff0c;现将笔记及资料统一整理归纳移至这里。 部分内容不太完善&#xff0c;欢迎提议。 目录 考情分析 考卷题型 刷题攻略…

Typora使用PicGo+Gitee上传图片报错403 Forbidden

Typora使用PicGoGitee上传图片报错403 Forbidden Typora使用PicGoGitee上传图片&#xff0c;上传失败了&#xff0c;错误信息如下 打开PicGo的日志文件查看&#xff0c;可以看到错误详情如下 换了一个插件github-plus重新配置&#xff0c;解决了这个问题 再打开日志查看&…

vue+element+springboot实现多张图片上传

1.需求说明 2.实现思路 3.el-upload组件主要属性说明 4.前端传递MultipartFile数组与服务端接收说明 5.完整代码 1.需求说明 动态模块新增添加动态功能,支持多张图片上传.实现过程中对el-upload组件不是很熟悉,踩了很多坑,当然也参考过别的文章,发现处…

浅谈互联网架构演变

更好的阅读体验 \large{\color{red}{更好的阅读体验}} 更好的阅读体验 前言 可以将某个项目或产品的架构体系按照如下方式分层&#xff1a; 业务层面&#xff1a;项目业务体系技术层面&#xff1a; 数据架构&#xff1a;数据持久层策略应用架构&#xff1a;应用层的实现方式 …

HBase深度历险 | 京东物流技术团队

简介 HBase 的全称是 Hadoop Database&#xff0c;是一个分布式的&#xff0c;可扩展&#xff0c;面向列簇的数据库&#xff0c;是一个通过大量廉价的机器解决海量数据的高速存储和读取的分布式数据库解决方案。本文会像剥洋葱一样&#xff0c;层层剥开她的心。 特点 首先我…

Android中_Service生命周期和AMS流程的创建

Service生命周期可以结合Android生命周期分析。 Service生命周期可以从两种启动Service的模式开始讲起&#xff0c;分别是context.startService()和context.bindService()。 Service的生命周期与启动和绑定状态相关。当调用startService()方法启动服务时&#xff0c;会执行onS…

65内网安全-域环境工作组局域网探针

这篇分为三个部分&#xff0c;基本认知&#xff0c;信息收集&#xff0c;后续探针&#xff0c; 基本认知 分为&#xff0c;名词&#xff0c;域&#xff0c;认知&#xff1b; 完整架构图 名词 dwz称之为军事区&#xff0c;两个防火墙之间的区域称之为dwz&#xff0c;但安全性…

大象机器人发布万元级水星Mercury人形机器人产品系列,联结未来,一触即达!

十四五机器人产业发展规划指出机器人的研发、制造、应用是衡量一个国家科技创新和高端制造业水平的重要标志。当前&#xff0c;机器人产业蓬勃发展&#xff0c;正极大改变着人类生产和生活方式&#xff0c;为经济社会发展注入强劲动能。 人形机器人作为机器人产业中重要的一环&…

【nw.js】使用nw.js将html页面打包成exe免安装程序

文章目录 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09;二、nw.js包三、使用批处理命令打包成exe可执行文件四、使用EnigmaVB打包成免安装可独立运行的exe文件五、结束 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09; 下载zip&#xff0c;你可以到该…

连续语义分割(CSS)24种最新经典方法汇总,包含数据回放、自监督、正则化等5个细分方向

连续语义分割&#xff08;CSS&#xff09;是计算机视觉中的一个新兴领域&#xff0c;其基本任务是在某一时刻学习预测特定类别的图像分割&#xff0c;并在随后需要的时候连续增加学习类别的数量&#xff0c;同时保持对已有类别的分割能力。这个过程中需要解决的主要挑战包括灾难…

Java虚拟机知识点总结

总结黑马程序员笔记 Java运行时数据区域 可以分成线程私有的和线程共享的区域。 线程私有的区域有&#xff1a;虚拟机栈&#xff0c;本地方法栈&#xff0c;程序计数器 线程共享的区域有&#xff1a;堆&#xff0c;方法区&#xff08;在JDK1.8中&#xff0c;方法区放在了本…

【验证概括 SV的数据类型_2023.12.18】

验证概括 验证的过程是保证芯片实现符合规格说明书&#xff08;Specification&#xff0c;spec&#xff09;的过程 验证的两项任务&#xff1a; RTL sim&#xff1a;前仿真&#xff0c;验证功能 GLS-Gate (Level Simulation)&#xff1a;后仿真&#xff0c;验证功能和时序 验…

JDK 14全景透视:每个Java开发者必知的新特性

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 JDK 14全景透视&#xff1a;每个Java开发者必知的新特性 前言&#xff1a;switch表达式标准化Switch表达式成为正式特性的意义&#xff1a;如何使用Switch表达式&#xff1a;注意事项&#xff1a; ins…

【PostGIS】在Java中操作postgis——使用springboot+Maven+mybatis框架

前言&#xff1a; PostgreSQL15对应PostGIS安装教程及空间数据可视化 空间数据库-常用空间函数 完成PostGIS的安装与配置后&#xff0c;让我们来写一个Java操作postgis数据库的demo吧~ 使用工具&#xff1a; NavicatIDEA 一、PostGIS数据库准备 在Navicat中新建一个postgr…

前端Vue进阶

Vue进阶 当你熟悉了Vue.js的基本概念和用法后&#xff0c;可以继续深入学习Vue.js的进阶内容。以下是一些Vue.js的进阶主题&#xff0c;可以帮助你更好地理解和应用Vue.js。 组件通信 Vue.js提供了多种方式来实现组件之间的通信。除了父子组件之间的通信&#xff0c;还有兄弟…

python降低图像的空间分辨率——冈萨雷斯数字图像处理

原理&#xff1a; 降低图像的空间分辨率意味着减少图像中可见的细节&#xff0c;使图像变得模糊或粗糙。这可以通过减少图像的像素数量或改变像素的排列来实现。以下是一些降低图像空间分辨率的常见原理和方法&#xff1a; 下采样&#xff08;Subsampling&#xff09;&#xf…