122 lines
4.3 KiB
TypeScript
122 lines
4.3 KiB
TypeScript
import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Subscription } from 'rxjs';
|
|
|
|
export interface nopeObserver {
|
|
active: boolean;
|
|
}
|
|
|
|
export declare type NopePartialObserver<T> = NextObserver<T> & nopeObserver | ErrorObserver<T> & nopeObserver | CompletionObserver<T> & nopeObserver;
|
|
|
|
/**
|
|
* RsJX based Observable.
|
|
*
|
|
* Contains additional Functionalities like:
|
|
* - property with the current value
|
|
* - function to publish values. (wrapper for next)
|
|
* - enables performing a subscription with synced call or a immediate call.
|
|
*/
|
|
export class nopeObservable<T> extends BehaviorSubject<T> {
|
|
|
|
protected _currentValue: T;
|
|
|
|
/**
|
|
* Getter for the Current Value.
|
|
*/
|
|
public get currentValue(): T {
|
|
return this._currentValue;
|
|
}
|
|
|
|
/**
|
|
* Setter for the Current value.
|
|
*/
|
|
public set currentValue(value: T) {
|
|
this.next(value);
|
|
}
|
|
|
|
/**
|
|
* Creates a custom Observable.
|
|
* @param initialValue
|
|
*/
|
|
constructor(initialValue: T) {
|
|
super(initialValue);
|
|
|
|
const _this = this;
|
|
this.subscribe((data) => {
|
|
_this._currentValue = data;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Function to Publish a Value to the stream.
|
|
* @param value The Value.
|
|
*/
|
|
publish(value: T) {
|
|
this.next(value);
|
|
}
|
|
|
|
subscribe(observer?: NopePartialObserver<T>, cbMode?: 'sync' | 'immediate'): Subscription;
|
|
/** @deprecated Use an observer instead of a complete callback */
|
|
subscribe(next: null | undefined, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
|
|
/** @deprecated Use an observer instead of an error callback */
|
|
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
|
|
/** @deprecated Use an observer instead of a complete callback */
|
|
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
|
|
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription;
|
|
|
|
// Underlying implementation
|
|
subscribe(observerOrNext?, errorOrcbMode?, complete?: () => void, cbMode?: 'sync' | 'immediate'): Subscription {
|
|
// The first argument could be an Observer or Callback (Depricated) and the callbacks for complete and error
|
|
let _cbMode = cbMode;
|
|
let _error = errorOrcbMode;
|
|
let _complete = complete;
|
|
let _observerOrNext = observerOrNext;
|
|
|
|
// If the first argument is an Observer => the second argument must be the cbMode
|
|
if (typeof observerOrNext === 'object') {
|
|
// Adapt the Callback
|
|
_cbMode = errorOrcbMode || 'sync';
|
|
// The error function must be undefined
|
|
_error = undefined;
|
|
}
|
|
|
|
// Test the cbMode
|
|
if (_cbMode === 'immediate') {
|
|
switch (typeof observerOrNext) {
|
|
case 'object':
|
|
if ((observerOrNext as NopePartialObserver<T>).next) {
|
|
const orgFunc = observerOrNext.next;
|
|
observerOrNext.next = (v: T) => setImmediate(() => {
|
|
if ((observerOrNext as NopePartialObserver<T>).active) {
|
|
orgFunc.call(observerOrNext, v);
|
|
}
|
|
});
|
|
}
|
|
if ((observerOrNext as NopePartialObserver<T>).error) {
|
|
const orgFunc = observerOrNext.next;
|
|
observerOrNext.next = (v: T) => setImmediate(() => {
|
|
if ((observerOrNext as NopePartialObserver<T>).active) {
|
|
orgFunc.call(observerOrNext, v);
|
|
}
|
|
});
|
|
} if ((observerOrNext as NopePartialObserver<T>).complete) {
|
|
const orgFunc = observerOrNext.next;
|
|
observerOrNext.next = (v: T) => setImmediate(() => {
|
|
if ((observerOrNext as NopePartialObserver<T>).active) {
|
|
orgFunc.call(observerOrNext, v);
|
|
}
|
|
});
|
|
}
|
|
break;
|
|
case 'function':
|
|
_observerOrNext = typeof observerOrNext === 'function' ? (_value: T) => setImmediate(observerOrNext, _value) : observerOrNext;
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
_error = typeof errorOrcbMode === 'function' ? (_errorValue: any) => setImmediate(errorOrcbMode, _errorValue) : undefined;
|
|
_complete = typeof complete === 'function' ? () => setImmediate(complete) : undefined;
|
|
}
|
|
|
|
return super.subscribe(_observerOrNext, _error, _complete);
|
|
}
|
|
} |