Delaying Uni’s item
When you have a Uni
, you can delay the item emission using onItem().delayIt().by(…)
:
Uni<String> delayed = Uni.createFrom().item("hello")
.onItem().delayIt().by(Duration.ofMillis(10));
You pass a duration. When the item is received, it waits for that duration before propagating it to the downstream consumer.
You can also delay the item’s emission based on another companion Uni
:
Uni<String> delayed = Uni.createFrom().item("hello")
// The write method returns a Uni completed
// when the operation is done.
.onItem().delayIt().until(this::write);
The item is propagated downstream when the Uni
returned by the function emits an item (possibly null
).
If the function emits a failure (or throws an exception), this failure is propagated downstream.
Throttling a Multi
Multi does not have a delayIt operator because applying the same delay to all items is rarely what you want to do.
However, there are several ways to apply a delay in a Multi
.
First, you can use the onItem().call()
, which delays the emission until the Uni
produced the call
emits an item.
For example, the following snippet delays all the items by 10 ms:
Multi<Integer> delayed = multi
.onItem().call(i ->
// Delay the emission until the returned uni emits its item
Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10))
);
In general, you don’t want to apply the same delay to all the items.
You can combine call
with a random delay as follows:
Random random = new Random();
Multi<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().call(i -> {
Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
return Uni.createFrom().nullItem().onItem().delayIt().by(delay);
});
Finally, you may want to throttle the items.
For example, you can introduce a (minimum) one-second delay between each item.
To achieve this, combine Multi.createFrom().ticks()
and the multi to throttled:
// Introduce a one second delay between each item
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop();
Multi<Integer> delayed = Multi.createBy().combining().streams(ticks, multi)
.using((x, item) -> item);
The onOverflow().drop() is used to avoid the ticks to fail if the other stream (multi ) is too slow.
|