Merging or concatenating streams is a frequent operation that takes multiple streams and creates a new Multi out of them.
Such an operation observes the items emitted by the different streams and produces a new Multi emitting those events.
All the streams merged or concatenated this way should emit the same type of items.
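When the upstreams do not share an item type, one option is to map them to a common type before combining them. The following is a minimal sketch, assuming the Mutiny library is on the classpath; the class name `SameTypeMergeExample` and the `"n"` prefix are illustrative only.

```java
import io.smallrye.mutiny.Multi;

import java.time.Duration;
import java.util.List;

class SameTypeMergeExample {

    // Maps the integer stream to strings so that both upstreams are
    // Multi<String>, then merges them and collects the items in a list.
    static List<String> mergeAsStrings() {
        Multi<Integer> numbers = Multi.createFrom().items(1, 2, 3);
        Multi<String> labels = Multi.createFrom().items("a", "b");

        // Align the item types before merging.
        Multi<String> numbersAsText = numbers.onItem().transform(n -> "n" + n);

        return Multi.createBy().merging().streams(numbersAsText, labels)
                .collect().asList()
                .await().atMost(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // The relative order of the items from the two streams is not asserted here.
        System.out.println(mergeAsStrings());
    }
}
```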
The difference between merge and concatenate
Understanding the difference between merge and concatenate is essential.
When merging streams, the resulting Multi observes the different upstreams and emits the items as they arrive. If the streams emit their items concurrently, the items from the different streams are interleaved.

When using merge, failures are also propagated to the merged stream, and no more items are emitted after that failure. The completion event is only emitted by the merged stream when all the observed streams are completed.
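The failure behavior described above can be observed with a small experiment. This is a sketch under the assumption that Mutiny is on the classpath; the class name, the event labels, and the `"boom"` message are made up for illustration.

```java
import io.smallrye.mutiny.Multi;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MergeFailureExample {

    // Merges a healthy stream with a failing one and records every event
    // received by the subscriber.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        Multi<String> healthy = Multi.createFrom().items("a", "b");
        Multi<String> failing = Multi.createFrom().failure(new IllegalStateException("boom"));

        Multi.createBy().merging().streams(healthy, failing)
                .subscribe().with(
                        item -> events.add("item: " + item),
                        failure -> events.add("failure: " + failure.getMessage()),
                        () -> events.add("completed"));

        return events;
    }

    public static void main(String[] args) {
        // The failure is the last recorded event and "completed" never appears.
        run().forEach(System.out::println);
    }
}
```

Because one upstream fails, the merged stream terminates with that failure and never emits the completion event.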
But if we want to keep the order of the observed streams, we need to concatenate.
When concatenating, the resulting Multi waits for the first stream to complete before subscribing to the second one. This ensures that all the items from the first stream have been emitted before any item from the second stream. The order of the emitted items therefore follows the order of the sources.

When the first stream emits the completion event, the concatenated stream switches to the second stream, and so on. When the last stream completes, the concatenated stream sends the completion event. As with merge, if a stream fails, no further events are emitted.
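To make the sequential subscription and the failure behavior concrete, here is a minimal sketch, assuming Mutiny is on the classpath. The class name, the item values, and the recorded labels are hypothetical. The third stream records when it gets subscribed to, which should never happen because the second stream fails first.

```java
import io.smallrye.mutiny.Multi;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ConcatFailureExample {

    // Concatenates three streams where the second one fails, and records
    // what the subscriber observes.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        Multi<String> first = Multi.createFrom().items("A1", "A2");
        Multi<String> second = Multi.createFrom().failure(new IllegalStateException("boom"));
        Multi<String> third = Multi.createFrom().items("C1")
                // Recorded only if the concatenation ever reaches this stream.
                .onSubscription().invoke(() -> events.add("subscribed to third"));

        Multi.createBy().concatenating().streams(first, second, third)
                .subscribe().with(
                        events::add,
                        failure -> events.add("failure: " + failure.getMessage()),
                        () -> events.add("completed"));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A1, A2, failure: boom]
    }
}
```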
Merging Multis
To create a new Multi from the merge of multiple Multi streams, use:
Multi<T> multi1 = getFirstMulti();
Multi<T> multi2 = getSecondMulti();
Multi<T> merged = Multi.createBy().merging().streams(multi1, multi2);
For example, we can merge multiple streams emitting periodical events and look at the output:
Multi<String> first = Multi.createFrom().ticks().every(Duration.ofMillis(10))
        .onItem().transform(l -> "Stream 1 - " + l);
Multi<String> second = Multi.createFrom().ticks().every(Duration.ofMillis(15))
        .onItem().transform(l -> "Stream 2 - " + l);
Multi<String> third = Multi.createFrom().ticks().every(Duration.ofMillis(5))
        .onItem().transform(l -> "Stream 3 - " + l);
Cancellable cancellable = Multi.createBy().merging().streams(first, second, third)
        .subscribe().with(s -> System.out.println("Got item: " + s));
Got item: Stream 1 - 0
Got item: Stream 2 - 0
Got item: Stream 3 - 0
Got item: Stream 3 - 1
Got item: Stream 1 - 1
Got item: Stream 3 - 2
Got item: Stream 2 - 1
Got item: Stream 3 - 3
Got item: Stream 1 - 2
Got item: Stream 3 - 4
Got item: Stream 3 - 5
....
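The merged tick streams above never complete on their own, so the output continues until the subscription is cancelled. One way to bound such an experiment is to keep only the first few items. The sketch below assumes Mutiny is on the classpath and a version that provides `select().first(n)`; the class name, the limit of 10 items, and the 5-second timeout are arbitrary choices.

```java
import io.smallrye.mutiny.Multi;

import java.time.Duration;
import java.util.List;

class BoundedMergeExample {

    // Merges two periodic streams and keeps only the first ten items, after
    // which the upstream tick streams are cancelled.
    static List<String> firstTen() {
        Multi<String> first = Multi.createFrom().ticks().every(Duration.ofMillis(10))
                .onItem().transform(l -> "Stream 1 - " + l);
        Multi<String> second = Multi.createFrom().ticks().every(Duration.ofMillis(15))
                .onItem().transform(l -> "Stream 2 - " + l);

        return Multi.createBy().merging().streams(first, second)
                .select().first(10)
                .collect().asList()
                .await().atMost(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // The exact interleaving depends on timing and may differ between runs.
        firstTen().forEach(item -> System.out.println("Got item: " + item));
    }
}
```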
Concatenating Multis
To create a new Multi from the concatenation of multiple Multi streams, use:
Multi<T> multi1 = getFirstMulti();
Multi<T> multi2 = getSecondMulti();
Multi<T> concatenated = Multi.createBy().concatenating().streams(multi1, multi2);
Don’t forget that the order of the streams matters in this case, as (streamA, streamB) does not provide the same result as (streamB, streamA):
Multi<String> first = Multi.createFrom().items("A1", "A2", "A3");
Multi<String> second = Multi.createFrom().items("B1", "B2", "B3");
Multi.createBy().concatenating().streams(first, second)
        .subscribe().with(item -> System.out.print(item)); // "A1A2A3B1B2B3"
Multi.createBy().concatenating().streams(second, first)
        .subscribe().with(item -> System.out.print(item)); // "B1B2B3A1A2A3"
If one of the concatenated streams is unbounded (infinite), the next streams in the list won’t be consumed!
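This pitfall can be illustrated with an infinite tick stream placed first. The following is a sketch, assuming Mutiny is on the classpath and a version that provides `select().first(n)`; the class name and the `"tick-"` and `"never"` values are illustrative. The result is capped at five items so the program terminates, and every one of them comes from the unbounded stream.

```java
import io.smallrye.mutiny.Multi;

import java.time.Duration;
import java.util.List;

class UnboundedConcatExample {

    // Concatenates an infinite tick stream with a finite stream. Because the
    // tick stream never completes, the second stream is never subscribed to.
    static List<String> firstFive() {
        Multi<String> unbounded = Multi.createFrom().ticks().every(Duration.ofMillis(10))
                .onItem().transform(n -> "tick-" + n);
        Multi<String> finite = Multi.createFrom().items("never");

        return Multi.createBy().concatenating().streams(unbounded, finite)
                .select().first(5)
                .collect().asList()
                .await().atMost(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(firstFive()); // [tick-0, tick-1, tick-2, tick-3, tick-4]
    }
}
```

If the finite stream must be consumed, placing it before the unbounded one in the list is the simplest fix.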