1. 缓冲写
在阅读这篇博客之前,请先阅读上一篇:golang-bufio 缓冲读
// buffered output
// Writer implements buffering for an io.Writer object.
// If an error occurs writing to a Writer, no more data will be
// accepted and all subsequent writes, and Flush, will return the error.
// After all data has been written, the client should call the
// Flush method to guarantee all data has been forwarded to
// the underlying io.Writer.
type Writer struct {
err error
buf []byte
n int
wr io.Writer
}
缓冲写比缓冲读取更加简单,它的工作原来就是当缓冲区满(或者用户手动强制刷新)了,把整个缓冲区中的内容写入底层数据源。它提供了两种创建的方法:
func NewWriterSize(w io.Writer, size int) *Writer
创建一个指定缓冲区大小的缓冲 Writer 并返回。func NewWriter(w io.Writer) *Writer
创建默认缓冲区大小的缓冲 Writer 并返回。
注意:
- 缓冲 Reader 的默认缓冲区大小是 4096,最小是 16,如果你设置低于这个值,它会强制设置成 16。
- 缓冲 Writer 的默认缓冲区大小是 4096,但是没有最小值,所以你可以设置的很小,但是太小了会有问题,例如当你调用写入单个字符时。
2. 缓冲写测试
这里在正式介绍之前,我们先来使用一下它,不过这里只是演示一下缓冲写的作用。缓冲写其实就是延迟写入,所以它减少的是真正写入的次数(向磁盘文件写入或者写入网络流代价都是很高的)。下面的示例可以看到,不使用缓冲写入,调用一次写入就会实际写入一次。使用缓冲写入后,写满缓冲区或者手动调用 Flush() 才会实际写入。
package main
import (
"bufio"
"fmt"
"strings"
)
func main() {
BufioWriterTest()
}
func BufioWriterTest() {
mBlock := NewMemoryBlock()
// 直接写入,不使用缓冲流,调用几次就是写入几次。
mBlock.Write([]byte("I love you yesterday and today."))
mBlock.Write([]byte("I love you yesterday and today."))
mBlock.Write([]byte("I love you yesterday and today."))
mBlock.Write([]byte("I love you yesterday and today."))
mBlock.Write([]byte("I love you yesterday and today."))
fmt.Printf("查看 MemoryBlock 中的内容:%s\n", mBlock.String())
bufWriter := bufio.NewWriter(mBlock)
// 使用缓冲流,写满缓冲区或者手动调用 Flush() 才会实际写入。
bufWriter.WriteString("I love you yesterday and today.")
bufWriter.WriteString("I love you yesterday and today.")
bufWriter.WriteString("I love you yesterday and today.")
bufWriter.WriteString("I love you yesterday and today.")
bufWriter.WriteString("I love you yesterday and today.")
fmt.Printf("查看 MemoryBlock 中的内容:%s\n", mBlock.String())
bufWriter.Flush()
fmt.Printf("查看 MemoryBlock 中的内容:%s\n", mBlock.String())
}
// 实现一个简陋的 io.Writer,俄罗斯套娃
// 它只是简单的把写入的切片附加到原来的切片上。
type MemoryBlock struct {
data []byte // 使用切片来存储数据
n int // 写入次数
}
func NewMemoryBlock() *MemoryBlock {
return &MemoryBlock{data: make([]byte, 0)}
}
// 写入数据
func (dw *MemoryBlock) Write(p []byte) (int, error) {
// 每次写入 dw.n 次数加一
dw.n += 1
fmt.Printf("MemoryBlock has written %d times\n", dw.n)
dw.data = append(dw.data, p...)
return len(p), nil
}
// 查看内部数据
func (dw *MemoryBlock) String() string {
return string(dw.data)
}
3. 主要方法介绍
接下来会介绍缓冲写的主要方法的作用,并且会添加一些个人的注释。如果有不对的地方,欢迎指正。
3.1 刷新缓冲区 Flush()
缓冲 Writer 的写入都不是真的将数据写入底层数据源,而是写入缓冲区,真正写入靠的是 Flush()
方法。如果缓冲区满了,也是调用 Flush()
来写入的(清空缓冲区)。理解了这个方法,你就大概了解缓冲写入的原理了。
// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
// tip:如果有错误就返回,不再写入
if b.err != nil {
return b.err
}
// tip:如果缓冲区是空的,不写入,直接返回
if b.n == 0 {
return nil
}
// tip:把缓冲区内的所有数据一次性写入底层数据源
n, err := b.wr.Write(b.buf[0:b.n])
// tip:只写入部分数据,且错误为空(这应该很少发生的)
if n < b.n && err == nil {
err = io.ErrShortWrite
}
// tip:处理写入发生的错误
if err != nil {
// tip:只写入部分数据(写入错误不为空)
if n > 0 && n < b.n {
// 把未写入的数据复制到缓冲区的开头
copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
// 缓冲区缓冲字节数减去已经写入的数目
b.n -= n
b.err = err
return err
}
// 写入成功,把缓冲区置空(即 n 设置为 0)
b.n = 0
return nil
}
3.2 可用缓冲区 AvailableBuffer()
这个方法返回一个容量为可用缓冲区大小的空切片,我有点不理解它的作用是什么。这里看注释是说,这个空切片是打算用来追加数据(append),然后传递给一个立即连续的写入 (Write)调用。并且,它只在下一次写入操作之前有效(因为写入会影响切片的内容)。
// AvailableBuffer returns an empty buffer with b.Available() capacity.
// This buffer is intended to be appended to and
// passed to an immediately succeeding Write call.
// The buffer is only valid until the next write operation on b.
func (b *Writer) AvailableBuffer() []byte {
return b.buf[b.n:][:0]
}
3.3 缓冲区可用字节数 Available()
这个方法虽然很简单,但是后面会经常用到它,所以这里也提一下。它的功能很简单,返回缓冲区中的可用字节数(还能写入多少数据),即缓冲区大小 - 已经写入的字节数。
// Available returns how many bytes are unused in the buffer.
func (b *Writer) Available() int { return len(b.buf) - b.n }
3.4 写入字节切片 Write(p []byte)
如果需要写入的数据超过了缓冲区的剩余大小且没有错误则执行循环:
如果缓冲区已经缓冲的数据为 0,即空的缓冲区,那么直接写入底层数据源(先缓冲再写入就浪费时间了)。否则,把需要写入的数据复制到缓冲区后面,然后调用一次 Flush()
进行刷新 n = copy(b.buf[b.n:], p)
。然后,累加实际写入的字节数,同时更新待写入的切片(这里就体现了切片的灵活性!)。
如果切片中数据加上缓冲区中的数据仍然不满一个缓冲区,只是把数据加入缓冲区中,并不实际写入。这就是缓冲的作用了,通过延迟写入来提高性能(但是牺牲了实时性)。
// Write writes the contents of p into the buffer.
// It returns the number of bytes written.
// If nn < len(p), it also returns an error explaining
// why the write is short.
func (b *Writer) Write(p []byte) (nn int, err error) {
for len(p) > b.Available() && b.err == nil {
var n int
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
n, b.err = b.wr.Write(p)
} else {
n = copy(b.buf[b.n:], p)
b.n += n
b.Flush()
}
nn += n
p = p[n:]
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], p)
b.n += n
nn += n
return nn, nil
}
3.5 写入单个字节 WriteByte(c byte)
// WriteByte writes a single byte.
func (b *Writer) WriteByte(c byte) error {
if b.err != nil {
return b.err
}
// tip:如果缓冲区满了(不过这里应该不会小于 0 吧?),
// 它会调用 Flush() 强制写入(会处理错误)
if b.Available() <= 0 && b.Flush() != nil {
return b.err
}
// tip:缓冲区还有足够的大小可以写入,直接把它写入缓冲区
b.buf[b.n] = c
b.n++
return nil
}
3.6 写入单个字符 WriteRune(r rune)
这个方法是写入单个字符(多个字节)的,它基本和写入单个字节是一样的,不过这里需要把字符作为一个整体考虑。主要的区别在于,如果缓冲区可用字节数小于 utf8 的最大字节数(4字节),它会强制刷新,然后再把字符写入缓冲区。也就是说,它不会把一个 rune 拆分成多个字节发送,而是一次发送整个的字符,至于原因可能是分开发送会导致接收端乱码。
有一个比较有意思的地方,如果强制刷新之后,缓冲区的可用字节数还是 utf8 的最大字节数呢?此时缓冲区是空的,说明整个缓冲区的大小小于 4!官方也吐槽了一句:Can only happen if buffer is silly small.
。
// WriteRune writes a single Unicode code point, returning
// the number of bytes written and any error.
func (b *Writer) WriteRune(r rune) (size int, err error) {
// Compare as uint32 to correctly handle negative runes.
if uint32(r) < utf8.RuneSelf {
err = b.WriteByte(byte(r))
if err != nil {
return 0, err
}
return 1, nil
}
if b.err != nil {
return 0, b.err
}
n := b.Available()
if n < utf8.UTFMax {
if b.Flush(); b.err != nil {
return 0, b.err
}
n = b.Available()
if n < utf8.UTFMax {
// tip:这是哪个傻子设置的小缓冲区!
// Can only happen if buffer is silly small.
return b.WriteString(string(r))
}
}
size = utf8.EncodeRune(b.buf[b.n:], r)
b.n += size
return size, nil
}
3.7 写入字符串 WriteString(s string)
写入一个字符串,并不会把整个字符串的内容都写入底层数据源。如果字符串很大(超过了缓冲区的大小)且缓冲区是空的,那么它会直接写入底层数据源,不会先写入缓冲区再写入底层数据源(一次能完成的时候,当然不需要做多次了)。否则,就是将字符串内容填满缓冲区,然后每次写入一整个缓冲区,知道最后的内容不满一个缓冲区。这些内容就留在缓冲区中了,不会写入底层数据源,直到下一次写满缓冲区或者强制刷新 Flush()
。
// WriteString writes a string.
// It returns the number of bytes written.
// If the count is less than len(s), it also returns an error explaining
// why the write is short.
func (b *Writer) WriteString(s string) (int, error) {
var sw io.StringWriter
tryStringWriter := true
nn := 0
for len(s) > b.Available() && b.err == nil {
var n int
if b.Buffered() == 0 && sw == nil && tryStringWriter {
// Check at most once whether b.wr is a StringWriter.
sw, tryStringWriter = b.wr.(io.StringWriter)
}
if b.Buffered() == 0 && tryStringWriter {
// Large write, empty buffer, and the underlying writer supports
// WriteString: forward the write to the underlying StringWriter.
// This avoids an extra copy.
n, b.err = sw.WriteString(s)
} else {
n = copy(b.buf[b.n:], s)
b.n += n
b.Flush()
}
nn += n
s = s[n:]
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], s)
b.n += n
nn += n
return nn, nil
}
3.8 写入其他数据源 ReadFrom(r io.Reader)
这个方法,我就把它叫做写入其他数据源了。它的作用就是直接写入一个数据源的数据,而不是先读取再写入(底层还是要读取再写入的,只不过提供了一个更易用的方法)。不过,这个方法的逻辑还是蛮复杂的,直接看注释吧。
// ReadFrom implements io.ReaderFrom. If the underlying writer
// supports the ReadFrom method, this calls the underlying ReadFrom.
// If there is buffered data and an underlying ReadFrom, this fills
// the buffer and writes it before calling ReadFrom.
func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) {
if b.err != nil {
return 0, b.err
}
// 把底层数据源转成 io.ReaderFrom,看其是否实现该接口
readerFrom, readerFromOK := b.wr.(io.ReaderFrom)
var m int
for {
// tip:缓冲区满
if b.Available() == 0 {
if err1 := b.Flush(); err1 != nil {
return n, err1
}
}
// tip:缓冲区空,直接让其写入(不写缓冲区了)
if readerFromOK && b.Buffered() == 0 {
nn, err := readerFrom.ReadFrom(r)
b.err = err
n += nn
return n, err
}
// tip:读取传入的 reader 的数据,写入底层数据源,这里最大尝试100次失败
nr := 0
for nr < maxConsecutiveEmptyReads {
m, err = r.Read(b.buf[b.n:])
if m != 0 || err != nil {
break
}
nr++
}
if nr == maxConsecutiveEmptyReads {
return n, io.ErrNoProgress
}
b.n += m
n += int64(m)
if err != nil {
break
}
}
// 如果读取发生的错误是 io.EOF,这是正常情况,否则返回错误情况。
// 如果缓冲区正好满了,那么把数据写入底层数据源,否则只是把数据写入缓冲区。
if err == io.EOF {
// If we filled the buffer exactly, flush preemptively.
if b.Available() == 0 {
err = b.Flush()
} else {
err = nil
}
}
return n, err
}
4. 缓冲输入和输出
看到最后面,发现还有一个同时处理缓冲读写的结构体,不过这个就是把前面的缓冲 Reader 和 缓冲 Writer 结合起来了,只提供了一个创建的的方法:func NewReadWriter(r *Reader, w *Writer) *ReadWriter
。读和写的方法就是前面已经介绍过的了。
// buffered input and output
// ReadWriter stores pointers to a Reader and a Writer.
// It implements io.ReadWriter.
type ReadWriter struct {
*Reader
*Writer
}
// NewReadWriter allocates a new ReadWriter that dispatches to r and w.
func NewReadWriter(r *Reader, w *Writer) *ReadWriter {
return &ReadWriter{r, w}
}