c# 在 C# 中实现事务性发件箱(Transactional Outbox)模式

直接用数据库事务发消息会出问题,因为SaveChanges()后调用消息发送若失败,业务已提交但消息丢失,破坏一致性;Transactional Outbox通过将消息写入同事务的outbox表,再由独立幂等投递器轮询发送来解决。

为什么直接用数据库事务发消息会出问题

在 C# 应用中,你可能写过类似这样的代码:SaveChanges() 之后立刻调用 bus.Publish()producer.SendAsync()。表面看是“先存库再发消息”,但一旦消息发送失败(网络抖动、Broker 不可用、序列化异常),业务已提交,消息却丢了——违反了“要么都成功,要么都不发生”的一致性要求。

Transactional Outbox 的核心思路是:把要发的消息也当作业务数据,写进同一个数据库事务里。消息不是“发出去”,而是“记下来”,后续由一个独立的、幂等的投递器(Outbox Processor)去轮询并转发。

如何在 Entity Framework Core 中建 outbox 表并自动写入

你需要一张 OutboxMessages 表,字段至少包含:Id(GUID)、TypeName(事件全名)、Content(JSON 字符串)、ProcessedAt(NULL 表示未处理)、CreatedAt。关键在于:它必须和你的业务实体共享同一个 DbContext 实例,并在同一个 SaveChanges() 中被插入。

推荐做法是封装一个 OutboxService,在业务逻辑中调用 AddOutboxMessage(T @event),内部只是 new 一个 OutboxMessagecontext.OutboxMessages.Add()。EF Core 会把它当成普通实体参与事务。

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string TypeName { get; set; } = null!;
    public string Content { get; set; } = null!;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime? ProcessedAt { get; set; }
}
  • 确保 OutboxMessages DbSet 在 OnModelCreating 中配置了 HasIndex(x => x.ProcessedAt).IsDescending(),方便后续查询未处理项
  • 不要手动调用 SaveChanges() 多次;所有操作(业务实体 + outbox 记录)必须在一次 SaveChanges() 中完成
  • 如果使用 EF Core 7+,可考虑用 SaveChangesAsync(cancellationToken) 配合超时控制,避免事务卡死

怎么安全地轮询并投递 outbox 消息

投递器不能和业务应用跑在同一个进程里(否则进程崩溃会导致消息丢失),建议作为独立后台服务(如 .NET Worker Service),或用 Quartz.NET / Hangfire 定时触发。每次只取少量(例如 100 条)ProcessedAt IS NULL 的记录,按 CreatedAt 排序,逐条尝试发送到消息队列(如 RabbitMQ、Kafka)。

重点在于“发送成功后才更新 ProcessedAt”——这步更新也必须走数据库事务,且必须是**同一个数据库连接**(不能新开 DbContext)。否则会出现消息已发、但 DB 更新失败,导致重复投递。

  • 使用 SELECT ... FOR UPDATE(PostgreSQL)或 UPDLOCK, ROWLOCK(SQL Server)锁定待处理行,防止多个投递器实例并发处理同一条消息
  • 投递失败时,应记录日志并跳过该条(不更新 ProcessedAt),下次轮询重试;不要 throw 异常中断整个批次
  • Kafka 场景下,可利用事务性 Producer(InitTransactions + SendOffsetsToTransaction)将 offset 提交与 ProcessedAt 更新绑定,但实现复杂,多数场景用 DB 事务更稳

常见坑:序列化、重试、幂等性怎么处理

Outbox 表里的 Content 是 JSON,必须保证序列化前后完全一致。别用 System.Text.Json 默认设置——它会忽略 null 字段、按字母序排序属性。务必显式配置 JsonSerializerOptions,并全局复用同一实例:

var options = new JsonSerializerOptions
{
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    WriteIndented = false
};

投递失败后的重试天然带来重复风险。解决方案不在 outbox 层,而在消费者端:每条消息带唯一 MessageId(通常就是 outbox 表的 Id),消费者需维护已处理 ID 的去重表(或 Redis Set),收到重复 ID 直接丢弃。

  • 不要在 outbox 投递层做“最多一次”或“最少一次”的语义包装——那是传输层的事;outbox 只负责“至少一次”持久化
  • 如果业务要求强顺序(如账户余额变更必须严格 FIFO),需在 outbox 查询时加 ORDER

    BY CreatedAt
    ,并在消息队列端确保单分区/单队列消费
  • 清理已投递记录?可以,但必须在确认下游 100% 消费成功(如 Kafka commit offset 后)再删,否则删早了就真丢了

最易被忽略的是:投递器的数据库连接字符串是否启用了连接池?是否设置了合理的 Max Pool Size?高吞吐下连接耗尽会导致投递停滞,而业务库仍在持续写入 outbox,最终填满磁盘。