It is often the case that a source of data (web service, database…) is synchronous and blocking. To deal with such sources in your applications, apply the following pattern:
Uni<String> blocking = Uni.createFrom().item(this::invokeRemoteServiceUsingBlockingIO)
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
Create a Uni that will supply the item using a blocking call, here the invokeRemoteServiceUsingBlockingIO method.
To avoid blocking the subscriber, use runSubscriptionOn, which switches the thread so that invokeRemoteServiceUsingBlockingIO is called on another thread.
Here we pass the default worker thread pool, but you can use your own executor.
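The pattern with your own executor might look like the following sketch. The class name, the thread name blocking-worker, and the body of invokeRemoteServiceUsingBlockingIO (a sleep standing in for the remote call) are assumptions made for illustration; only the Mutiny calls come from the pattern above.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingUniExample {

    // Hypothetical stand-in for a blocking remote call: it sleeps, then
    // returns the name of the thread that executed it.
    static String invokeRemoteServiceUsingBlockingIO() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking supplier on a caller-provided executor and returns
    // the name of the thread that did the work.
    static String callOnOwnExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "blocking-worker"));
        try {
            return Uni.createFrom()
                    .item(BlockingUniExample::invokeRemoteServiceUsingBlockingIO)
                    .runSubscriptionOn(executor)
                    .await().indefinitely();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callOnOwnExecutor()); // prints "blocking-worker"
    }
}
```

The await() call is used here only to keep the sketch short; it blocks the calling thread until the item arrives.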
Note that runSubscriptionOn does not subscribe to the Uni. It specifies the executor to use when a subscribe call happens.
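One way to observe that nothing runs until a subscription happens is to count invocations, as in this sketch. The counter, the class name, and the dedicated executor are illustrative assumptions, and the blocking method is reduced to a stub.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySubscriptionExample {

    // Counts how many times the (hypothetical) blocking call actually ran.
    static final AtomicInteger invocations = new AtomicInteger();

    static String invokeRemoteServiceUsingBlockingIO() {
        invocations.incrementAndGet();
        return "response";
    }

    // Returns the invocation count observed before and after subscribing.
    static int[] countsBeforeAndAfterSubscribing() {
        invocations.set(0);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Building the pipeline only records which executor to use.
            Uni<String> blocking = Uni.createFrom()
                    .item(LazySubscriptionExample::invokeRemoteServiceUsingBlockingIO)
                    .runSubscriptionOn(executor);

            int before = invocations.get();  // no subscriber yet
            blocking.await().indefinitely(); // subscribes, then waits for the item
            int after = invocations.get();
            return new int[] {before, after};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] counts = countsBeforeAndAfterSubscribing();
        System.out.println(counts[0] + " -> " + counts[1]); // prints "0 -> 1"
    }
}
```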
Using runSubscriptionOn works when the blocking operation happens at subscription time. But when you are dealing with a Multi and need to execute a blocking operation for each item, you need emitOn.
While runSubscriptionOn runs the subscription on the given executor, emitOn configures the executor used to propagate the item, failure and completion events downstream:
Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().transform(this::invokeRemoteServiceUsingBlockingIO);
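A self-contained variant of this snippet could look as follows. It is a sketch under assumptions: the class name, the emit-worker thread name, and the stubbed invokeRemoteServiceUsingBlockingIO (which simply tags each item with the current thread name) are not part of the original, and a single-threaded executor replaces the default worker pool so that the result is easy to check.

```java
import io.smallrye.mutiny.Multi;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmitOnExample {

    // Hypothetical blocking call: tags each name with the thread that processed it.
    static String invokeRemoteServiceUsingBlockingIO(String name) {
        return name + "@" + Thread.currentThread().getName();
    }

    // Emits three items, moves them to the executor with emitOn, and applies
    // the per-item transformation downstream of that switch.
    static List<String> processOnWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "emit-worker"));
        try {
            return Multi.createFrom().items("john", "jack", "sue")
                    .emitOn(executor)
                    .onItem().transform(EmitOnExample::invokeRemoteServiceUsingBlockingIO)
                    .collect().asList()
                    .await().indefinitely();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "[john@emit-worker, jack@emit-worker, sue@emit-worker]"
        System.out.println(processOnWorker());
    }
}
```

Because emitOn only affects what is downstream of it, the transform stage has to come after the emitOn call for the per-item work to run on the executor.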
You can define a caller thread checker hook by calling |