// nope/lib/observables/nopeObservable.ts
// 2020-08-25 12:33:33 +02:00
//
// 149 lines
// 5.0 KiB
// TypeScript

import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Observable, Subscription } from 'rxjs';
/**
 * Extra state carried by observers that are handed to `nopeObservable.subscribe`.
 */
export interface nopeObserver {
// Gate for deferred delivery: when an observer is subscribed in 'immediate'
// mode, its deferred `next` callback is skipped unless this flag is truthy at
// the moment the deferred call runs.
active: boolean;
}
/**
 * Observer object accepted by `nopeObservable.subscribe`: any of the RxJS
 * next / error / completion observer shapes, each additionally carrying the
 * `active` flag of `nopeObserver`.
 */
export declare type NopePartialObserver<T> = NextObserver<T> & nopeObserver | ErrorObserver<T> & nopeObserver | CompletionObserver<T> & nopeObserver;
/**
 * Transformation used by `nopeObservable.enhancedSubscribe`. It receives the
 * caller-supplied scope object and the source observable, and returns the
 * observable that is actually subscribed to.
 */
export type pipe<T, K> = (scope: { [index: string]: any }, observable: Observable<T>) => Observable<K>
/**
 * RxJS based observable that always holds a current value.
 *
 * Extends `BehaviorSubject` with:
 * - `currentValue`: reading returns the last emitted value, assigning emits a new one.
 * - `publish`: wrapper for `next`.
 * - `subscribe` overloads with an additional callback mode. In `'sync'` mode
 *   (the default) handlers are handed to RxJS unchanged; in `'immediate'` mode
 *   every handler call is deferred with `setImmediate`.
 * - `enhancedSubscribe`: subscribes through an optional pipe and returns an
 *   unsubscribe function.
 */
export class nopeObservable<T> extends BehaviorSubject<T> {
  /** Last value delivered to the internal subscription created in the constructor. */
  protected _currentValue: T;

  /**
   * Getter for the current value.
   */
  public get currentValue(): T {
    return this._currentValue;
  }

  /**
   * Setter for the current value. Emits `value` on the stream via `next`.
   */
  public set currentValue(value: T) {
    this.next(value);
  }

  /**
   * Creates a custom observable.
   * @param initialValue Value the stream starts with.
   */
  constructor(initialValue: T) {
    super(initialValue);
    // A BehaviorSubject replays its value to new subscribers, so this internal
    // subscription stores `initialValue` right away and every later emission.
    this.subscribe((data) => {
      this._currentValue = data;
    });
  }

  /**
   * Publishes a value to the stream.
   * @param value The value.
   */
  publish(value: T) {
    this.next(value);
  }

  subscribe(observer?: NopePartialObserver<T>, cbMode?: 'sync' | 'immediate'): Subscription;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: null | undefined, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
  /** @deprecated Use an observer instead of an error callback */
  subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
  /** @deprecated Use an observer instead of a complete callback */
  subscribe(next: (value: T) => void, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
  /**
   * Underlying implementation of all overloads.
   *
   * Two call shapes are supported:
   * - `subscribe(observer, cbMode?)`: the second argument is the callback mode.
   * - `subscribe(next, error, complete, cbMode?)`: deprecated callback style.
   *
   * In `'immediate'` mode each handler is invoked through `setImmediate`.
   * Handlers of an observer object are additionally skipped unless the
   * observer's `active` flag is truthy when the deferred call runs. The
   * observer passed in is not modified; a forwarding observer is subscribed
   * in its place.
   *
   * @returns The subscription created by the underlying `BehaviorSubject`.
   */
  subscribe(observerOrNext?: any, errorOrcbMode?: any, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription {
    // `typeof null === 'object'`, but `null` is a legal placeholder for `next`
    // in the callback-style overloads and must not be treated as an observer.
    const isObserver = observerOrNext !== null && typeof observerOrNext === 'object';

    let _cbMode = cbMode;
    let _error = errorOrcbMode;
    let _complete = complete;
    let _observerOrNext = observerOrNext;

    if (isObserver) {
      // Observer style: the second argument carries the callback mode and
      // there is no separate error callback.
      _cbMode = errorOrcbMode || 'sync';
      _error = undefined;
    }

    if (_cbMode === 'immediate') {
      if (isObserver) {
        // Wraps one handler of the observer: defers the call and drops it if
        // the observer is no longer active once the deferred call executes.
        const deferHandler = (handler: (...args: any[]) => void) =>
          (...args: any[]) => setImmediate(() => {
            if (observerOrNext.active) {
              handler.apply(observerOrNext, args);
            }
          });

        // Build a fresh observer so the caller's object stays untouched and
        // repeated subscriptions do not stack wrappers on top of each other.
        const deferredObserver: any = {};
        if (observerOrNext.next) {
          deferredObserver.next = deferHandler(observerOrNext.next);
        }
        if (observerOrNext.error) {
          deferredObserver.error = deferHandler(observerOrNext.error);
        }
        if (observerOrNext.complete) {
          deferredObserver.complete = deferHandler(observerOrNext.complete);
        }
        _observerOrNext = deferredObserver;
      } else if (typeof observerOrNext === 'function') {
        _observerOrNext = (_value: T) => setImmediate(observerOrNext, _value);
      }

      // Callback style: defer the error / complete callbacks as well. In
      // observer style `errorOrcbMode` is a string, so `_error` stays undefined.
      _error = typeof errorOrcbMode === 'function'
        ? (_errorValue: any) => setImmediate(errorOrcbMode, _errorValue)
        : undefined;
      _complete = typeof complete === 'function'
        ? () => setImmediate(complete)
        : undefined;
    }

    return super.subscribe(_observerOrNext, _error, _complete);
  }

  /**
   * Subscribes to the stream, optionally through a pipe, and returns a
   * function that cancels the subscription.
   *
   * @param next Method that will be called for every value of the (piped) stream.
   * @param options Optional settings:
   *  - `pipe`: transformation applied to this observable before subscribing.
   *  - `scope`: object handed to the pipe as its first argument; an empty
   *    object is used when it is omitted.
   * @returns A function that unsubscribes `next` again.
   */
  public enhancedSubscribe<K>(next: (data: K) => void, options: {
    scope?: { [index: string]: any },
    pipe?: pipe<T, K>
  } = {}): () => void {
    // Without a pipe the values are forwarded as they are; the cast only
    // satisfies the generic signature.
    const observable: Observable<K> = options.pipe
      ? options.pipe(options.scope || {}, this)
      : (this as any as Observable<K>);

    const subscription = observable.subscribe({
      next
    });

    return () => { subscription.unsubscribe(); };
  }
}