目录

写一个循环缓冲区

在解决 这个问题 时,遇到了 环形缓冲区(circular buffer)。可以解决这些原始需求:

  • 存储一系列按顺序到来的数据
  • 数据量可能很大,但我们不希望频繁申请/释放内存(例如不断扩容或移动)
  • 只保留最近的 N 个数据项(例如音频缓冲、实时数据采集)
  • 写入新数据时,如果满了,可以覆盖最旧的数据(可选)
  • 高效读取数据,最好是常数时间复杂度

**环形缓冲区(circular buffer)**是:

一种使用固定大小内存的队列结构,内部用数组实现,读写指针在到达末尾后会“绕回开头”,形成一个逻辑上的环,实现高效读写、无需移动数据的特性。

package circular

import (
	"errors"
)

type Buffer struct {
	data     []byte
	capacity int
	size     int
	readIdx  int
	writeIdx int
}

func NewBuffer(size int) *Buffer {
	return &Buffer{
		data:     make([]byte, size),
		capacity: size,
	}
}

func (b *Buffer) ReadByte() (byte, error) {
	if b.size == 0 {
		return 0, errors.New("empty")
	}
	val := b.data[b.readIdx]
	b.readIdx = (b.readIdx + 1) % b.capacity
	b.size--
	return val, nil
}

func (b *Buffer) WriteByte(c byte) error {
	if b.size == b.capacity {
		return errors.New("full")
	}
	b.data[b.writeIdx] = c
	b.writeIdx = (b.writeIdx + 1) % b.capacity
	b.size++
	return nil
}

func (b *Buffer) Overwrite(c byte) {
	if b.size == b.capacity {
		// buffer full, overwrite oldest
		b.data[b.writeIdx] = c
		b.writeIdx = (b.writeIdx + 1) % b.capacity
		b.readIdx = (b.readIdx + 1) % b.capacity
	} else {
		b.WriteByte(c)
	}
}

func (b *Buffer) Reset() {
	// 其实重置 size 即可, 原有数据读不出来,下次写也会覆盖
	// b.data = make([]byte, b.capacity)
	// b.readIdx = 0
	// b.writeIdx = 0
	b.size = 0
}
package circular

import (
	"errors"
	"sync"
)

type Buffer struct {
	data     []byte
	capacity int
	size     int
	readIdx  int
	writeIdx int
	mutex    sync.Mutex
}

func NewBuffer(size int) *Buffer {
	return &Buffer{
		data:     make([]byte, size),
		capacity: size,
	}
}

func (b *Buffer) ReadByte() (byte, error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.size == 0 {
		return 0, errors.New("empty")
	}
	val := b.data[b.readIdx]
	b.readIdx = (b.readIdx + 1) % b.capacity
	b.size--
	return val, nil
}

func (b *Buffer) WriteByte(c byte) error {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.size == b.capacity {
		return errors.New("full")
	}
	b.data[b.writeIdx] = c
	b.writeIdx = (b.writeIdx + 1) % b.capacity
	b.size++
	return nil
}

func (b *Buffer) Overwrite(c byte) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.size == b.capacity {
		// buffer full, overwrite oldest
		b.data[b.writeIdx] = c
		b.writeIdx = (b.writeIdx + 1) % b.capacity
		b.readIdx = (b.readIdx + 1) % b.capacity
	} else {
		// 不能调用WriteByte,重复加锁会死锁
		//b.WriteByte(c)
		b.data[b.writeIdx] = c
		b.writeIdx = (b.writeIdx + 1) % b.capacity
		b.size++
	}
}

func (b *Buffer) Reset() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	// 虽然 只重置 size 即可, 原有数据读不出来,下次写也会覆盖
	// 但是可以进行 防御式清理

	// 方式1:创建了一个全新的零初始化的切片,旧的切片 gc 了
	b.data = make([]byte, b.capacity)
	b.readIdx = 0
	b.writeIdx = 0

	// 方式2:不会触发额外的内存分配,原数组地址不变
	for i := range b.data {
		b.data[i] = 0
	}

	b.size = 0
}
package circular

import "errors"

type Buffer struct {
	buf chan byte
	cap int
}

func NewBuffer(size int) *Buffer {
	buf := Buffer{make(chan byte, size), size}
	return &buf
}

func (b *Buffer) ReadByte() (result byte, err error) {
	if len(b.buf) == 0 {
		return result, errors.New("Empty")
	}
	return <-b.buf, nil
}

func (b *Buffer) WriteByte(c byte) error {
	if len(b.buf) == cap(b.buf) {
		return errors.New("Full")
	}
	b.buf <- c
	return nil
}

func (b *Buffer) Overwrite(c byte) {
	if len(b.buf) == cap(b.buf) {
		<-b.buf
	}
	b.buf <- c
}
func (b *Buffer) Reset() {
	b.buf = make(chan byte, b.cap)
}

对比方法2和方法3,从基准测试上来看,方法3的更慢,但是使用 chan 实现也是一个天才的想法。