nope/lib/observables/nopeObservable.ts

122 lines
4.3 KiB
TypeScript
Raw Normal View History

2020-08-21 14:49:48 +00:00
import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Subscription } from 'rxjs';
export interface nopeObserver {
active: boolean;
}
export declare type NopePartialObserver<T> = NextObserver<T> & nopeObserver | ErrorObserver<T> & nopeObserver | CompletionObserver<T> & nopeObserver;
/**
* RsJX based Observable.
*
* Contains additional Functionalities like:
* - property with the current value
* - function to publish values. (wrapper for next)
* - enables performing a subscription with synced call or a immediate call.
*/
export class nopeObservable<T> extends BehaviorSubject<T> {
protected _currentValue: T;
/**
* Getter for the Current Value.
*/
public get currentValue(): T {
return this._currentValue;
}
/**
* Setter for the Current value.
*/
public set currentValue(value: T) {
this.next(value);
}
/**
* Creates a custom Observable.
* @param initialValue
*/
constructor(initialValue: T) {
super(initialValue);
const _this = this;
this.subscribe((data) => {
_this._currentValue = data;
});
}
/**
* Function to Publish a Value to the stream.
* @param value The Value.
*/
publish(value: T) {
this.next(value);
}
subscribe(observer?: NopePartialObserver<T>, cbMode?: 'sync' | 'immediate'): Subscription;
/** @deprecated Use an observer instead of a complete callback */
subscribe(next: null | undefined, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
/** @deprecated Use an observer instead of an error callback */
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
/** @deprecated Use an observer instead of a complete callback */
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
// Underlying implementation
subscribe(observerOrNext?, errorOrcbMode?, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription {
// The first argument could be an Observer or Callback (Depricated) and the callbacks for complete and error
let _cbMode = cbMode;
let _error = errorOrcbMode;
let _complete = complete;
let _observerOrNext = observerOrNext;
// If the first argument is an Observer => the second argument must be the cbMode
if (typeof observerOrNext === 'object') {
// Adapt the Callback
_cbMode = errorOrcbMode || 'sync';
// The error function must be undefined
_error = undefined;
}
// Test the cbMode
if (_cbMode === 'immediate') {
switch (typeof observerOrNext) {
case 'object':
if ((observerOrNext as NopePartialObserver<T>).next) {
const orgFunc = observerOrNext.next;
observerOrNext.next = (v: T) => setImmediate(() => {
if ((observerOrNext as NopePartialObserver<T>).active) {
orgFunc.call(observerOrNext, v);
}
});
}
if ((observerOrNext as NopePartialObserver<T>).error) {
const orgFunc = observerOrNext.next;
observerOrNext.next = (v: T) => setImmediate(() => {
if ((observerOrNext as NopePartialObserver<T>).active) {
orgFunc.call(observerOrNext, v);
}
});
} if ((observerOrNext as NopePartialObserver<T>).complete) {
const orgFunc = observerOrNext.next;
observerOrNext.next = (v: T) => setImmediate(() => {
if ((observerOrNext as NopePartialObserver<T>).active) {
orgFunc.call(observerOrNext, v);
}
});
}
break;
case 'function':
_observerOrNext = typeof observerOrNext === 'function' ? (_value: T) => setImmediate(observerOrNext, _value) : observerOrNext;
break;
default:
break;
}
_error = typeof errorOrcbMode === 'function' ? (_errorValue: any) => setImmediate(errorOrcbMode, _errorValue) : undefined;
_complete = typeof complete === 'function' ? () => setImmediate(complete) : undefined;
}
return super.subscribe(_observerOrNext, _error, _complete);
}
}