class Observable<T>
new (subscriber: (observer: {next: (value: T) => void, error: (err: Error) => void, complete: () => void}) => void | () => void)
Construct with subscriber function that receives observer object. Observer is for pushing values, errors and completion signal.
Subscriber function may return function that should be called if Observable receives complete signal or is cancelled.
static of
(...args: T[]): Observable<T>;
Function to create an Observable that immediately pushes values given as arguments.
Important
Stream is immediately completed after all values are pushed.
If you wish to create a stream and kickstart it with a value(s), use startWith.
static from
(values: T[]): Observable<T>;
Function to create an Observable that immediately pushes values given as an array of values.
Important
Stream is immediately completed after all values are pushed.
If you wish to create a stream and kickstart it with a value(s), use startWith.
static fromPromise
(values: Promise<T>): Observable<T>
Function to create an Observable that pushes a value
revealed with provided Promise and then completes
as no more values to get from it.
Subscribable interface
subscribe
({
start: (sub: Subscription) => void,
next: (value: T) => void,
error: (err: Error) => void,
complete: () => void
} ) => { unsubscribe(): void }
(
onNext?: (value: T) => void,
onError?: (err: Error) => void,
onCompletion?: () => void
) => { unsubscribe(): void }
Registers a function called every time when the Observable changes value that it holds, error is pushed or Observable is complete.
Returns a function to unregister the subscription.
toPromise
() => Promise<T>
Returns next value that's going to be pushed through the Observable instance.
// Example
const source = new Observable()
setTimeout(() => source.next(1), 100)
const value = await source.toPromise()
Promise interface
Observables are also implementing interface of a Promise,
so it actually is possible to deref as if Observable was a Promise
and use it both with .then and async..await
with await
(async () => {
console.log(
await new Observable(observer => { observer.next(1) })
)
// logs: 1
})()
then
<U>(onsuccess?: (value: T) => Promise<U> | U, onrejection?: (reason: any) => any): Promise<U>
new Observable(
observer => { observer.next(1) }).then(_ => console.log(_)
)
// logs: 1
AsyncIterable interface
Observables implement interface for asynchronous iteration with for..await..of looping.
let source = new Observable()
(async () => {
for await (let value of source) {
console.log(value)
}
})()
source.next(1)
source.next(2)
// logs 1
// logs 2
Observer interface
next
(nextValue: T) => void
Sends value signal through the Observable instance.
Values can be received, by .subscribe, .toPromise, .then methods
or through asynchronous iteration over the Observable.
error
(error: Error) => void
Sends error signal through the Observable instance. Errors can be received, with .subscribe method.
complete
() => void
Sends complete signal through the Observable instance. Completion can be handled, with .subscribe method.
Operators - common
map
(action: (value: T) => U): Observable<U>
Creates a derivative stream of values where
every value pushed by a parent is transformed with action function and push further by Observable -
result of this function call.
filter
(filter: (value: T) => boolean): Observable<T>;
Creates a derivative stream of values where
only those values that meet requirements formulated
with filter function are going to be pushed by that derivative Observable.
scan
(accumulator: (summary: U, value: T, index: number) => U, defaultValue?: U): Observable<U>
Creates a derivative stream of values where
on every value that parent pushes
there is accumulator function called
getting last pushed value and new value that has been pushed by parent Observable.
Result of the function is next value of newly created Observable.
First call is with summary being undefined unless defautValue is also passed.
flatten
(): Observable<U>
When parent Observable is releasing other Observables as values
use flatten to create a derivative stream that consists only of values
that are released by these "observable values".
merge
`(...args: Observable
Creates a stream containing all values of parent and of provided in arguments streams.
distinct
(comparator?: (prev: T, next: T) => boolean): Observable<T>;
Creates a derivative stream of values but eliminates repeated subsequent value occurences.
If comparator is passed, it is going to be used
to determine if a value is distinct from previous one.
Otherwise strict equal is incorporated.
buffer
(maxLastValues: number = 0): Observable<T[]>
Creates a derivative stream of parent Observable values gathered in array.
New values set is released after
all other Observables values are pushed through
and all Domain actions being called.
You can specify how many of there messages has to be remembered.
materialize
(defaultState: T): Atom<T>
Creates a derivative stream with an Atom where all values pushed by source are pushed by this node too.
Resulting Atom will start with defaultState state value.
async function example() {
const a = new Observable().materialize(1)
await leThreeHoursLater()
console.log(a.deref()) // logs '1'
}
Operators - specific
startWith
(firstValue: T): Observable<T>;
Creates a derivative stream of values where
it has all values of parent stream, but these are preceeded
with firstValue that is immediately shared with all other nodes
that subscribed to the newly created stream.
bufferCount
(bufferSize: number, customBufferCount: number = null): Observable<T[]>
Creates a derivative stream containing parent Observable values
gathered in an Array.
New values set is released
every bufferSize-th parent Observable value or
every customBufferCount-th parent Observable value
if second argument is present.
As it might be still mysterious how it works consider these graphical representations of how values are pushed in time:
const a = Observable.of(1, 2, 3, 4, 5)
const b = a.bufferCount(2)
a --1--2--3--4--5--|
b -----[1,2]-[3,4]-[5]|
const a = Observable.of(1, 2, 3, 4, 5)
const b = a.bufferCount(3, 2) // now with custom window
a --1--2--3-------4--5-|
b --------[1,2,3]----[3,4,5]-|
reemit
(): Observable<T>
Creates a derivative stream where all values pushed by source are pushed by this node too, but also this new one will push at first last value that has been emited by the source, if that is available.
async function example() {
const a = new Observable().startWith(1)
await leThreeHoursLater()
const b = a.reemit()
b.subscribe(value => console.log(value)) // logs '1'
}