import { injectable } from 'inversify'; import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Observable, Subscription } from 'rxjs'; import { generateId } from '../helpers/idMethods'; import { callImmediate } from '../helpers/runtimeMethods'; export type waitForCallback = (content?: T | null, sender?: string, timeStamp?: number | null, ...data) => boolean | Promise; export interface nopeWaitForOpitions { testFirst?: boolean; subscriptionMode?: 'immediate' | 'sync'; triggerMode?: Array<'sub' | 'super' | 'direct'>; } export interface nopeObserver extends Subscription { options: nopeSubscriptionOptions pause(): void; unpause(): void; } export interface nopeSubscriptionOptions { mode: Array<'sub' | 'super' | 'direct'> } export interface NopePartialObserver { next?: observableCallback, error?: (error: any) => void, complete?: () => void, } export declare type PartialObserver = (NextObserver | ErrorObserver | CompletionObserver); export type observableCallback = (content: T | null, sender: string, timeStamp: number | null, ...data) => void; export type pipe = (scope: { [index: string]: any }, observable: Observable) => Observable /** * RsJX based Observable. * * Contains additional Functionalities like: * - property with the current value * - function to publish values. (wrapper for next) * - enables performing a subscription with synced call or a immediate call. */ @injectable() export class nopeObservable { public observable: BehaviorSubject = new BehaviorSubject(undefined); public readonly id: string = generateId(); public _value: T; public options: any = {}; /** * Function to specify a Setter */ public setter: ((value: S | null, sender: string | null, timeStamp: number | null, ...data) => { data: T | null, valid: boolean, }) | null = null; /** * Function to specify a Getter */ public getter: ((value: T | null) => G | null) | null = null; protected _args: any[] = []; protected _sender: string; protected _timeStamp: number; /** * Function to * @param value * @param sender * @param timeStamp * @param data */ public setContent(value: S | null, sender: string | null = null, timeStamp: number | null = null, ...data): void { // Change the Value. if (this.setter !== null) { const adapted = this.setter(value as S, sender, timeStamp); if (!adapted.valid) { return; } this._value = adapted.data; } else { // Adapt the Value if required. this._value = value as any as T; } const valueToPublish = this.getContent(); /** Publish the Data */ if (!this.disablePublishing && this.observable.value !== valueToPublish) { this._publish(valueToPublish, sender, timeStamp); } } protected _updateSenderAndTimestamp(sender: string | null = null, timestamp: number | null = null,) { // Define a Sender if required if (sender === null) { sender = this.id; } // Generate a Timestamp if required. if (this.options.generateTimeStamp === true) { timestamp = (timestamp === null) ? Date.now() : timestamp; } // return { sender, timestamp, } } /** * Internal Function to Publish content * @param value * @param sender * @param timestamp * @param data */ protected _publish(value: G, sender: string | null = null, timestamp: number | null = null, ...data) { // Only Proceed if Publishing is required. if (this.disablePublishing === false) { // Define a Sender if required if (sender === null) { sender = this.id; } // Generate a Timestamp if required. if (this.options.generateTimeStamp === true) { timestamp = (timestamp === null) ? Date.now() : timestamp; } // Get the Value. this._args = data; this._sender = sender; this._timeStamp = timestamp; this.observable.next(value); } } /** * Function to Force an Update. * @param sender Sender, which initiated the Update * @param timestamp The Timestamp of the Updaet * @param data Additional Args. */ public forcePublish(sender: string | null = null, timestamp: number | null = null, ...data) { return this._publish(this.getContent(), sender, timestamp, ...data); } /** * A Set containing the Subscriptions */ public _subscriptions = new Set<() => void>(); /** * Flag to Disable Publishing */ public disablePublishing = false; /** * Function, used to dispose the observable. * Every item will be unsubscribed. */ public dispose(): void { for (const _unsubscribe of this._subscriptions) { _unsubscribe(); } this._subscriptions.clear(); this.observable.closed = true; } /** * Function to extract the Content. * If a Getter is provided, the Getter will be used * to Transform the item. */ public getContent(): G | null { if (this.getter !== null) return this.getter(this._value); return this._value as any as G; } /** * A Function to subscribe to updates of the Observable. * @param observer The Observer. Could be a Function or a Partial Observer. * @param mode The Mode of the Subscription * @param options Additional Options. */ public subscribe(observer: NopePartialObserver | observableCallback, mode: 'immediate' | 'sync' = 'sync', options: nopeSubscriptionOptions = { mode: ['direct', 'sub', 'super'] }): nopeObserver { const _this = this; let active = true; let _observer: PartialObserver; if (typeof observer === 'object') { _observer = { next: (data: G) => { if (active && data !== undefined && observer.next) { if (mode === 'immediate') { callImmediate(observer.next, data, this._sender, this._timeStamp, ..._this._args); } else { observer.next(data, this._sender, this._timeStamp, ..._this._args); } } }, complete: () => { if (observer.complete) { observer.complete(); } }, error: (error) => { if (observer.error) { observer.error(error); } }, } } else if (typeof observer === 'function') { _observer = { next: (data: G) => { if (active && data !== undefined) { if (mode === 'immediate') { callImmediate(observer, data, this._sender, this._timeStamp, ..._this._args); } else { observer(data, this._sender, this._timeStamp, ..._this._args); } } }, complete: () => { }, error: (error) => { }, } } // Create a Subscription. const subscription = this.observable.subscribe(_observer); const ret: nopeObserver = Object.assign(subscription, { options, pause: () => { active = false; }, unpause: () => { active = true; } }); return ret; } /** * Create an enhanced Subscription of the Observable. Use the Pipes, to * Define what should be subscribed. * @param next The Next Function, used to transmit changes * @param options The Options, used to determine the Enhancements. */ public enhancedSubscription(next: (data: K) => void, options: { scope?: { [index: string]: any }, pipe?: pipe } = {}) { let observable: Observable = this as any as Observable; if (options.pipe) { observable = options.pipe(options.scope, this.observable); } const subscription = observable.subscribe({ next }); return subscription; } /** * Creates a Subscription for the value of the Observable. After one Update the Value will be deleted * @param func Function which is called when new Datas are pushed * @param mode Mode of the Subscription * @param options Additional Options */ once(func: observableCallback, mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): nopeObserver { let ret: nopeObserver = null; ret = this.subscribe({ next: (...args) => { ret.unsubscribe(); func(...args); } }, mode, options); return ret; } /** * Async Function to Wait for an Update * @param mode Mode of the Subscription * @param options Additional Options for the Wait Function. */ waitFor(testCallback: waitForCallback, options: nopeWaitForOpitions = { testFirst: true }): Promise { const _this = this; const _isAsync = testCallback[Symbol.toStringTag] === 'AsyncFunction'; if (_isAsync) { let called = false; return new Promise(async (resolve, reject) => { try { if (options.testFirst && await testCallback(_this.getContent())) { resolve(_this.getContent()); } else { let subscription: nopeObserver = null; subscription = _this.subscribe({ next: async (content, ...args) => { if (!called && await testCallback(content, ...args)) { subscription.unsubscribe(); called = true; resolve(content); } } }); } } catch (e) { reject(e); } }); } else { return new Promise((resolve, reject) => { try { if (options.testFirst && testCallback(_this.getContent())) { resolve(_this.getContent()); } else { let subscription: nopeObserver = null; subscription = _this.subscribe({ next: (content, ...args) => { if (testCallback(content, ...args)) { subscription.unsubscribe(); resolve(content); } } }); } } catch (e) { reject(e); } }); } } /** * Async Function to Wait for an Update * @param mode Mode of the Subscription * @param options Additional Options for the Wait Function. */ waitForUpdate(mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): Promise { const _this = this; return new Promise((resolve, reject) => { try { _this.once((content) => resolve(content), mode, options); } catch (e) { reject(e); } }) } }