/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-23 08:06:30 * @modify date 2020-11-23 08:11:13 * @desc [description] */ import { injectable } from "inversify"; import { BehaviorSubject, Observable, Subscription } from "rxjs"; import { generateId } from "../helpers/idMethods"; import { callImmediate } from "../helpers/runtimeMethods"; import { INopeObservable, INopeObserver, INopePartialObserver, INopeSubscriptionOptions, INopeWaitForOpitions, IObservableCallback, IPartialObserver, IPipe, IwaitForCallback } from "../types/nope/nopeObservable.interface"; /** * RsJX based Observable. * * Contains additional Functionalities like: * - property with the current value * - function to publish values. (wrapper for next) * - enables performing a subscription with synced call or a immediate call. */ @injectable() export class NopeObservable implements INopeObservable { public observable: BehaviorSubject = new BehaviorSubject(undefined); public readonly id: string = generateId(); public _value: T; public options: any = {}; /** * Function to specify a Setter */ public setter: ((value: S | null, sender: string | null, timeStamp: number | null, ...data) => { data: T | null, valid: boolean, }) | null = null; /** * Function to specify a Getter */ public getter: ((value: T | null) => G | null) | null = null; protected _args: any[] = []; protected _sender: string; protected _timeStamp: number; /** * Function to * @param value * @param sender * @param timeStamp * @param data */ public setContent(value: S | null, sender: string | null = null, timeStamp: number | null = null, ...data): boolean { // Change the Value. if (this.setter !== null) { const adapted = this.setter(value as S, sender, timeStamp); if (!adapted.valid) { return false; } this._value = adapted.data; } else { // Adapt the Value if required. this._value = value as any as T; } const valueToPublish = this.getContent(); /** Publish the Data */ if (!this.disablePublishing && this.observable.value !== valueToPublish) { return this._publish(valueToPublish, sender, timeStamp); } return false; } protected _updateSenderAndTimestamp(sender: string | null = null, timestamp: number | null = null): { sender: string, timestamp: number } { // Define a Sender if required if (sender === null) { sender = this.id; } // Generate a Timestamp if required. if (this.options.generateTimeStamp === true) { timestamp = (timestamp === null) ? Date.now() : timestamp; } // return { sender, timestamp, }; } /** * Internal Function to Publish content * @param value * @param sender * @param timestamp * @param data */ protected _publish(value: G, sender: string | null = null, timestamp: number | null = null, ...data) { // Only Proceed if Publishing is required. if (this.disablePublishing === false) { // Define a Sender if required if (sender === null) { sender = this.id; } // Generate a Timestamp if required. if (this.options.generateTimeStamp === true) { timestamp = (timestamp === null) ? Date.now() : timestamp; } // Get the Value. this._args = data; this._sender = sender; this._timeStamp = timestamp; this.observable.next(value); return this.hasSubscriptions; } return false; } /** * Function to Force an Update. * @param sender Sender, which initiated the Update * @param timestamp The Timestamp of the Updaet * @param data Additional Args. */ public forcePublish(sender: string | null = null, timestamp: number | null = null, ...data): boolean { return this._publish(this.getContent(), sender, timestamp, ...data); } /** * A Set containing the Subscriptions */ public _subscriptions = new Set<() => void>(); /** * Flag to Disable Publishing */ public disablePublishing = false; /** * Function, used to dispose the observable. * Every item will be unsubscribed. */ public dispose(): void { for (const _unsubscribe of this._subscriptions) { _unsubscribe(); } this._subscriptions.clear(); this.observable.closed = true; } /** * Function to extract the Content. * If a Getter is provided, the Getter will be used * to Transform the item. */ public getContent(): G | null { if (this.getter !== null) return this.getter(this._value); return this._value as any as G; } /** * A Function to subscribe to updates of the Observable. * @param observer The Observer. Could be a Function or a Partial Observer. * @param mode The Mode of the Subscription * @param options Additional Options. */ public subscribe(observer: INopePartialObserver | IObservableCallback, mode: "immediate" | "sync" = "sync", options: INopeSubscriptionOptions = { mode: ["direct", "sub", "super"] }): INopeObserver { const _this = this; let active = true; let _observer: IPartialObserver; if (typeof observer === "object") { _observer = { next: (data: G) => { if (active && data !== undefined && observer.next) { if (mode === "immediate") { callImmediate(observer.next, data, this._sender, this._timeStamp, ..._this._args); } else { observer.next(data, this._sender, this._timeStamp, ..._this._args); } } }, complete: () => { if (observer.complete) { observer.complete(); } }, error: (error) => { if (observer.error) { observer.error(error); } }, }; } else if (typeof observer === "function") { _observer = { next: (data: G) => { if (active && data !== undefined) { if (mode === "immediate") { callImmediate(observer, data, this._sender, this._timeStamp, ..._this._args); } else { observer(data, this._sender, this._timeStamp, ..._this._args); } } }, complete: () => { }, error: (error) => { }, }; } // Create a Subscription. const subscription = this.observable.subscribe(_observer); const ret: INopeObserver = Object.assign(subscription, { options, pause: () => { active = false; }, unpause: () => { active = true; } }); return ret; } /** * Create an enhanced Subscription of the Observable. Use the Pipes, to * Define what should be subscribed. * @param next The Next Function, used to transmit changes * @param options The Options, used to determine the Enhancements. */ public enhancedSubscription(next: (data: K) => void, options: { scope?: { [index: string]: any }, pipe?: IPipe } = {}): Subscription { let observable: Observable = this as any as Observable; if (options.pipe) { observable = options.pipe(options.scope, this.observable); } const subscription = observable.subscribe({ next }); return subscription; } /** * Creates a Subscription for the value of the Observable. After one Update the Value will be deleted * @param func Function which is called when new Datas are pushed * @param mode Mode of the Subscription * @param options Additional Options */ once(func: IObservableCallback, mode?: "sync" | "immediate", options?: INopeSubscriptionOptions): INopeObserver { let ret: INopeObserver = null; ret = this.subscribe({ next: (...args) => { ret.unsubscribe(); func(...args); } }, mode, options); return ret; } /** * Async Function to Wait for an Update * @param mode Mode of the Subscription * @param options Additional Options for the Wait Function. */ public waitFor(testCallback: IwaitForCallback, options: INopeWaitForOpitions = { testCurrent: true }): Promise { const _this = this; const _isAsync = testCallback[Symbol.toStringTag] === "AsyncFunction"; if (_isAsync) { let called = false; return new Promise(async (resolve, reject) => { try { if (options.testCurrent && await testCallback(_this.getContent())) { resolve(_this.getContent()); } else { let subscription: INopeObserver = null; subscription = _this.subscribe({ next: async (content, ...args) => { if (!called && await testCallback(content, ...args)) { subscription.unsubscribe(); called = true; resolve(content); } } }); } } catch (e) { reject(e); } }); } else { return new Promise((resolve, reject) => { try { if (options.testCurrent && testCallback(_this.getContent())) { resolve(_this.getContent()); } else { let subscription: INopeObserver = null; subscription = _this.subscribe({ next: (content, ...args) => { if (testCallback(content, ...args)) { subscription.unsubscribe(); resolve(content); } } }); } } catch (e) { reject(e); } }); } } /** * Async Function to Wait for an Update * @param mode Mode of the Subscription * @param options Additional Options for the Wait Function. */ public waitForUpdate(mode?: "sync" | "immediate", options?: INopeSubscriptionOptions): Promise { const _this = this; return new Promise((resolve, reject) => { try { _this.once((content) => resolve(content), mode, options); } catch (e) { reject(e); } }); } public get hasSubscriptions(): boolean { return this.observable.observers.length > 0; } }