Merging and Concatenating Streams

Learn how to merge or concatenate streams

Merging or concatenating streams is a common operation that takes multiple streams and creates a new Multi from them. The operation observes the items emitted by the source streams and produces a new Multi that emits those items.

All the streams merged or concatenated this way should emit the same type of items.

The difference between merge and concatenate

Understanding the difference between merge and concatenate is essential.

When merging streams, the resulting Multi observes all the upstreams at once and emits their items as they arrive. If the streams emit their items concurrently, the items from the different streams are interleaved.

[Figure: Merging streams]

When using merge, a failure from any observed stream is also propagated to the merged stream, and no more items are emitted after that failure. The merged stream emits the completion event only once all the observed streams have completed.

But if we want to keep the order of the observed streams, we need to concatenate.

When concatenating, the resulting Multi waits for the first stream to complete before subscribing to the second one. This ensures that all the items from the first stream are emitted before any item from the second stream. The order of the items follows the order of the sources:

[Figure: Concatenating streams]

When the first stream emits the completion event, the concatenated stream switches to the second stream, and so on. When the last stream completes, the concatenated stream sends the completion event. As with merge, if a stream fails, no further events are emitted.

Merging Multis

To create a new Multi by merging multiple Multi streams, use:

Multi<T> multi1 = getFirstMulti();
Multi<T> multi2 = getSecondMulti();

Multi<T> merged = Multi.createBy().merging().streams(multi1, multi2);

For example, we can merge multiple streams that emit periodic events and look at the output:

Multi<String> first = Multi.createFrom().ticks().every(Duration.ofMillis(10))
        .onItem().transform(l -> "Stream 1 - " + l);

Multi<String> second = Multi.createFrom().ticks().every(Duration.ofMillis(15))
        .onItem().transform(l -> "Stream 2 - " + l);

Multi<String> third = Multi.createFrom().ticks().every(Duration.ofMillis(5))
        .onItem().transform(l -> "Stream 3 - " + l);

Cancellable cancellable = Multi.createBy().merging().streams(first, second, third)
        .subscribe().with(s -> System.out.println("Got item: " + s));

This prints output similar to:

Got item: Stream 1 - 0
Got item: Stream 2 - 0
Got item: Stream 3 - 0
Got item: Stream 3 - 1
Got item: Stream 1 - 1
Got item: Stream 3 - 2
Got item: Stream 2 - 1
Got item: Stream 3 - 3
Got item: Stream 1 - 2
Got item: Stream 3 - 4
Got item: Stream 3 - 5
....
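
The interleaving in the output above follows from the tick periods. As a rough illustration, the sketch below replays idealized tick times in plain Java, without using Mutiny at all. The class `MergeTimeline` and its `simulate` method are hypothetical names introduced for this sketch, and the tie-breaking rule (lower stream number first when two ticks fall on the same instant) is an assumption of the model, not something a real merge guarantees.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model (no Mutiny dependency): replays idealized tick times to
// show the order in which a merge would forward the items.
class MergeTimeline {

    // Lists the ticks of all streams up to and including horizonMs, in arrival order.
    // periodsMs[i] is the tick period of stream i + 1.
    static List<String> simulate(long horizonMs, long... periodsMs) {
        // Each entry is {time in ms, stream number, tick index}.
        List<long[]> ticks = new ArrayList<>();
        for (int s = 0; s < periodsMs.length; s++) {
            if (periodsMs[s] <= 0) {
                throw new IllegalArgumentException("periods must be positive");
            }
            for (long i = 0; i * periodsMs[s] <= horizonMs; i++) {
                ticks.add(new long[] {i * periodsMs[s], s + 1, i});
            }
        }
        // List.sort is stable, so ticks with the same time keep their insertion
        // order, which is stream order. This tie-break is a modelling assumption.
        ticks.sort(Comparator.comparingLong(t -> t[0]));

        List<String> out = new ArrayList<>();
        for (long[] t : ticks) {
            out.add("Stream " + t[1] + " - " + t[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same periods as the three streams above: 10 ms, 15 ms and 5 ms.
        for (String s : simulate(25, 10, 15, 5)) {
            System.out.println("Got item: " + s);
        }
    }
}
```

With a 25 ms horizon, the model lists the same eleven items in the same order as the sample output. A real run depends on scheduling, so items that fall on the same instant may arrive in a different order.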

Concatenating Multis

To create a new Multi by concatenating multiple Multi streams, use:

Multi<T> multi1 = getFirstMulti();
Multi<T> multi2 = getSecondMulti();

Multi<T> concatenated = Multi.createBy().concatenating().streams(multi1, multi2);

Don’t forget that the order of the streams matters in this case, as (streamA, streamB) does not produce the same result as (streamB, streamA):

Multi<String> first = Multi.createFrom().items("A1", "A2", "A3");
Multi<String> second = Multi.createFrom().items("B1", "B2", "B3");

Multi.createBy().concatenating().streams(first, second)
        .subscribe().with(item -> System.out.print(item)); // "A1A2A3B1B2B3"

Multi.createBy().concatenating().streams(second, first)
        .subscribe().with(item -> System.out.print(item)); // "B1B2B3A1A2A3"

If one of the concatenated streams is unbounded (infinite), the streams that follow it in the list are never consumed!
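
To make this concrete, here is a minimal plain-Java sketch of sequential concatenation. It does not use the Mutiny API: sources are modelled as iterators, and "opening" a source stands in for subscribing to it. The class `ConcatModel`, its methods, and the `demand` parameter are all hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model (no Mutiny dependency) of sequential concatenation.
class ConcatModel {
    final List<String> items = new ArrayList<>();   // items received downstream
    final List<Integer> opened = new ArrayList<>(); // indexes of the sources that were opened

    // Reads up to `demand` items, opening source i + 1 only once source i is exhausted.
    static ConcatModel take(int demand, List<Supplier<Iterator<String>>> sources) {
        ConcatModel result = new ConcatModel();
        for (int i = 0; i < sources.size() && result.items.size() < demand; i++) {
            result.opened.add(i); // models subscribing to source i
            Iterator<String> it = sources.get(i).get();
            while (it.hasNext() && result.items.size() < demand) {
                result.items.add(it.next());
            }
        }
        return result;
    }

    // An unbounded source: A0, A1, A2, ... It never completes.
    static Iterator<String> unboundedA() {
        return new Iterator<String>() {
            private long next = 0;
            @Override public boolean hasNext() { return true; }
            @Override public String next() { return "A" + next++; }
        };
    }

    // A bounded source that completes after three items.
    static Iterator<String> boundedB() {
        return Arrays.asList("B1", "B2", "B3").iterator();
    }

    // Unbounded source first: the second source is never opened.
    static ConcatModel unboundedFirst(int demand) {
        List<Supplier<Iterator<String>>> sources = new ArrayList<>();
        sources.add(ConcatModel::unboundedA);
        sources.add(ConcatModel::boundedB);
        return take(demand, sources);
    }

    // Bounded source first: both sources are opened.
    static ConcatModel boundedFirst(int demand) {
        List<Supplier<Iterator<String>>> sources = new ArrayList<>();
        sources.add(ConcatModel::boundedB);
        sources.add(ConcatModel::unboundedA);
        return take(demand, sources);
    }

    public static void main(String[] args) {
        ConcatModel a = unboundedFirst(5);
        System.out.println(a.items + " opened=" + a.opened);
        ConcatModel b = boundedFirst(5);
        System.out.println(b.items + " opened=" + b.opened);
    }
}
```

In this model, however large the demand, the source placed after the unbounded one is never opened. The practical takeaway is to put an unbounded stream last in the list, or to bound it before concatenating.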