convert an ordinary Observable into a connectable Observable
A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
publish
RxGroovy implements this operator as publish
.
publish()
There is also a variant that takes a function as a parameter. This function takes an emitted item from the source Observable as a parameter and produces the item that will be emitted in its place by the resulting Observable.
publish(Func1)
publish
RxJava implements this operator as publish
.
publish()
There is also a variant that takes a function as a parameter. This function takes as a parameter the ConnectableObservable
that shares a single subscription to the underlying Observable sequence. This function produces and returns a new Observable sequence.
publish(Func1)
let letBind multicast publish publishLast publishValue
In RxJS, the publish
operator takes a function as a parameter. This function takes an emitted item from the source Observable as a parameter and produces the item that will be emitted in its place by the returned ConnectableObservable
.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publish(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
The publishValue
operator takes, in addition to the function described above, an initial item to be emitted by the resulting ConnectableObservable
at connection time before emitting the items from the source Observable. It will not, however, emit this initial item to observers that subscribe after the time of connection.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishValue(42); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
The publishLast
operator is similar to publish
, and takes a similarly-behaving function as its parameter. It differs from publish
in that instead of applying that function to, and emitting an item for every item emitted by the source Observable subsequent to the connection, it only applies that function to and emits an item for the last item that was emitted by the source Observable, when that source Observable terminates normally.
var interval = Rx.Observable.interval(1000); var source = interval .take(2) .doAction(function (x) { console.log('Side effect'); }); var published = source.publishLast(); published.subscribe(createObserver('SourceA')); published.subscribe(createObserver('SourceB')); var connection = published.connect(); function createObserver(tag) { return Rx.Observer.create( function (x) { console.log('Next: ' + tag + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); }
The above operators are available in the following packages:
rx.all.js
rx.all.compat.js
rx.binding.js
(requires either rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
RxJS also has a multicast
operator which operates on an ordinary Observable, multicasts that Observable by means of a particular Subject that you specify, applies a transformative function to each emission, and then emits those transformed values as its own ordinary Observable sequence. Each subscription to this new Observable will trigger a new subscription to the underlying multicast Observable.
var subject = new Rx.Subject(); var source = Rx.Observable.range(0, 3) .multicast(subject); var observer = Rx.Observer.create( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); } ); var subscription = source.subscribe(observer); subject.subscribe(observer); var connected = source.connect(); subscription.dispose();
The multicast
operator is available in the following packages:
rx.all.js
rx.all.compat.js
rx.binding.js
(requires either rx.lite.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
There is also a let
operator (the alias letBind
is available for browsers such as Internet Explorer before IE9 where “let
” is forbidden). It is similar to multicast
but does not multicast the underlying Observable through a Subject:
var obs = Rx.Observable.range(1, 3); var source = obs.let(function (o) { return o.concat(o); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
The let
(or letBind
) operator is available in the following packages:
rx.all.js
rx.all.compat.js
rx.experimental.js
It requires one of the following packages:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
multicast multicastWithSelector publish publishLast publishValue
RxPHP implements this operator as multicast
.
Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/multicast/multicast.php $subject = new \Rx\Subject\Subject(); $source = \Rx\Observable::range(0, 3)->multicast($subject); $subscription = $source->subscribe($stdoutObserver); $subject->subscribe($stdoutObserver); $connected = $source->connect();
RxPHP also has an operator multicastWithSelector
.
Multicasts the source sequence notifications through an instantiated subject from a subject selector factory, into all uses of the sequence within a selector function. Each subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
RxPHP also has an operator publish
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence. This operator is a specialization of Multicast using a regular Subject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publish.php /* With publish */ $interval = \Rx\Observable::range(0, 10); $source = $interval ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publish(); $published->subscribe($createStdoutObserver('SourceC ')); $published->subscribe($createStdoutObserver('SourceD ')); $published->connect();
RxPHP also has an operator publishLast
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification. This operator is a specialization of Multicast using a AsyncSubject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishLast.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishLast(); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
RxPHP also has an operator publishValue
.
Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/publish/publishValue.php $range = \Rx\Observable::fromArray(range(0, 1000)); $source = $range ->take(2) ->doOnNext(function ($x) { echo "Side effect\n"; }); $published = $source->publishValue(42); $published->subscribe($createStdoutObserver('SourceA')); $published->subscribe($createStdoutObserver('SourceB')); $connection = $published->connect();
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/publish.html