There are many poll-based API around us.
Sometimes you need to use these APIs to generate a stream from the polled values.
To do this, use the repeat()
feature:
PollableDataSource source = new PollableDataSource();
// First creates a uni that emit the polled item.
// Because `poll` blocks, let's use a specific executor
Uni<String> pollItemFromSource = Uni.createFrom().item(source::poll)
.runSubscriptionOn(executor);
// To get the stream of items, just repeat the uni indefinitely
Multi<String> stream = pollItemFromSource.repeat().indefinitely();
Cancellable cancellable = stream.subscribe().with(item -> System.out.println("Polled item: " + item));
// ... later ..
// when you don't want the items anymore, cancel the subscription and close the source if needed.
cancellable.cancel();
source.close();
You can also stop the repetition using the repeat().until()
method which will continue the repetition until the given predicate returns true
, and/or directly create a Multi
using Multi.createBy().repeating()
:
PollableDataSource source = new PollableDataSource();
Multi<String> stream = Multi.createBy().repeating()
.supplier(source::poll)
.until(s -> s == null)
.runSubscriptionOn(executor);
stream.subscribe().with(item -> System.out.println("Polled item: " + item));