写一个循环缓冲区
目录
背景
在解决 这个问题 时,遇到了 环形缓冲区(circular buffer)。可以解决这些原始需求:
- 存储一系列按顺序到来的数据
- 数据量可能很大,但我们不希望频繁申请/释放内存(例如不断扩容或移动)
- 只保留最近的 N 个数据项(例如音频缓冲、实时数据采集)
- 写入新数据时,如果满了,可以覆盖最旧的数据(可选)
- 高效读取数据,最好是常数时间复杂度
**环形缓冲区(circular buffer)**是:
一种使用固定大小内存的队列结构,内部用数组实现,读写指针在到达末尾后会“绕回开头”,形成一个逻辑上的环,实现高效读写、无需移动数据的特性。
方法1:
package circular
import (
"errors"
)
type Buffer struct {
data []byte
capacity int
size int
readIdx int
writeIdx int
}
func NewBuffer(size int) *Buffer {
return &Buffer{
data: make([]byte, size),
capacity: size,
}
}
func (b *Buffer) ReadByte() (byte, error) {
if b.size == 0 {
return 0, errors.New("empty")
}
val := b.data[b.readIdx]
b.readIdx = (b.readIdx + 1) % b.capacity
b.size--
return val, nil
}
func (b *Buffer) WriteByte(c byte) error {
if b.size == b.capacity {
return errors.New("full")
}
b.data[b.writeIdx] = c
b.writeIdx = (b.writeIdx + 1) % b.capacity
b.size++
return nil
}
func (b *Buffer) Overwrite(c byte) {
if b.size == b.capacity {
// buffer full, overwrite oldest
b.data[b.writeIdx] = c
b.writeIdx = (b.writeIdx + 1) % b.capacity
b.readIdx = (b.readIdx + 1) % b.capacity
} else {
b.WriteByte(c)
}
}
func (b *Buffer) Reset() {
// 其实重置 size 即可, 原有数据读不出来,下次写也会覆盖
// b.data = make([]byte, b.capacity)
// b.readIdx = 0
// b.writeIdx = 0
b.size = 0
}
方法2 :线程安全,sync.Mutex 实现,微调,避免死锁
package circular
import (
"errors"
"sync"
)
type Buffer struct {
data []byte
capacity int
size int
readIdx int
writeIdx int
mutex sync.Mutex
}
func NewBuffer(size int) *Buffer {
return &Buffer{
data: make([]byte, size),
capacity: size,
}
}
func (b *Buffer) ReadByte() (byte, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.size == 0 {
return 0, errors.New("empty")
}
val := b.data[b.readIdx]
b.readIdx = (b.readIdx + 1) % b.capacity
b.size--
return val, nil
}
func (b *Buffer) WriteByte(c byte) error {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.size == b.capacity {
return errors.New("full")
}
b.data[b.writeIdx] = c
b.writeIdx = (b.writeIdx + 1) % b.capacity
b.size++
return nil
}
func (b *Buffer) Overwrite(c byte) {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.size == b.capacity {
// buffer full, overwrite oldest
b.data[b.writeIdx] = c
b.writeIdx = (b.writeIdx + 1) % b.capacity
b.readIdx = (b.readIdx + 1) % b.capacity
} else {
// 不能调用WriteByte,重复加锁会死锁
//b.WriteByte(c)
b.data[b.writeIdx] = c
b.writeIdx = (b.writeIdx + 1) % b.capacity
b.size++
}
}
func (b *Buffer) Reset() {
b.mutex.Lock()
defer b.mutex.Unlock()
// 虽然 只重置 size 即可, 原有数据读不出来,下次写也会覆盖
// 但是可以进行 防御式清理
// 方式1:创建了一个全新的零初始化的切片,旧的切片 gc 了
b.data = make([]byte, b.capacity)
b.readIdx = 0
b.writeIdx = 0
// 方式2:不会触发额外的内存分配,原数组地址不变
for i := range b.data {
b.data[i] = 0
}
b.size = 0
}
方法3:新思路,线程安全的,chan 实现
package circular
import "errors"
type Buffer struct {
buf chan byte
cap int
}
func NewBuffer(size int) *Buffer {
buf := Buffer{make(chan byte, size), size}
return &buf
}
func (b *Buffer) ReadByte() (result byte, err error) {
if len(b.buf) == 0 {
return result, errors.New("Empty")
}
return <-b.buf, nil
}
func (b *Buffer) WriteByte(c byte) error {
if len(b.buf) == cap(b.buf) {
return errors.New("Full")
}
b.buf <- c
return nil
}
func (b *Buffer) Overwrite(c byte) {
if len(b.buf) == cap(b.buf) {
<-b.buf
}
b.buf <- c
}
func (b *Buffer) Reset() {
b.buf = make(chan byte, b.cap)
}
小结
对比方法2和方法3,从基准测试上来看,方法3的更慢,但是使用 chan
实现也是一个天才的想法。