mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.
timeout RxGroovy implements this operator as timeout, but in several variants.
The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout
timeout(long,TimeUnit)
timeout()
A second variant of timeout differs from the first in that instead of issuing an error notification in case of a timeout condition, it instead immediately switches to a backup Observable that you specify.
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout
timeout(long,TimeUnit,Observable)
timeout(long,TimeUnit,Observable,Scheduler)
A third variant of timeout does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“TimeoutException”) from the Observable timeout returns.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1)
There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1,Observable)
The variant of timeout that uses a per-item Observable to set the timeout has a variant that allows you to pass in a function that returns an Observable that acts as a timeout timer for the very first item emitted by the source Observable (in the absence of this, there would be no timeout for the first item).
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1)
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1,Observable)
timeout RxJava implements this operator as timeout, but in several variants.
The first variant accepts parameters that define a duration of time (a quantity of time, and a TimeUnit that this quantity is denominated in). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (TimeoutException).
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout
timeout(long,TimeUnit)
timeout()
A second variant of timeout differs from the first in that instead of issuing an error notification in case of a timeout condition, it instead immediately switches to a backup Observable that you specify.
By default this variant of timeout operates on the computation Scheduler, but you can choose a different Scheduler by passing it in as an optional third parameter to timeout.
timeout(long,TimeUnit,Observable)
timeout(long,TimeUnit,Observable,Scheduler)
A third variant of timeout does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“TimeoutException”) from the Observable timeout returns.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1)
There is also a variant of timeout that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func1,Observable)
The variant of timeout that uses a per-item Observable to set the timeout has a variant that allows you to pass in a function that returns an Observable that acts as a timeout timer for the very first item emitted by the source Observable (in the absence of this, there would be no timeout for the first item).
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1)
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
This variant of timeout by default runs on the immediate Scheduler.
timeout(Func0,Func1,Observable)
timeout timeoutWithSelector RxJS implements this operator as timeout and timeoutWithSelector:
One variant of timeout accepts a duration of time (in milliseconds). Each time the source Observable emits an item, timeout starts a timer, and if that timer exceeds the duration before the source Observable emits another item, timeout terminates its Observable with an error (“Timeout” or a string of your choice that you pass as an optional second parameter).
var source = Rx.Observable
.return(42)
.delay(5000)
.timeout(200, 'Timeout has occurred.');
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); }); Error: Timeout has occurred.
Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered. To use this variant, pass the backup Observable (or Promise) as the second parameter to timeout.
var source = Rx.Observable
.return(42)
.delay(5000)
.timeout(200, Promise.resolve(42));
var subscription = source.subscribe(
function (x) { console.log('Next: %s', x); },
function (err) { console.log('Error: %s', err); },
function () { console.log('Completed'); }); Next: 42 Completed
timeoutWithSelector does not use a constant timeout duration, but sets its timeout duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. If any such Observable completes before the source Observable emits another item, this is considered a timeout condition, and triggers an onError notification (“Error: Timeout”) from the Observable timeoutWithSelector returns.
var array = [
200,
300,
350,
400
];
var source = Rx.Observable
.for(array, function (x) {
return Rx.Observable.timer(x);
})
.map(function (x, i) { return i; })
.timeoutWithSelector(function (x) {
return Rx.Observable.timer(400);
});
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Error: Error: Timeout
There is also a variant of timeoutWithSelector that both uses a per-item Observable to set the timeout duration and switches to a backup Observable in case of a timeout.
var array = [
200,
300,
350,
400
];
var source = Rx.Observable
.for(array, function (x) {
return Rx.Observable.timer(x);
})
.map(function (x, i) { return i; })
.timeoutWithSelector(function (x) {
return Rx.Observable.timer(400);
}, Rx.Observable.return(42));
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Next: 42 Completed
The variant of timeoutWithSelector that uses a per-item Observable to set the timeout has a variant that allows you to pass in an Observable that acts as a timeout timer for the very first item emitted by the source Observable (in the absence of this, there would be no timeout for the first item; that is to say, the default Observable that governs this first timeout period is Rx.Observable.never()).
var array = [
200,
300,
350,
400
];
var source = Rx.Observable
.for(array, function (x) {
return Rx.Observable.timer(x);
})
.map(function (x, i) { return i; })
.timeoutWithSelector(Rx.Observable.timer(250), function (x) {
return Rx.Observable.timer(400);
});
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Error: Error: Timeout
And that variant also has a cousin that will switch to a specified backup Observable rather than emitting an error upon hitting a timeout condition.
var array = [
200,
300,
350,
400
];
var source = Rx.Observable
.for(array, function (x) {
return Rx.Observable.timer(x);
})
.map(function (x, i) { return i; })
.timeoutWithSelector(Rx.Observable.timer(250), function (x) {
return Rx.Observable.timer(400);
}, Rx.Observable.return(42));
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Next: 42 Completed
timeout and timeoutWithSelector are found in each of the following distributions:
rx.all.jsrx.all.compat.jsrx.time.jsrx.lite.jsrx.lite.compat.jsThey require one of the following distributions:
rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.jstimeout RxPHP implements this operator as timeout.
Errors the observable sequence if no item is emitted in the specified time. When a timeout occurs, this operator errors with an instance of Rx\Exception\TimeoutException
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/timeout/timeout.php
Rx\Observable::interval(1000)
->timeout(500)
->subscribe($createStdoutObserver('One second - '));
Rx\Observable::interval(100)
->take(3)
->timeout(500)
->subscribe($createStdoutObserver('100 ms - ')); 100 ms - Next value: 0 100 ms - Next value: 1 100 ms - Next value: 2 100 ms - Complete! One second - Exception: timeout
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/timeout.html