如何使用 Project Reactor 创建 Flux 和 Mono 序列

Project Reactor 提供 Mono 和 Flux 两种响应式序列,可通过静态工厂方法(如 just、empty)、异步操作(fromCallable、fromFuture)、边界处理(defer、onErrorResume)及组合转换(zip、flatMap、concat)等方式创建与管理。

Project Reactor 是 Spring WebFlux 的核心响应式编程库,提供 Mono(0 或 1 个元素)和 Flux(0 到 N 个元素)两种响应式序列类型。创建它们的关键在于选择合适的静态工厂方法,并理解其语义差异。

从静态工厂方法创建基础序列

Reactor 提供了大量静态方法用于快速构建常见场景的序列:

  • Mono.just(value):发出单个非 null 值,例如 Mono.just("hello")
  • Mono.empty():发出 onComplete 信号,不发射任何数据
  • Mono.error(exception):立即发出 onError 信号
  • Flux.just(a, b, c):按顺序发出多个值,如 Flux.just(1, 2, 3)
  • Flux.fromArray(array)Flux.fromIterable(list):将集合或数组转为流
  • Flux.range(start, count):生成整数序列,如 Flux.range(1, 5) 发出 1~5

从异步操作创建响应式序列

实际开发中,多数数据来自 I/O 或计算密集型任务,需用异步方式创建序列:

  • Mono.fromCallable(() -> doSyncWork()):在订阅时执行一次同步计算,结果封装为 Mono
  • Mono.fromFuture(completableFuture):包装已存在的 CompletableFuture
  • Flux.fromStream(() -> Stream.generate(...)):注意需配合 .take(n) 控制长度,避免无限流
  • Flux.interval(Duration.ofSeconds(1)):每秒发出一个递增的 long 值(从 0 开始),常用于定时任务

延迟、错误与空值处理的常见写法

创建序列时要主动考虑边界情况,避免运行时异常:

  • 避免 nul

    l 入参
    :如 Mono.just(null) 会抛出 NullPointerException;应改用 Mono.justOrEmpty(Optional.ofNullable(value))Mono.empty()
  • 延迟创建逻辑:使用 Mono.defer(() -> Mono.just(...)) 确保每次订阅都重新执行创建逻辑,适用于有状态或时间敏感的数据源
  • 容错初始化:若构造过程可能失败,用 Mono.fromCallable(...).onErrorReturn(defaultValue).onErrorResume(e -> fallbackMono)
  • 空集合安全转换:对可能为空的 List,推荐 list != null && !list.isEmpty() ? Flux.fromIterable(list) : Flux.empty(),或用 Flux.fromIterable(Optional.ofNullable(list).orElse(Collections.emptyList()))

组合与转换已有序列

多数真实业务不会只靠“创建”,而是组合多个序列形成新流:

  • 合并两个 Mono:用 Mono.zip(m1, m2, (a, b) -> new Pair(a, b)) 并发等待两者完成
  • 扁平化嵌套 Flux:如 Flux> source → source.flatMap(m -> m)
  • 串联多个 Flux:用 Flux.concat(f1, f2, f3) 按顺序订阅,前一个完成才开始下一个
  • 条件性创建:用 Mono.fromSupplier(() -> condition ? value : null).filter(Objects::nonNull) 实现 if-else 式流分支

掌握这些创建方式后,就能根据数据来源(内存对象、同步方法、Future、定时器、集合等)准确选用对应 API。关键是理解每个工厂方法的触发时机(立即执行 vs 延迟执行)、错误传播行为和空值约定。