combine multiple Observables into one by merging their emissions
You can combine the output of multiple Observables so that they act like a single Observable, by using the Merge operator.
Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).
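The contrast between interleaving and concatenating can be modelled without any Rx library. The following is a minimal Python sketch, not ReactiveX code: each source is a hypothetical list of (time, item) pairs, and the names merge_model and concat_model are invented for illustration only.

```python
import heapq
from itertools import chain

# Hypothetical sources: each entry is (emission_time, item).
odds = [(0, 1), (2, 3), (4, 5)]
evens = [(1, 2), (3, 4), (5, 6)]

def merge_model(*sources):
    """Emit items from all sources in order of emission time (interleaved)."""
    return [item for _, item in heapq.merge(*sources, key=lambda e: e[0])]

def concat_model(*sources):
    """Emit every item of one source before moving on to the next source."""
    return [item for _, item in chain(*sources)]

merged = merge_model(odds, evens)
concatenated = concat_model(odds, evens)
print(merged)        # [1, 2, 3, 4, 5, 6]
print(concatenated)  # [1, 3, 5, 2, 4, 6]
```

In a real implementation the interleaving depends on when each source actually emits, so the merged order is not guaranteed in general.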
An onError notification from any of the source Observables is passed through to observers immediately and terminates the merged Observable.
In many ReactiveX implementations there is a second operator, MergeDelayError, that changes this behavior: it withholds onError notifications until all of the merged Observables have finished, and only then passes them along to the observers.
In RxClojure there are six operators of concern here:
merge converts two or more Observables into a single Observable that emits all of the items emitted by all of those Observables.
merge* converts an Observable that emits Observables into a single Observable that emits all of the items emitted by all of the emitted Observables.
merge-delay-error is like merge, but will emit all items from all of the merged Observables even if one or more of those Observables terminates with an onError notification while emissions are still pending.
merge-delay-error* is a similarly-modified version of merge*.
interleave is like merge, but more deliberate about how it interleaves the items from the source Observables: the resulting Observable emits the first item emitted by the first source Observable, then the first item emitted by the second source Observable, and so forth, and having reached the last source Observable, then emits the second item emitted by the first source Observable, the second item emitted by the second source Observable, and so forth, until all of the source Observables terminate.
interleave* is similar but operates on an Observable of Observables.
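The round-robin order described for interleave can be illustrated with a small Python model. This is a sketch of the ordering only, not RxClojure code; the function name interleave_model is hypothetical, and the sources are assumed to have equal length so that the question of uneven sources does not arise.

```python
def interleave_model(*sources):
    """Take the first item of each source, then the second of each, and so on."""
    out = []
    for round_items in zip(*sources):  # one "round" = one item from every source
        out.extend(round_items)
    return out

result = interleave_model([1, 2, 3], ["a", "b", "c"])
print(result)  # [1, 'a', 2, 'b', 3, 'c']
```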
RxCpp implements this operator as merge.
RxGroovy implements this operator as merge, mergeWith, and mergeDelayError.
For example, the following code merges the odds and evens into a single Observable. (The subscribeOn operator makes odds operate on a different thread from evens so that the two Observables may both emit items at the same time, to demonstrate how Merge may interleave these items.)
odds = Observable.from([1, 3, 5, 7]).subscribeOn(someScheduler);
evens = Observable.from([2, 4, 6]);
Observable.merge(odds,evens).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
3
2
5
4
7
6
Sequence complete
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
merge(Iterable)
merge(Iterable,int)
merge(Observable[])
merge(Observable[], int) (RxGroovy 1.1)
merge(Observable, Observable) (there are also versions that take up to nine Observables)
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
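The effect of a maximum subscription count can be sketched with a deterministic Python model. This is an illustration under stated assumptions, not library code: the name merge_max_concurrent is hypothetical, each inner source is a plain list, and every subscribed source is assumed to emit one item per round (real timing depends on the sources and their schedulers).

```python
from collections import deque

def merge_max_concurrent(sources, max_concurrent):
    """Merge sources while staying subscribed to at most max_concurrent of them."""
    pending = deque(iter(s) for s in sources)  # emitted but not yet subscribed
    active = []                                # currently subscribed sources
    out = []
    while pending or active:
        # Subscribe to waiting sources only while there is spare capacity.
        while pending and len(active) < max_concurrent:
            active.append(pending.popleft())
        still_active = []
        for src in active:
            try:
                out.append(next(src))
                still_active.append(src)
            except StopIteration:
                pass  # source completed, which frees a subscription slot
        active = still_active
    return out

sources = [[1, 2, 3], [10], [100, 200]]
limit_two = merge_max_concurrent(sources, 2)
limit_one = merge_max_concurrent(sources, 1)
print(limit_two)  # [1, 10, 2, 3, 100, 200]
print(limit_one)  # [1, 2, 3, 10, 100, 200]
```

With a limit of two, the third source is not subscribed to until the second one completes. With a limit of one, the model degenerates to concatenation.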
The instance version of merge is mergeWith, so, for example, in the code sample above, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
mergeWith(Observable)
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
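The difference between reporting an error immediately and delaying it can be sketched in plain Python. This is a behavioral model only, not the RxGroovy or RxJava API: the name merge_errors_model is hypothetical, a source is a list whose final element may be an Exception standing in for an onError notification, and the collected list of errors stands in for what CompositeException would carry.

```python
_DONE = object()  # sentinel marking normal completion of a source

def merge_errors_model(sources, delay_error=False):
    """Return (items, errors) produced by merging the sources round by round."""
    active = [iter(s) for s in sources]
    items, errors = [], []
    while active:
        still_active = []
        for src in active:
            event = next(src, _DONE)
            if event is _DONE:
                continue                  # source completed normally
            if isinstance(event, Exception):
                errors.append(event)      # source terminated with an error
                if not delay_error:
                    return items, errors  # merge: terminate immediately
                continue                  # delayed: keep draining the others
            items.append(event)
            still_active.append(src)
        active = still_active
    return items, errors

sources = [[1, 2, 3], [ValueError("first")], [4, 5, RuntimeError("second")]]
eager_items, eager_errors = merge_errors_model(sources)
delayed_items, delayed_errors = merge_errors_model(sources, delay_error=True)
print(eager_items, len(eager_errors))      # [1] 1
print(delayed_items, len(delayed_errors))  # [1, 4, 2, 5, 3] 2
```

In the delayed case both errors are reported together once every source has finished, which is why an observer may need to inspect more than one underlying error.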
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is not an instance method version of mergeDelayError as there is for merge.
mergeDelayError(Observable<Observable>)
mergeDelayError(Observable,Observable) (there are also versions that take up to nine Observables)
RxJava implements this operator as merge, mergeWith, and mergeDelayError.
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }
        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
merge(Iterable)
merge(Iterable,int)
merge(Observable[])
merge(Observable[], int) (RxJava 1.1)
merge(Observable, Observable) (there are also versions that take up to nine Observables)
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
merge(Observable<Observable>)
merge(Observable<Observable>, int) (RxJava 1.1)
The instance version of merge is mergeWith, so, for example, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is not an instance method version of mergeDelayError as there is for merge.
mergeDelayError(Observable<Observable>)
mergeDelayError(Observable,Observable) (there are also versions that take up to nine Observables)
RxJS implements this operator as merge, mergeAll, and mergeDelayError.
The first variant of merge is an instance operator that takes a variable number of Observables as parameters, merging each of these Observables with the source (instance) Observable to produce its single output Observable.
This first variant of merge is found in the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
The second variant of merge is a prototype (class) operator that accepts two parameters. The second of these is an Observable that emits the Observables you want to merge. The first is a number that indicates the maximum number of these emitted Observables that you want merge to attempt to be subscribed to at any moment. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
This second variant of merge is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
mergeAll is like this second variant of merge except that it does not allow you to set this maximum subscription count. It only takes the single parameter of an Observable of Observables.
mergeAll is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
If any of the individual Observables passed into merge or mergeAll terminates with an onError notification, the resulting Observable will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('whoops!'));
var source3 = Rx.Observable.of(4,5,6);
var merged = Rx.Observable.mergeDelayError(source1, source2, source3);
var subscription = merged.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Error: Error: whoops!
mergeDelayError is found in the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxKotlin implements this operator as merge, mergeWith, and mergeDelayError.
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
The instance version of merge is mergeWith, so, for example, instead of writing Observable.merge(odds,evens) you could also write odds.mergeWith(evens).
If any of the individual Observables passed into merge terminates with an onError notification, the Observable produced by merge itself will immediately terminate with an onError notification. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError instead.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError may pass information about multiple errors in the onError notification (it will never invoke the observer’s onError method more than once). For this reason, if you want to know the nature of these errors, you should write your observers’ onError methods so that they accept a parameter of the class CompositeException.
mergeDelayError has fewer variants. You cannot pass it an Iterable or Array of Observables, but you can pass it an Observable that emits Observables or between one and nine individual Observables as parameters. There is not an instance method version of mergeDelayError as there is for merge.
Rx.NET implements this operator as Merge.
You can pass Merge an Array of Observables, an Enumerable of Observables, an Observable of Observables, or two individual Observables.
If you pass an Enumerable or Observable of Observables, you have the option of also passing in an integer indicating the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
RxPHP implements this operator as merge.
Combine an Observable together with another Observable by merging their emissions into a single Observable.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge.php
$observable = Rx\Observable::of(42)->repeat();
$otherObservable = Rx\Observable::of(21)->repeat();
$mergedObservable = $observable
->merge($otherObservable)
->take(10);
$disposable = $mergedObservable->subscribe($stdoutObserver);
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Next value: 42
Next value: 21
Complete!
RxPHP also has an operator mergeAll.
Merges an observable sequence of observables into an observable sequence.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/merge/merge-all.php
$sources = Rx\Observable::range(0, 3)
->map(function ($x) {
return Rx\Observable::range($x, 3);
});
$merged = $sources->mergeAll();
$disposable = $merged->subscribe($stdoutObserver);
Next value: 0
Next value: 1
Next value: 1
Next value: 2
Next value: 2
Next value: 2
Next value: 3
Next value: 3
Next value: 4
Complete!
RxPY implements this operator as merge and merge_all/merge_observable.
You can either pass merge a set of Observables as individual parameters, or as a single parameter containing an array of those Observables.
merge_all and its alias merge_observable take as their single parameter an Observable that emits Observables. They merge the emissions of all of these Observables to create their own Observable.
Rx.rb implements this operator as merge, merge_concurrent, and merge_all.
merge merges a second Observable into the one it is operating on to create a new merged Observable.
merge_concurrent operates on an Observable that emits Observables, merging the emissions from each of these Observables into its own emissions. You can optionally pass it an integer parameter indicating how many of these emitted Observables merge_concurrent should try to subscribe to concurrently. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification. The default is 1, which makes it equivalent to merge_all.
merge_all is like merge_concurrent(1). It subscribes to each emitted Observable one at a time, mirroring its emissions as its own, and waiting to subscribe to the next Observable until the present one terminates with an onCompleted notification. In this respect it is more like a Concat variant.
RxScala implements this operator as flatten, flattenDelayError, merge, and mergeDelayError.
merge takes a second Observable as a parameter and merges that Observable with the one the merge operator is applied to in order to create a new output Observable.
mergeDelayError is similar to merge except that it will always emit all items from both Observables even if one of the Observables terminates with an onError notification before the other Observable has finished emitting items.
flatten takes as its parameter an Observable that emits Observables. It merges the items emitted by each of these Observables to create its own single Observable sequence. A variant of this operator allows you to pass in an Int indicating the maximum number of these emitted Observables you want flatten to try to be subscribed to at any time. If it hits this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
flattenDelayError is similar to flatten except that it will always emit all items from all of the emitted Observables even if one or more of those Observables terminates with an onError notification before the other Observables have finished emitting items.
RxSwift implements this operator as merge.
merge takes as its parameter an Observable that emits Observables. It merges the items emitted by each of these Observables to create its own single Observable sequence.
A variant of this operator merge(maxConcurrent:) allows you to pass in an Int indicating the maximum number of these emitted Observables you want merge to try to be subscribed to at any time. If it hits this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable.of(subject1, subject2)
.merge()
.subscribe {
print($0)
}
subject1.on(.Next(10))
subject1.on(.Next(11))
subject1.on(.Next(12))
subject2.on(.Next(20))
subject2.on(.Next(21))
subject1.on(.Next(14))
subject1.on(.Completed)
subject2.on(.Next(22))
subject2.on(.Completed)
Next(10)
Next(11)
Next(12)
Next(20)
Next(21)
Next(14)
Next(22)
Completed
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/merge.html