创建TCP通信
nodeJS的Net模块实现了底层通信接口
通信过程
- 创建服务端:接收和回写客户端数据
- 创建客户端:发送和接收服务端数据
- 数据传输: 内置服务事件和方法读写数据
通信事件
- listing事件:调用server.listen方法之后触发
- connection事件:新的连接建立时触发
- close事件: 当server关闭时触发
- error事件: 当错误出现时触发
通信事件&方法
- data事件:当接收到数据时触发该事件
- 每当某一端调用write方法来发送数据的时候,那么另一端就可以通过on来监听data事件去消耗数据。其实也就是从可读流里拿数据的操作
- write方法:在socket上发送数据,默认ut8编码
- 它和data方法是相对的,net模块所创建的都是基于流的操作,所以他本身就是可读流和可写流的集合。data可以去消费数据,write就可以用来写入数据。
- end操作:当socket的一端发送FIN包时触发,结束可读端
基于net创建服务端和客户端的TCP通信
自此我们就知道了,在node中通过net模块可以创建一个基于流操作的tcp通信,依据内置的方法可以创建一个服务端和一个客户端然后再去监听具体的事件,当某些事件被触发的时候,我们就可以去利用相应的方法来生产和消费数据。
- server.js
const net = require('net')
// 创建服务端实例
const server = net.createServer()
const PORT = 1234
const HOST = 'localhost'
server.listen(PORT, HOST)
server.on('listening', () => {
console.log(`服务端已经开启在 ${HOST}: ${PORT}`)
})
// 接收消息 回写消息
server.on('connection', (socket) => {
socket.on('data', (chunk) => {
const msg = chunk.toString()
console.log(msg)
// 回数据
socket.write(Buffer.from('您好' + msg))
})
})
server.on('close', () => {
console.log('服务端关闭了')
})
server.on('error', (err) => {
if (err.code == 'EADDRINUSE') {
console.log('地址正在被使用')
}else{
console.log(err)
}
})
- client.js
const net = require('net')
const client = net.createConnection({
port: 1234,
host: '127.0.0.1'
})
client.on('connect', () => {
client.write('小星星')
})
client.on('data', (chunk) => {
console.log(chunk.toString())
})
client.on('error', (err) => {
console.log(err)
})
client.on('close', () => {
console.log('客户端断开连接')
})
TCP数据粘包
通信包含数据发送端和接收端.发送端先将数据放在一个缓存区,累积完数据之后再统一发送.接收端同样将接收的数据先放到缓存中,然后再执行数据的获取和使用。 这样的好处是能减少IO操作带来的性能消耗,但是对于数据的使用就会产生粘包
的问题。比如客户端向服务端发送连续的数据:
服务端接收之后会把他们放在一起再返回给客户端。
最后客户端接收到的数据就是下面这样了,后面的小星星就被粘在一起了。
主要的解决方法就是加入时间间隔,比如每过1s发送一条数据。这样虽然能解决问题,但是如果在传输的过程中有很多的数据,而且还涉及网络贷款与用户的等待时长都是有等待上限的,不应该这样去做处理。那么现实当中是如何处理的呢?答案是通过拆包与封包
还有一个问题就是数据是放在缓存中的,那么什么时候发送数据呢?这个就取决于TCP的拥塞机制
了。而TCP拥塞机制决定发送时机。
数据的封包与拆包解决粘包问题
它的核心思想就是按照提前规定好的规则去打包,将来接收到数据的时候再使用特点规则去拆包即可。我们这里使用的是长度编码的方式约定通信双方的数据传输方式。
首先将被传输的数据分为定长消息头和不定长的消息体两个部分。其中头部使用header来表示,消息体使用body来表示
同时再讲头部分为序列号和消息长度两个部分,序列号去区分不同的消息包,消息长度确定一次取多少长度的内容。
数据传输的过程
- 进行数据编码,将数据编码成二进制。然后将这条数据按上述规则封装成一个包。
- 而服务端拿到这个包之后就按照约定好的长度拆解数据,获取指定长度的内容。然后把二进制解码成字符串,这边就要使用buffer了,这边涉及通过具体位置进行读写的两个操作,如下
Buffer数据读写
- writeInt16BE:将value从指定位置
写
入 - readInt16BE:从指定位置开始
读
取数据
这里32BE的就先不说了
封包与拆包
封包与拆包类实现
myTransform.js
class MyTransformCode{
constructor() {
this.packageHeaderLen = 4 // header的空间大小(4个字节)
this.serialNum = 0 // header的第一部分:描述包的编号
this.serialLen = 2 // header的第二部分:制定解码时获取包的长度
}
// 封包
encode(data, serialNum) {
const body = Buffer.from(data)
// 01 先按照指定的长度来申请一片内存空间做为 header 来使用
const headerBuf = Buffer.alloc(this.packageHeaderLen)
// 02 写入编号
headerBuf.writeInt16BE(serialNum || this.serialNum)
// 03 跳过序列号,写入解码时获取包的长度
headerBuf.writeInt16BE(body.length, this.serialLen)
if (serialNum == undefined) {
// 没有传编号,就自增
this.serialNum++
}
// 合并消息体 封包完成
return Buffer.concat([headerBuf, body])
}
// 拆包
decode(buffer) {
// 拆出头信息
const headerBuf = buffer.slice(0, this.packageHeaderLen)
// 拆出体信息
const bodyBuf = buffer.slice(this.packageHeaderLen)
return {
serialNum: headerBuf.readInt16BE(),
bodyLength: headerBuf.readInt16BE(this.serialLen),// 跳过编号位置
body: bodyBuf.toString()
}
}
// 获取包长度的方法
getPackageLen(buffer) {
// 长度小于头部长度
if (buffer.length < this.packageHeaderLen) {
return 0
} else {
// 返回实际长度 = 消息头长度 + 消息体长度
return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)
}
}
}
module.exports = MyTransformCode
封包与拆包类使用
server.js
const net = require('net')
const MyTransform = require('./myTransform.js')
const server = net.createServer()
let overageBuffer = null
let ts = new MyTransform()
server.listen('1234', 'localhost')
server.on('listening', () => {
console.log('服务端运行在 localhost:1234')
})
server.on('connection', (socket) => {
socket.on('data', (chunk) => {
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk])
}
let packageLen = 0
while(packageLen = ts.getPackageLen(chunk)) {
const packageCon = chunk.slice(0, packageLen)
chunk = chunk.slice(packageLen)
const ret = ts.decode(packageCon)
console.log(ret)
socket.write(ts.encode(ret.body, ret.serialNum))
}
overageBuffer = chunk
})
})
client.js
const net = require('net')
const MyTransform = require('./myTransform.js')
let overageBuffer = null
let ts = new MyTransform()
const client = net.createConnection({
host: 'localhost',
port: 1234
})
client.write(ts.encode('小星星1'))
client.write(ts.encode('小星星2'))
client.write(ts.encode('小星星3'))
client.write(ts.encode('小星星4'))
client.write(ts.encode('小星星5'))
client.on('data', (chunk) => {
if (overageBuffer) {
chunk = Buffer.concat([overageBuffer, chunk])
}
let packageLen = 0
while(packageLen = ts.getPackageLen(chunk)) {
const packageCon = chunk.slice(0, packageLen)
chunk = chunk.slice(packageLen)
const ret = ts.decode(packageCon)
console.log(ret)
}
overageBuffer = chunk
})