// nope/lib/observables/nopeObservable.ts
// 2020-08-26 00:11:26 +02:00
//
// 303 lines
// 7.9 KiB
// TypeScript

import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Observable, Subscription } from 'rxjs';
import { generateId } from '../helpers/idMethods';
import { callImmediate } from '../helpers/runtimeMethods';
/**
 * Predicate handed to `nopeObservable.waitFor`.
 *
 * It receives the content plus, when available, the sender, the timestamp and
 * any extra arguments of the publication. It answers either synchronously or
 * through a promise; a truthy answer means "this is the value we waited for".
 *
 * The rest parameter is annotated explicitly so the alias also compiles with
 * `noImplicitAny` enabled.
 */
export type waitForCallback<T> = (
  content?: T | null,
  sender?: string,
  timeStamp?: number | null,
  ...data: any[]
) => boolean | Promise<boolean>;
/**
 * Options accepted by `nopeObservable.waitFor`.
 */
export interface nopeWaitForOpitions {
  /**
   * When truthy, `waitFor` evaluates the test against the current content
   * before it subscribes for further values.
   */
  testFirst?: boolean;

  /**
   * NOTE(review): not read by `waitFor` in this file - presumably selects how
   * the internal subscription is invoked; confirm against other consumers.
   */
  subscriptionMode?: 'async' | 'sync';

  /**
   * NOTE(review): not read by `waitFor` in this file - presumably restricts
   * which kind of change triggers the test; confirm against other consumers.
   */
  triggerMode?: ('sub' | 'super' | 'direct')[];
}
/**
 * Subscription handle returned by `nopeObservable.subscribe`.
 *
 * It is the RxJS `Subscription` extended with the options the subscription was
 * created with and with controls to temporarily mute it.
 */
export interface nopeObserver<T> extends Subscription {
  /** Options that were passed to `subscribe`. */
  options: nopeSubscriptionOptions;

  /** Stops forwarding `next` notifications without unsubscribing. */
  pause(): void;

  /** Resumes forwarding `next` notifications after a `pause`. */
  unpause(): void;
}
/**
 * Options attached to a subscription created by `nopeObservable.subscribe`.
 */
export interface nopeSubscriptionOptions {
  /**
   * Kinds of change the subscriber is interested in.
   * NOTE(review): `subscribe` only stores this list on the returned observer;
   * how the individual modes are interpreted is not visible in this file.
   */
  mode: ('sub' | 'super' | 'direct')[];
}
/**
 * Observer accepted by `nopeObservable.subscribe`. Every handler is optional.
 */
export interface NopePartialObserver<T> {
  /** Receives each published content together with sender, timestamp and extra arguments. */
  next?: observableCallback<T>;

  /** Receives an error notification of the underlying observable. */
  error?: (error: any) => void;

  /** Called when the underlying observable completes. */
  complete?: () => void;
}
/**
 * Any RxJS observer that implements at least one of `next`, `error` or
 * `complete`.
 */
export type PartialObserver<T> =
  | NextObserver<T>
  | ErrorObserver<T>
  | CompletionObserver<T>;
/**
 * Callback invoked for every value delivered to a subscriber: the content,
 * the id of the sender, the timestamp of the publication and any extra
 * arguments that were handed to `setContent`.
 *
 * The rest parameter is annotated explicitly so the alias also compiles with
 * `noImplicitAny` enabled.
 */
export type observableCallback<T> = (
  content: T | null,
  sender: string,
  timeStamp: number | null,
  ...data: any[]
) => void;
/**
 * Factory used by `nopeObservable.enhancedSubscription`: it receives a
 * caller-supplied scope object and the source observable and returns the
 * observable that is actually subscribed to.
 */
export type pipe<T, K> = (
  scope: Record<string, any>,
  observable: Observable<T>
) => Observable<K>;
/**
 * RxJS-based observable that keeps track of its current value.
 *
 * Functionality added on top of the wrapped `BehaviorSubject`:
 * - a `value` property holding the current content,
 * - `setContent` to publish values (wrapper around `next`) together with a
 *   sender id, a timestamp and arbitrary extra arguments,
 * - subscriptions whose callback is invoked either synchronously or through
 *   `callImmediate`.
 */
export class nopeObservable<T> {
  /**
   * Wrapped subject. It is created with `null` as its initial value; a
   * `BehaviorSubject` hands its current value to every new subscriber while
   * `subscribe` is still running.
   */
  public observable: BehaviorSubject<T> = new BehaviorSubject<T>(null);

  /** Identifier of this instance; used as the sender when none is given. */
  public readonly id: string = generateId();

  /** Stored content, before the optional `getter` is applied. */
  public _value: T;

  /** Free-form options. `setContent` reads `generateTimeStamp` from it. */
  public options: any = {};

