How to delay events?

Learn how to artificially delay event propagation and throttle emissions.

Delaying Uni’s item

When you have a Uni, you can delay the item emission using onItem().delayIt().by(…​):

Uni<String> delayed = Uni.createFrom().item("hello")
        .onItem().delayIt().by(Duration.ofMillis(10));

You pass a duration. When the item is received, it waits for that duration before propagating it to the downstream consumer.

You can also delay the item’s emission based on another companion Uni:

Uni<String> delayed = Uni.createFrom().item("hello")
        // The write method returns a Uni completed
        // when the operation is done.
        .onItem().delayIt().until(this::write);

The item is propagated downstream when the Uni returned by the function emits an item (possibly null). If that Uni emits a failure (or the function throws an exception), the failure is propagated downstream instead of the item.
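
The snippet above relies on a write method that is not shown. The following is a minimal, self-contained sketch of how such a companion Uni could look. The class name, the log list, and the body of write are assumptions made for illustration, and the Mutiny library is assumed to be on the classpath.

```java
import io.smallrye.mutiny.Uni;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class DelayUntilExample {

    // Hypothetical stand-in for the write method: it pretends to store the
    // value and returns a Uni that completes about 50 ms later.
    static Uni<Void> write(String value, List<String> log) {
        return Uni.createFrom().voidItem()
                .onItem().delayIt().by(Duration.ofMillis(50))
                .onItem().invoke(() -> log.add("written " + value));
    }

    // Returns the order in which the two events were observed.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        String item = Uni.createFrom().item("hello")
                // The item is held back until the Uni returned by write emits.
                .onItem().delayIt().until(value -> write(value, log))
                .await().atMost(Duration.ofSeconds(5));
        log.add("received " + item);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the "written" entry is always recorded before the "received" entry, because the item only reaches the consumer once the companion Uni has completed.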

Throttling a Multi

Multi does not have a delayIt operator because applying the same delay to all items is rarely what you want to do. However, there are several ways to apply a delay in a Multi.

First, you can use onItem().call(), which delays the emission of each item until the Uni produced by the call emits an item. For example, the following snippet delays every item by 10 ms:

Multi<Integer> delayed = multi
    .onItem().call(i ->
        // Delay the emission until the returned uni emits its item
        Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10))
    );

In general, you don’t want to apply the same delay to all the items. You can combine call with a random delay as follows:

Random random = new Random();
Multi<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
        .onItem().call(i -> {
            Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
            return Uni.createFrom().nullItem().onItem().delayIt().by(delay);
        });

Finally, you may want to throttle the items. For example, you can introduce a (minimum) one-second delay between each item. To achieve this, combine Multi.createFrom().ticks() with the multi you want to throttle:

// Introduce a one second delay between each item
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .onOverflow().drop();
Multi<Integer> delayed = Multi.createBy().combining().streams(ticks, multi)
        .using((x, item) -> item);

The onOverflow().drop() prevents the ticks stream from failing if the other stream (multi) is too slow: ticks that cannot be consumed are dropped.
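
To try the throttling approach end to end, here is a small sketch that applies it to a concrete source and collects the result. The class name, the three-item source, and the 100 ms period are assumptions chosen to keep the run short, and the Mutiny library is assumed to be on the classpath.

```java
import io.smallrye.mutiny.Multi;

import java.time.Duration;
import java.util.List;

public class ThrottleExample {

    static List<Integer> run() {
        // Assumed source; any Multi could take its place.
        Multi<Integer> multi = Multi.createFrom().items(1, 2, 3);

        // One tick every 100 ms; ticks nobody can consume are dropped.
        Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100))
                .onOverflow().drop();

        // Pair each item with a tick and keep only the item, so items
        // are emitted no faster than the ticks.
        Multi<Integer> throttled = Multi.createBy().combining().streams(ticks, multi)
                .using((tick, item) -> item);

        // Block until the throttled stream completes (for demonstration only).
        return throttled.collect().asList().await().atMost(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The items arrive in their original order; only the pace changes, since each one has to wait for a matching tick.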

Delaying other types of events

We have looked at how to delay items, but you may need to delay other events, such as subscription or failure. For these, use the call approach, and return a Uni that delays the event’s propagation.
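
As an illustration of the call approach, the sketch below delays the propagation of a failure by roughly 200 ms. It covers only the failure case. The class name, the pause helper, and the "boom" failure are hypothetical, and the Mutiny library is assumed to be on the classpath.

```java
import io.smallrye.mutiny.Uni;

import java.time.Duration;

public class DelayFailureExample {

    // Hypothetical helper: a Uni that emits null after the given duration.
    static Uni<?> pause(Duration duration) {
        return Uni.createFrom().nullItem().onItem().delayIt().by(duration);
    }

    // Returns the elapsed time, in milliseconds, before the failure was observed.
    static long run() {
        Uni<String> failing = Uni.createFrom().failure(new IllegalStateException("boom"));

        // The failure is held back until the Uni returned by call emits.
        Uni<String> delayed = failing
                .onFailure().call(failure -> pause(Duration.ofMillis(200)));

        long start = System.nanoTime();
        try {
            delayed.await().atMost(Duration.ofSeconds(5));
            throw new AssertionError("a failure was expected");
        } catch (IllegalStateException expected) {
            return Duration.ofNanos(System.nanoTime() - start).toMillis();
        }
    }

    public static void main(String[] args) {
        System.out.println("Failure observed after about " + run() + " ms");
    }
}
```

The original failure is still what the subscriber receives; call only postpones it until the returned Uni has emitted.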