写在前面
在上一篇中对Pipelines进行简单的了解,同时也留下了未解的问题,如何将Pipelines类库运用到Socket通讯过程中来解决粘包和分包。链接地址如下: 初识System.IO.Pipelines https://rjcql.blog.csdn.net/article/details/135211047
这一篇做了一个完整的demo,使用Pipelines接收和处理来自多个客户端发出的消息;相对于以往在报文包头放包体长度再结合结束符来判断的方式,确实要简洁了许多。
代码实现
服务端实现
using System.Net.Sockets;
using System.Net;
using System.Text;
class Program
{
static async Task Main()
{
SocketServerForPiplines();
}
static async void SocketServerForPiplines()
{
Console.WriteLine("Socket Server");
// 创建服务端Socket对象
var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
serverSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9090));
serverSocket.ReceiveTimeout = 1000;
serverSocket.SendTimeout = 1000;
serverSocket.Listen(1000);
Console.WriteLine("服务端启动监听");
while (true)
{
var clientSocket = serverSocket.Accept();
Console.WriteLine("有客户端连上了");
var handler = new PiplinesHandler(clientSocket);
await handler.StartReceiveAsync();
}
Console.ReadLine();
}
}
PiplinesHandler 类:
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace PipelinesTester
{
public class PiplinesHandler
{
private const int _minimumBufferSize = 512;
private Socket _socket;
private Pipe _pipe;
public PiplinesHandler(Socket socket)
{
_socket = socket;
var options = new PipeOptions(pauseWriterThreshold: 4096, resumeWriterThreshold: 1024);
_pipe = new Pipe(options);
}
public async Task StartReceiveAsync()
{
Task receiveTask = ReceiveMessageAsync();
Task processTask = ProcessMessageAsync();
await Task.WhenAll(receiveTask, processTask);
}
private async Task ReceiveMessageAsync()
{
PipeWriter writer = _pipe.Writer;
while (true)
{
try
{
//从writer申请缓冲区
Memory<byte> memory = writer.GetMemory(_minimumBufferSize);
//从socket读取数据,直接写入到缓冲区中,即直接写入了PipeWriter中
int bytesRead = await _socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
//前移写标志位
writer.Advance(bytesRead);
//通知Reader,可以读取了
var result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
catch (Exception e)
{
Console.WriteLine(e);
break;
}
}
await writer.CompleteAsync();
try
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
private async Task ProcessMessageAsync()
{
PipeReader _pipeReader = _pipe.Reader;
while (true)
{
//读取消息
var result = await _pipeReader.ReadAsync();
var buffer = result.Buffer;
//查找结束符
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
continue;
}
// 处理消息
var line = buffer.Slice(0, position.Value);
string msg = Encoding.UTF8.GetString(line);
Console.WriteLine(msg);
// 前移PipeReader
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
_pipeReader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
await _pipeReader.CompleteAsync();
}
}
}
客户端实现
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
class Program
{
static void Main(string[] args)
{
TcpClientTest();
}
static void TcpClientTest()
{
Console.WriteLine("TcpClient");
var msg = $"这是来自客户端的消息{DateTime.Now.ToString("yyyy-MM-dd:HH:mm:ss")}\n";
var client = new TcpClient("127.0.0.1", 9090);
var sendStream = client.GetStream();
var sendBytes = Encoding.Default.GetBytes(msg);
sendStream.Write(sendBytes, 0, sendBytes.Length);
sendStream.Flush();
sendStream.Close();//关闭网络流
client.Close();//关闭客户端
Console.WriteLine(msg);
Console.ReadLine();
}
}