  /**
   * Current content (after the optional `getter`).
   */
  public get value(): T | null {
    return this.getContent();
  }

  /**
   * Assigning publishes the value with this instance as sender and
   * `'direct'` as extra argument.
   */
  public set value(value: T | null) {
    this.setContent(value, this.id, null, 'direct');
  }

  /**
   * Optional hook that validates and/or adapts a value before it is stored.
   * Returning `valid: false` rejects the value; otherwise `data` is stored.
   */
  public setter: ((value: T | null, sender: string | null, timeStamp: number | null, ...data: any[]) => {
    data: T | null,
    valid: boolean,
  }) | null = null;

  /**
   * Optional hook that transforms the stored value whenever it is read.
   */
  public getter: ((value: T | null) => T | null) | null = null;

  /** Extra arguments of the most recent publication. */
  protected _args: any[] = [];

  /** Sender of the most recent publication. */
  protected _sender: string;

  /** Timestamp of the most recent publication. */
  protected _timeStamp: number;

  /**
   * Teardown callbacks that `dispose` invokes.
   * NOTE(review): nothing in this class adds entries - presumably subclasses
   * register their own teardown logic here; confirm.
   */
  protected _subscriptions = new Set<() => void>();

  /**
   * While `true`, `setContent` stores values without notifying subscribers.
   */
  public disablePublishing = false;

  /**
   * Stores a new content and notifies the subscribers.
   *
   * @param value New content.
   * @param sender Id of the sender; defaults to the id of this instance.
   * @param timeStamp Timestamp of the change. When it is `null` and
   *   `options.generateTimeStamp === true`, `Date.now()` is used.
   * @param data Extra arguments forwarded to the setter and the subscribers.
   */
  public setContent(value: T | null, sender: string | null = null, timeStamp: number | null = null, ...data: any[]): void {
    if (sender === null) {
      sender = this.id;
    }

    if (this.options.generateTimeStamp === true && timeStamp === null) {
      timeStamp = Date.now();
    }

    // Validate first and store afterwards: a value rejected by the setter
    // must not replace the content that is currently stored.
    let accepted: T | null = value;
    if (this.setter !== null) {
      const adapted = this.setter(value, sender, timeStamp, ...data);
      if (!adapted.valid) {
        return;
      }
      accepted = adapted.data;
    }
    this._value = accepted;

    // Publish only when allowed and when the stored value differs from the
    // value the subject emitted last.
    if (!this.disablePublishing && this.observable.value !== this._value) {
      this._args = data;
      this._sender = sender;
      this._timeStamp = timeStamp;
      this.observable.next(this.getContent());
    }
  }

  /**
   * Runs all registered teardown callbacks and closes the wrapped subject.
   * The subject must not be used afterwards.
   */
  public dispose(): void {
    for (const release of this._subscriptions) {
      release();
    }
    this._subscriptions.clear();

    // `unsubscribe` is the public way to close a subject: it marks the
    // subject as closed and additionally drops all registered observers,
    // instead of writing to the `closed` field of the library object.
    this.observable.unsubscribe();
  }

  /**
   * Returns the stored content, passed through `getter` when one is defined.
   */
  public getContent(): T | null {
    if (this.getter !== null) {
      return this.getter(this._value);
    }
    return this._value;
  }

  /**
   * Subscribes to the content.
   *
   * Because the wrapped subject is a `BehaviorSubject`, the current value is
   * delivered right away.
   *
   * @param observer Handlers for `next`, `error` and `complete`.
   * @param mode `'sync'` calls `next` directly, `'immediate'` schedules it
   *   through `callImmediate`.
   * @param options Stored on the returned observer.
   * @returns The subscription, extended with `options`, `pause` and `unpause`.
   */
  public subscribe(observer: NopePartialObserver<T>,
    mode: 'immediate' | 'sync' = 'sync',
    options: nopeSubscriptionOptions = {
      mode: ['direct', 'sub', 'super']
    }): nopeObserver<T> {
    let active = true;

    const _observer: PartialObserver<T> = {
      next: (data: T) => {
        // A paused subscription stays registered but drops notifications.
        if (!active || !observer.next) {
          return;
        }
        if (mode === 'immediate') {
          callImmediate(observer.next, data, this._sender, this._timeStamp, ...this._args);
        } else {
          observer.next(data, this._sender, this._timeStamp, ...this._args);
        }
      },
      complete: () => {
        if (observer.complete) {
          observer.complete();
        }
      },
      error: (error) => {
        if (observer.error) {
          observer.error(error);
        }
      },
    };

    const subscription = this.observable.subscribe(_observer);

    const ret: nopeObserver<T> = Object.assign(subscription, {
      options,
      pause: () => {
        active = false;
      },
      unpause: () => {
        active = true;
      }
    });
    return ret;
  }

