c# 如何在C#中实现一个无锁环形缓冲区(Ring Buffer)

ConcurrentQueue无法替代环形缓冲区,因其链表实现导致内存不连续、缺乏原子批次操作、不支持预分配与零拷贝;环形缓冲区在SPSC场景下凭借固定容量、缓存友好和无锁特性,更适用于高吞吐低延迟场景。

为什么不能直接用 ConcurrentQueue 代替环形缓冲区

因为 ConcurrentQueue 是链表实现,内存不连续,无法保证写入/读取的原子性批次,也不支持预分配固定大小和零拷贝访问。环形缓冲区核心价值在于:确定容量、缓存友好、单生产者/单消费者(SPSC)场景下免锁、支持指针快速读写。如果你需要的是高吞吐低延迟的日志暂存、网络包收发或实时音频流缓冲,ConcurrentQueue 的 GC 压力和间接寻址开销会成为瓶颈。

如何用 Interlocked 实现 SPSC 无锁环形缓冲区

关键不是“完全不用锁”,而是避免 lock 语句阻塞线程;SPSC 场景下,仅用 Interlocked.CompareExchangeInterlocked.Add 即可协调读写索引。必须满足:一个线程只写、一个线程只读,且不允许多对一或一对多。

  • 缓冲区底层数组用 T[] 预分配,长度为 2 的幂(便于位运算取模)
  • 写索引(_writeIndex)和读索引(_readIndex)均为 long 类型,避免 32 位溢出导致误判
  • 实际位置用 index & (_capacity - 1) 计算,比 % _capacity 快且安全
  • 写操作前先用 Interlocked.CompareExchange 检查是否有足够空位,失败则返回 false(不阻塞)
  • 读操作同理,检查是否有数据可读,再用 Interlocked.Add 批量推进读索引
public sealed class RingBuffer
{
    private readonly T[] _buffer;
    private readonly int _capacity;
    private readonly int _mask;
    private long _writeIndex;
    private long _readIndex;
public RingBuffer(int capacity)
{
    _capacity = RoundUpToPowerOfTwo(capacity);
    _mask = _capacity - 1;
    _buffer = new T[_capacity];
}

public bool TryWrite(T item)
{
    long writePos = Interlocked.Read(ref _writeIndex);
    long readPos = Interlocked.Read(ref _readIndex);
    long available = _capacity - (writePos - readPos);

    if (available zuojiankuohaophpcn= 0) return false;

    _buffer[writePos & _mask] = item;
    Interlocked.Increment(ref _writeIndex);
    return true;
}

public bool TryRead(out T item)
{
    item = default!;
    long writePos = Interlocked.Read(ref _writeIndex);
    long readPos = Interlocked.Read(ref _readIndex);

    if (writePos == readPos) return false;

    item = _buffer[readPos & _mask];
    Interlocked.Increment(ref _readIndex);
    return true;
}

private static int RoundUpToPowerOfTwo(int v)
{
    v--;
    v |= v youjiankuohaophpcnyoujiankuohaophpcn 1;
    v |= v youjiankuohaophpcnyoujiankuohaophpcn 2;
    v |= v youjiankuohaophpcnyoujiankuohaophpcn 4;
    v |= v youjiankuohaophpcnyoujiankuohaophpcn 8;
    v |= v youjiankuohaophpcnyoujiankuohaophpcn 16;
    return v + 1;
}

}

为什么 volatile 不够,而必须用 Interlocked.Read

在 x86/x64 上,volatile 字段读写会插入内存屏障,但不能保证“读-改-写”操作的原子性。比如两个线程同时执行 _writeIndex++,即使字段是 volatile,仍可能丢失一次自增。而 Interlocked.Read(ref _writeIndex) 不仅保证读取最新值,还强制刷新 CPU 缓存行,确保你看到的是其他线程写入后的结果。尤其在 ARM 平台上,缺少 Interlocked 会导致读写索引严重错乱。

容易被忽略的边界:批量读写与内存可见性

上面示例是单元素读写,实际中常需 TryWriteBatchTryReadBatch。这时不能简单循环调用 TryWrite,否则每轮都重复检查可用空间,效率低且逻辑错乱。正确做法是:一次计算最大可写数量,用 Interlocked.CompareExchange 原子预留位置,再逐个赋值,最后用 Interlocked.Add 提交写索引偏移。同样,读端也要先确认数据量,再批量复制,最后提交读索引——否则中间被写端覆盖就丢数据了。

另外,如果 T 是引用类型,写入时只是存引用,不触发对象复制;但若 T 是结构体且较大(如超过 16 字节),要考虑缓存行对齐和复制开销。无锁结构体写入本身没问题,但频繁大结构体搬运会抵消无锁带来的性能优势。