Integrating blocking calls

Learn how to integrate blocking APIs and services.

It is often the case that a source of data (web service, database…​) is synchronous and blocking. To deal with such sources in your applications, apply the following pattern:

Uni<String> blocking = Uni.createFrom().item(this::invokeRemoteServiceUsingBlockingIO)
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

Create a Uni that will supply the item using a blocking call, here the invokeRemoteServiceUsingBlockingIO method. To avoid blocking the subscriber, use runSubscriptionOn, which switches the thread and so call invokeRemoteServiceUsingBlockingIO on another thread. Here we pass the default worker thread pool, but you can use your own executor.

Note that runSubscriptionOn does not subscribe to the Uni. It specifies the executor to use when a subscribe call happens.

Using runSubscriptionOn works when the blocking operation happens at subscription time. But, when dealing with Multi and need to execute a blocking operation for each item, you need emitOn.

While runSubscriptionOn runs the subscription on the given executor, emitOn configures the executor used to propagate downstream the item, failure and completion events:

Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().transform(this::invokeRemoteServiceUsingBlockingIO);

You can define a caller thread checker hook by calling Infrastructure.setCanCallerThreadBeBlockedSupplier(supplier) to indicate when a blocking operation should not be allowed by Mutiny, resulting in an IllegalStateException to be thrown.