使用 Quarkus Mutiny 组合多个异步请求

本文旨在解决在使用 Quarkus Mutiny 进行反应式编程时,如何正确组合多个异步请求,并确保在所有请求完成后再返回结果的问题。通过避免阻塞主线程的 await() 方法,并利用 Uni.combine().all() 方法,我们可以高效地处理并发请求,构建响应迅速且可靠的应用程序。文章提供了详细的代码示例,演示了如何将多个 Uni 对象组合成一个,以及如何在所有异步操作完成后处理结果。

在使用 Quarkus Mutiny 进行反应式编程时,经常会遇到需要并发执行多个异步请求,并将它们的结果组合起来的情况。一个常见的错误做法

是在异步操作中使用 await() 方法,这会阻塞当前的 Vert.x 事件循环线程,导致性能下降。本文将介绍如何使用 Uni.combine().all() 方法,以非阻塞的方式组合多个 Uni 对象,确保在所有请求完成后再返回最终结果。

避免阻塞:await() 的陷阱

在反应式编程中,阻塞操作是性能的敌人。await() 方法会使当前线程暂停,直到异步操作完成。在 Quarkus 这样的反应式框架中,这会阻塞 Vert.x 事件循环线程,导致应用程序无法响应其他请求。因此,应尽量避免使用 await() 方法。

使用 Uni.combine().all() 组合异步请求

Uni.combine().all() 方法提供了一种非阻塞的方式来组合多个 Uni 对象。它会并发地执行所有的 Uni 对象,并在所有对象都发出结果后,将结果组合成一个 Uni 对象。

以下是一个示例,演示了如何使用 Uni.combine().all() 方法组合多个 Uni 对象:

import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/testing")
public class TestingResource {

    @GET
    @Path("/async")
    public Uni> testingMutiny() {
        List> unis = new ArrayList<>();
        List.of("hello", "RestEasy").forEach(e -> {
            unis.add(Uni.createFrom().item(e)
                    .onItem().delayIt().by(Duration.ofMillis(10000))
                    .onItem().transform(String::toUpperCase)); // 转换成大写
        });

        return Uni.combine().all().unis(unis)
                .combinedWith(list -> (List) list);
    }
}

在这个例子中,我们首先创建了一个包含两个字符串的列表。然后,我们使用 forEach 循环遍历列表,为每个字符串创建一个 Uni 对象。每个 Uni 对象都会延迟 10 秒,然后将字符串转换为大写。最后,我们使用 Uni.combine().all() 方法将所有的 Uni 对象组合成一个 Uni> 对象。combinedWith 方法接收一个函数,该函数将所有 Uni 对象的结果组合成一个列表。

处理更复杂的场景

假设我们需要从多个不同的服务获取数据,并将这些数据组合成一个 Car 对象。每个服务都返回一个包含车辆部件信息的 Uni> 对象。

以下是一个示例,演示了如何使用 Uni.combine().all() 方法来处理这种情况:

import io.smallrye.mutiny.Uni;
import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/cars")
public class CarResource {

    // 假设这些方法从外部服务获取数据
    private Uni> getDoors(String variable1, String variable2, String variable3) {
        // 模拟延迟
        return Uni.createFrom().item(List.of(new JsonObjectCar("door1"), new JsonObjectCar("door2"))).onItem().delayIt().by(java.time.Duration.ofSeconds(1));
    }

    private Uni> getWheels(String variable1, String variable2, String variable3) {
        // 模拟延迟
        return Uni.createFrom().item(List.of(new JsonObjectCar("wheel1"), new JsonObjectCar("wheel2"))).onItem().delayIt().by(java.time.Duration.ofSeconds(2));
    }

    private Uni> getWindows(String variable1, String variable2, String variable3) {
        // 模拟延迟
        return Uni.createFrom().item(List.of(new JsonObjectCar("window1"), new JsonObjectCar("window2"))).onItem().delayIt().by(java.time.Duration.ofSeconds(3));
    }

    private Car createCar(List doors, List wheels, List windows) {
        Car car = new Car();
        car.setDoors(doors);
        car.setWheels(wheels);
        car.setWindows(windows);
        return car;
    }


    @GET
    @Path("/async")
    public Uni testingMutiny() {
        String variable1 = "var1";
        String variable2 = "var2";
        String variable3 = "var3";

        Uni> carDoorsUni = getDoors(variable1, variable2, variable3);
        Uni> carWheelsUni = getWheels(variable1, variable2, variable3);
        Uni> carWindowsUni = getWindows(variable1, variable2, variable3);

        return Uni.combine()
                .all()
                .unis(carDoorsUni, carWheelsUni, carWindowsUni)
                .combinedWith(list -> {
                    List carDoors = list.get(0);
                    List carWheels = list.get(1);
                    List carWindows = list.get(2);

                    return createCar(carDoors, carWheels, carWindows);
                })
                .invoke(() -> System.out.println("Okay it worked"));
    }

    // 辅助类
    public static class Car {
        private List doors;
        private List wheels;
        private List windows;

        public List getDoors() {
            return doors;
        }

        public void setDoors(List doors) {
            this.doors = doors;
        }

        public List getWheels() {
            return wheels;
        }

        public void setWheels(List wheels) {
            this.wheels = wheels;
        }

        public List getWindows() {
            return windows;
        }

        public void setWindows(List windows) {
            this.windows = windows;
        }
    }

    public static class JsonObjectCar {
        private String partName;

        public JsonObjectCar(String partName) {
            this.partName = partName;
        }

        public String getPartName() {
            return partName;
        }

        public void setPartName(String partName) {
            this.partName = partName;
        }
    }
}

在这个例子中,我们首先定义了三个方法 getDoors、getWheels 和 getWindows,它们分别返回一个包含车辆部件信息的 Uni> 对象。然后,我们使用 Uni.combine().all() 方法将这三个 Uni 对象组合成一个 Uni 对象。combinedWith 方法接收一个函数,该函数将所有 Uni 对象的结果组合成一个 Car 对象。

总结

使用 Uni.combine().all() 方法可以有效地组合多个异步请求,避免阻塞主线程,提高应用程序的性能和响应速度。在编写反应式代码时,应尽量避免使用 await() 方法,并利用 Mutiny 提供的组合操作符来处理并发场景。记住,Quarkus 能够识别异步 API 并正确处理结果,因此通常不需要手动订阅 Uni 或 Multi 对象。