nope/lib/observables/nopeObservable.ts
2021-10-18 08:02:29 +02:00

476 lines
11 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-23 08:06:30
* @modify date 2020-12-30 16:57:52
* @desc [description]
*/
import { BehaviorSubject, Observable, Subscription } from "rxjs";
import { generateId } from "../helpers/idMethods";
import { callImmediate } from "../helpers/runtimeMethods";
import { getNopeLogger } from "../logger/getLogger";
import {
INopeObservable,
INopeObserver,
INopePartialObserver,
INopeSubscriptionOptions,
INopeWaitForOpitions,
IObservableCallback,
IPartialObserver,
IPipe,
IwaitForCallback
} from "../types/nope/nopeObservable.interface";
const logger = getNopeLogger("obervable");
/**
* RsJX based Observable.
*
* Contains additional Functionalities like:
* - property with the current value
* - function to publish values. (wrapper for next)
* - enables performing a subscription with synced call or a immediate call.
*/
export class NopeObservable<T, S = T, G = T>
implements INopeObservable<T, S, G>
{
public observable: BehaviorSubject<G> = new BehaviorSubject<G>(undefined);
public readonly id: string = generateId();
public _value: T;
public options: any = {};
/**
* Function to specify a Setter
*/
public setter:
| ((
value: S | null,
sender: string | null,
timeStamp: number | null,
...data
) => {
data: T | null;
valid: boolean;
})
| null = null;
/**
* Function to specify a Getter
*/
public getter: ((value: T | null) => G | null) | null = null;
protected _args: any[] = [];
protected _sender: string;
protected _timeStamp: number;
/**
* Function to
* @param value
* @param sender
* @param timeStamp
* @param data
*/
public setContent(
value: S | null,
sender: string | null = null,
timeStamp: number | null = null,
...data
): boolean {
// Change the Value.
if (this.setter !== null) {
const adapted = this.setter(value as S, sender, timeStamp);
if (!adapted.valid) {
return false;
}
this._value = adapted.data;
} else {
// Adapt the Value if required.
this._value = value as any as T;
}
const valueToPublish = this.getContent();
/** Publish the Data */
if (!this.disablePublishing && this.observable.value !== valueToPublish) {
return this._publish(valueToPublish, sender, timeStamp);
}
return false;
}
protected _updateSenderAndTimestamp(
sender: string | null = null,
timestamp: number | null = null
): {
sender: string;
timestamp: number;
} {
// Define a Sender if required
if (sender === null) {
sender = this.id;
}
// Generate a Timestamp if required.
if (this.options.generateTimeStamp === true) {
timestamp = timestamp === null ? Date.now() : timestamp;
}
//
return {
sender,
timestamp
};
}
/**
* Internal Function to Publish content
* @param value
* @param sender
* @param timestamp
* @param data
*/
protected _publish(
value: G,
sender: string | null = null,
timestamp: number | null = null,
...data
) {
// Only Proceed if Publishing is required.
if (this.disablePublishing === false) {
// Define a Sender if required
if (sender === null) {
sender = this.id;
}
// Generate a Timestamp if required.
if (this.options.generateTimeStamp === true) {
timestamp = timestamp === null ? Date.now() : timestamp;
}
// Get the Value.
this._args = data;
this._sender = sender;
this._timeStamp = timestamp;
this.observable.next(value);
return this.hasSubscriptions;
}
return false;
}
/**
* Function to Force an Update.
* @param sender Sender, which initiated the Update
* @param timestamp The Timestamp of the Updaet
* @param data Additional Args.
*/
public forcePublish(
sender: string | null = null,
timestamp: number | null = null,
...data
): boolean {
return this._publish(this.getContent(), sender, timestamp, ...data);
}
/**
* A Set containing the Subscriptions
*/
public _subscriptions = new Set<() => void>();
/**
* Flag to Disable Publishing
*/
public disablePublishing = false;
/**
* Function, used to dispose the observable.
* Every item will be unsubscribed.
*/
public dispose(): void {
for (const _unsubscribe of this._subscriptions) {
_unsubscribe();
}
this._subscriptions.clear();
this.observable.closed = true;
}
/**
* Function to extract the Content.
* If a Getter is provided, the Getter will be used
* to Transform the item.
*/
public getContent(): G | null {
if (this.getter !== null) return this.getter(this._value);
return this._value as any as G;
}
protected _lastDataUpdate: number;
/**
* A Function to subscribe to updates of the Observable.
* @param observer The Observer. Could be a Function or a Partial Observer.
* @param mode The Mode of the Subscription
* @param options Additional Options.
*/
public subscribe(
observer: INopePartialObserver<G> | IObservableCallback<G>,
mode: "immediate" | "sync" = "sync",
options: INopeSubscriptionOptions = {
mode: ["direct", "sub", "super"],
skipCurrent: false
}
): INopeObserver {
const _this = this;
let active = true;
let _observer: IPartialObserver<G>;
let _first = true;
if (typeof observer === "object") {
_observer = {
next: (data: G) => {
// Make shure we are skipping the current Item, if desired
if (_first && options.skipCurrent) {
_first = false;
return;
}
_first = false;
if (active && data !== undefined && observer.next) {
if (mode === "immediate") {
callImmediate(
observer.next,
data,
this._sender,
this._timeStamp,
..._this._args
);
} else {
observer.next(
data,
this._sender,
this._timeStamp,
..._this._args
);
}
}
},
complete: () => {
if (observer.complete) {
observer.complete();
}
},
error: (error) => {
if (observer.error) {
observer.error(error);
}
}
};
} else if (typeof observer === "function") {
_observer = {
next: (data: G) => {
// Make shure we are skipping the current Item, if desired
if (_first && options.skipCurrent) {
_first = false;
return;
}
_first = false;
if (active && data !== undefined) {
if (mode === "immediate") {
callImmediate(
observer,
data,
this._sender,
this._timeStamp,
..._this._args
);
} else {
observer(data, this._sender, this._timeStamp, ..._this._args);
}
}
},
complete: () => { },
error: (error) => {
logger.error("");
logger.error(error);
}
};
}
// Create a Subscription.
const subscription = this.observable.subscribe(_observer);
const ret: INopeObserver = Object.assign(subscription, {
options,
pause: () => {
active = false;
},
unpause: () => {
active = true;
}
});
active = true;
// if (!options.skipCurrent) {
// const _this = this;
// const _start = Date.now();
// callImmediate(() => {
// if (_this._lastDataUpdate < _start) {
// _observer.next(_this.getContent());
// }
// });
// }
return ret;
}
/**
* Create an enhanced Subscription of the Observable. Use the Pipes, to
* Define what should be subscribed.
* @param next The Next Function, used to transmit changes
* @param options The Options, used to determine the Enhancements.
*/
public enhancedSubscription<K>(
next: (data: K) => void,
options: {
scope?: { [index: string]: any };
pipe?: IPipe<T | G, K>;
} = {}
): Subscription {
let observable: Observable<K> = this as any as Observable<K>;
if (options.pipe) {
observable = options.pipe(options.scope, this.observable);
}
const subscription = observable.subscribe({
next
});
return subscription;
}
/**
* Creates a Subscription for the value of the Observable. After one Update the Value will be deleted
* @param func Function which is called when new Datas are pushed
* @param mode Mode of the Subscription
* @param options Additional Options
*/
once(
func: IObservableCallback<G>,
mode?: "sync" | "immediate",
options?: INopeSubscriptionOptions
): INopeObserver {
let ret: INopeObserver = null;
ret = this.subscribe(
{
next: (...args) => {
ret.unsubscribe();
func(...args);
}
},
mode,
options
);
return ret;
}
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
public waitFor(
testCallback: IwaitForCallback<G> = (value) =>
(value as any as boolean) == true,
options: INopeWaitForOpitions = { testCurrent: true }
): Promise<G> {
const _this = this;
let resolved = false;
let subscription: INopeObserver = null;
return new Promise<G>((resolve, reject) => {
const finish = (error: any, test: boolean, data: G) => {
// Reject the error.
if (error) {
reject(error);
}
// Unsubscribe the Subscription.
if (test && subscription) {
subscription.unsubscribe();
subscription = null;
}
if (test && !resolved) {
// Mark the Task as Resolved.
resolved = true;
resolve(data);
}
};
let first = true;
const checkData = (data: G) => {
if (first && options.testCurrent || !first) {
// Create a promise of the data
const prom = Promise.resolve(testCallback(_this.getContent()));
// Now we link the element
prom.catch(e => finish(e, false, data));
prom.then(r => finish(false, r, data));
}
first = false;
};
try {
subscription = _this.subscribe(data => checkData(data));
} catch (e) {
reject(e);
}
});
}
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
public waitForUpdate(
mode?: "sync" | "immediate",
options?: INopeSubscriptionOptions
): Promise<G> {
const _this = this;
return new Promise<G>((resolve, reject) => {
try {
_this.once((content) => resolve(content), mode, options);
} catch (e) {
reject(e);
}
});
}
public get hasSubscriptions(): boolean {
return this.observable.observers.length > 0;
}
public get observerLength(): number {
return this.observable.observers.length;
}
}