apply a function to each item emitted by an Observable, sequentially, and emit each successive value
The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source Observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.
This sort of operator is sometimes called an “accumulator” in other contexts.
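The feedback loop described above can be sketched without any Rx library. The following is a minimal model over a plain array, not the implementation used by any Rx library; `scanArray` is a hypothetical helper name. It assumes the seedless behavior that the examples on this page display, in which the first item is emitted unchanged and each later item is combined with the previous emission.

```javascript
// Plain-array model of seedless scan (illustrative only; not an Rx implementation).
// scanArray is a hypothetical helper name introduced for this sketch.
function scanArray(items, accumulator) {
  const emitted = [];
  let hasValue = false;
  let acc;
  for (const item of items) {
    // Assumption: the first item passes through unchanged; every later item
    // is combined with the previous emission by the accumulator.
    acc = hasValue ? accumulator(acc, item) : item;
    hasValue = true;
    emitted.push(acc);
  }
  return emitted;
}

const triangular = scanArray([1, 2, 3, 4, 5], (a, b) => a + b);
console.log(triangular.join(", ")); // 1, 3, 6, 10, 15
```

The model emits one value per input item, which matches the one-to-one relationship between source emissions and scan emissions described above.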
scan
RxGroovy implements this operator as scan. The following code, for example, takes an Observable that emits a consecutive sequence of n integers starting with 1 and converts it, via scan, into an Observable that emits the first n triangular numbers:
numbers = Observable.from([1, 2, 3, 4, 5]);

numbers.scan({ a, b -> a+b }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
scan(Func2)
There is also a variant of scan that takes a seed value. The seed is passed to the accumulator function the first time it is called (for the first emission from the source Observable), in place of the result of the missing prior call to the accumulator. Note that if you use this version, scan will emit this seed value as its own initial emission. Note also that passing a seed of null is not the same as passing no seed at all: a null seed is a valid seed.
scan(R,Func2)
This operator does not by default operate on any particular Scheduler.
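The difference between no seed, an ordinary seed, and a null seed can be sketched with a plain-array model. This is a hedged illustration written in plain JavaScript rather than Groovy, and it is not RxGroovy code; `NO_SEED`, `scanArrayWithSeed`, and `sum` are hypothetical names. It assumes the behavior described above: the seeded variant emits the seed first, and a null seed counts as a seed.

```javascript
// Plain-array model of the seeded variant (illustrative only; not an Rx implementation).
// NO_SEED and scanArrayWithSeed are hypothetical names introduced for this sketch.
const NO_SEED = Symbol("no seed");

function scanArrayWithSeed(items, accumulator, seed = NO_SEED) {
  const emitted = [];
  let hasValue = seed !== NO_SEED;
  let acc = hasValue ? seed : undefined;
  if (hasValue) {
    // Assumption taken from the text above: the seed is the first emission.
    emitted.push(acc);
  }
  for (const item of items) {
    acc = hasValue ? accumulator(acc, item) : item;
    hasValue = true;
    emitted.push(acc);
  }
  return emitted;
}

// Treats a null accumulator as zero so that a null seed can be summed.
const sum = (acc, x) => (acc === null ? 0 : acc) + x;

const noSeed = scanArrayWithSeed([1, 2, 3], sum);         // 1, 3, 6
const zeroSeed = scanArrayWithSeed([1, 2, 3], sum, 0);    // 0, 1, 3, 6
const nullSeed = scanArrayWithSeed([1, 2, 3], sum, null); // null, 1, 3, 6
```

In this model the null seed produces four emissions, the first of which is null, while the seedless call produces three. That is the distinction the note about null seeds draws.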
scan
RxJava implements this operator as scan.
Observable.just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer sum, Integer item) {
            return sum + item;
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
scan(Func2)
There is also a variant of scan that takes a seed value. The seed is passed to the accumulator function the first time it is called (for the first emission from the source Observable), in place of the result of the missing prior call to the accumulator. Note that if you use this version, scan will emit this seed value as its own initial emission. Note also that passing a seed of null is not the same as passing no seed at all: a null seed is a valid seed.
scan(R,Func2)
This operator does not by default operate on any particular Scheduler.
expand scan
RxJS implements the scan operator.
var source = Rx.Observable.range(1, 3)
  .scan(
    function (acc, x) {
      return acc + x;
    });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
You can optionally pass scan a seed value as an additional parameter. scan will pass this seed value to the accumulator function the first time it is called (for the first emission from the source Observable) in place of the result from the missing prior call to the accumulator.
var source = Rx.Observable.range(1, 3)
  .scan(
    function (acc, x) {
      return acc * x;
    }, 1);

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
scan is found in each of the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
RxJS also implements the expand operator, which is somewhat similar. scan applies the function to its previous return value combined with the next item emitted by the source Observable, so the number of items it emits equals the number emitted by the source Observable. expand instead simply feeds the return value of the function back into the function without regard to future emissions from the source Observable, so it continues to create new values at its own pace.
var source = Rx.Observable.return(42)
  .expand(function (x) { return Rx.Observable.return(42 + x); })
  .take(5);

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
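The way expand feeds its own output back into the function can be modelled without Rx. The sketch below is an assumption-laden illustration, not the RxJS implementation: `expandArray` is a hypothetical helper, the selector returns a plain array instead of an Observable, pending values are processed in first-in, first-out order, and the `limit` argument stands in for take.

```javascript
// Plain-array model of expand (illustrative only; not the RxJS implementation).
// expandArray is a hypothetical helper name; limit plays the role of take(n).
function expandArray(initial, selector, limit) {
  const emitted = [];
  const queue = [...initial];
  while (queue.length > 0 && emitted.length < limit) {
    const value = queue.shift();
    emitted.push(value);
    // Every emitted value is fed back through the selector, and whatever the
    // selector returns is queued for emission in turn.
    queue.push(...selector(value));
  }
  return emitted;
}

const values = expandArray([42], (x) => [42 + x], 5);
console.log(values.join(", ")); // 42, 84, 126, 168, 210
```

Without the limit, this loop would never terminate for a selector that always returns a value, which is why the example above bounds the sequence with take.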
expand is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.experimental.js
expand requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
scan
RxPHP implements this operator as scan.
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value.
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan.php

//Without a seed
$source = Rx\Observable::range(1, 3);

$subscription = $source
    ->scan(function ($acc, $x) {
        return $acc + $x;
    })
    ->subscribe($createStdoutObserver());
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/scan/scan-with-seed.php

//With a seed
$source = Rx\Observable::range(1, 3);

$subscription = $source
    ->scan(function ($acc, $x) {
        return $acc * $x;
    }, 1)
    ->subscribe($createStdoutObserver());
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/scan.html