Merge multiple Python rx Observables into one Observable without creating an empty Observable.

Do you have a list of Observable streams that you would like to combine into one stream? You could use Observable.merge(observable).merge(observable). But what if you have a list and don't want to chain each and everyone of them? After reading this, you know how to use the Observable.merge factory method to merge a list of observables.

Let’s say you have three streams that produce strings. You would like to end up with one stream to subscribe to:

stream1       ----x---------
stream2       ------x-------
stream3       --------x-----

single_stream ----x-x-x-----

Python allows you to merge observables by chaining them:

single_stream = stream1.merge(stream2).merge(stream3)

That works as long as you have a fixed number of streams. If you have a list of observables, you need to do something like:

streams = [stream1, stream2, stream3]

single_stream = Observable.empty()

for s in streams:
single_stream = single_stream.merge(s)

But there is an easier way. We can use the Observable.merge factory method to create a new observable, based on a list of observables.

Use Observable.merge factory to create an observable from a list of observables

streams = [stream1, stream2, stream3]
single_stream = Observable.merge(streams)

If you want to test this code yourself, open a terminal and create a project like this:

mkdir rx_fun
cd rx_fun/
python3 -m venv env
source env/bin/activate
pip install rx

Create and enter this code:

from rx.subjects import Subject
from rx import Observable

stream1 = Subject()
stream2 = Subject()
stream3 = Subject()

streams = [stream1, stream2, stream3]

single_stream = Observable.merge(streams)

single_stream.subscribe(lambda x: print(x))


If you run this code, the result is:

Written by Loek van den Ouweland on 2019-02-09. Questions regarding this artice? You can send them to the address below.