  /**
   * Subscribes to the wrapped subject, optionally through a custom pipe.
   *
   * @param next Called with every value of the (piped) observable.
   * @param options `pipe` builds the observable to subscribe to from
   *   `scope` and the wrapped subject; without it the wrapped subject itself
   *   is used.
   * @returns The RxJS subscription.
   */
  public enhancedSubscription<K>(next: (data: K) => void, options: {
    scope?: { [index: string]: any },
    pipe?: pipe<T, K>
  } = {}): Subscription {
    // Without a pipe the values are passed through unchanged, so `K` is `T`
    // at runtime. Subscribing to the subject (rather than to `this`) keeps
    // both paths consistent: `next` only ever receives the value itself.
    const source: Observable<K> = options.pipe
      ? options.pipe(options.scope, this.observable)
      : (this.observable as unknown as Observable<K>);

    return source.subscribe({
      next
    });
  }

  /**
   * Subscribes for exactly one notification and unsubscribes afterwards.
   *
   * The wrapped subject delivers its current value while `subscribe` is still
   * running, so in `'sync'` mode `func` is called before this method returns.
   *
   * @param func Callback that receives the value.
   * @param mode Mode of the subscription.
   * @param options Additional options of the subscription.
   * @returns The (possibly already closed) subscription.
   */
  once(func: observableCallback<T>, mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): nopeObserver<T> {
    let fired = false;
    let ret: nopeObserver<T> | null = null;

    ret = this.subscribe({
      next: (...args) => {
        // Notifications scheduled before the unsubscribe took effect must
        // not trigger the callback a second time.
        if (fired) {
          return;
        }
        fired = true;
        // During the synchronous replay the subscription does not exist yet;
        // it is closed below, once `subscribe` has returned.
        if (ret !== null) {
          ret.unsubscribe();
        }
        func(...args);
      }
    }, mode, options);

    if (fired) {
      ret.unsubscribe();
    }
    return ret;
  }

  /**
   * Waits until a content satisfies `testCallback`.
   *
   * The callback may answer synchronously or with a promise, independent of
   * how it was declared. If it throws or rejects, the returned promise is
   * rejected and the internal subscription is released.
   *
   * NOTE(review): `options.subscriptionMode` and `options.triggerMode` are
   * not evaluated here.
   *
   * @param testCallback Test applied to the content(s).
   * @param options `testFirst` tests the current content before subscribing.
   * @returns Promise resolving with the content that passed the test.
   */
  waitFor(testCallback: waitForCallback<T>, options: nopeWaitForOpitions = { testFirst: true }): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      let settled = false;
      let subscription: nopeObserver<T> | null = null;

      // Marks the wait as finished and closes the subscription if it exists.
      const release = (): void => {
        settled = true;
        if (subscription !== null) {
          subscription.unsubscribe();
        }
      };

      const succeed = (content: T | null): void => {
        if (settled) {
          return;
        }
        release();
        resolve(content as T);
      };

      const fail = (error: unknown): void => {
        if (settled) {
          return;
        }
        release();
        reject(error);
      };

      // Applies the test to one notification. `onMismatch` runs when the
      // test answers falsy.
      const check = (onMismatch: (() => void) | null, ...args: Parameters<waitForCallback<T>>): void => {
        if (settled) {
          return;
        }
        const decide = (matched: boolean): void => {
          if (matched) {
            succeed(args[0]);
          } else if (onMismatch !== null) {
            onMismatch();
          }
        };
        try {
          const verdict = testCallback(...args);
          if (verdict !== null && typeof verdict === 'object' && typeof verdict.then === 'function') {
            // Promise-like answer: decide once it settles.
            Promise.resolve(verdict).then(decide).then(undefined, fail);
          } else {
            decide(Boolean(verdict));
          }
        } catch (error) {
          fail(error);
        }
      };

      const listen = (): void => {
        if (settled) {
          return;
        }
        const created = this.subscribe({
          next: (...args) => check(null, ...args)
        });
        subscription = created;
        // The replayed value may already have passed the test while
        // `subscribe` was running, i.e. before `created` existed.
        if (settled) {
          created.unsubscribe();
        }
      };

      try {
        if (options.testFirst) {
          check(listen, this.getContent());
        } else {
          listen();
        }
      } catch (error) {
        fail(error);
      }
    });
  }

  /**
   * Waits for the next notification of the observable.
   *
   * NOTE(review): implemented with `once`, and the wrapped subject replays
   * its current value on subscribe, so the promise resolves with that
   * replayed value - confirm whether skipping it is intended.
   *
   * @param mode Mode of the subscription.
   * @param options Additional options of the subscription.
   * @returns Promise resolving with the received content.
   */
  waitForUpdate(mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      try {
        this.once((content) => resolve(content), mode, options);
      } catch (e) {
        reject(e);
      }
    });
  }
}