nope/lib/observables/nopeObservable.ts

457 lines
11 KiB
TypeScript
Raw Normal View History

2020-11-24 14:14:56 +00:00
/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-23 08:06:30
 * @modify date 2021-10-19 17:55:35
 * @desc RxJS-based observable (`NopeObservable`) with optional setter/getter
 *       hooks and helpers for subscribing to and waiting for value updates.
 */
import { BehaviorSubject, Observable, Subscription } from "rxjs";
import { filter, map } from "rxjs/operators";
import { generateId } from "../helpers/idMethods";
import { callImmediate } from "../helpers/runtimeMethods";
import { getNopeLogger } from "../logger/getLogger";
import {
  INopeObservable,
  INopeObserver,
  INopePartialObserver,
  INopeSubscriptionOptions,
  INopeWaitForOpitions,
  IObservableAdditionalData,
  IObservableCallback,
  IObservableType,
  IPartialObserver,
  IPipe,
  IwaitForCallback
} from "../types/nope/nopeObservable.interface";
2020-08-21 14:49:48 +00:00
2021-10-18 06:02:29 +00:00
// Module-level logger; used by `subscribe` to report errors of the underlying
// subject when the observer is a plain callback.
// NOTE(review): the logger name is spelled "obervable" - presumably meant to be
// "observable"; left untouched because the name may be referenced by logger
// configuration elsewhere.
const logger = getNopeLogger("obervable");
2020-08-21 14:49:48 +00:00
/**
 * RxJS-based Observable.
 *
 * Contains additional functionality such as:
 * - a property holding the current value
 * - a function to publish values (wrapper for `next`)
 * - subscriptions whose callbacks run either synchronously or via an immediate call
 */
export class NopeObservable<T, S = T, G = T>
2021-08-30 04:40:51 +00:00
implements INopeObservable<T, S, G>
{
2021-10-19 19:27:30 +00:00
// Underlying RxJS subject. It starts with `undefined` as its current value and
// afterwards carries objects of the shape `{ value, ...additionalData }`
// (see `_publish`).
public observable: BehaviorSubject<IObservableType<G>> = new BehaviorSubject<IObservableType<G>>(undefined);
2020-08-25 22:11:26 +00:00
// Generated identifier of this observable; used as the default `sender` when
// publishing (see `_updateSenderAndTimestamp`).
public readonly id: string = generateId();
2020-08-21 14:49:48 +00:00
2020-08-25 22:11:26 +00:00
// Internally stored value. Written by `setContent`, read by `getContent`.
public _value: T;
2021-10-19 19:27:30 +00:00
// Behavioural options of the observable.
// `generateTimeStamp`: when exactly `true`, a timestamp (`Date.now()`) is added
// to published data that does not already carry one.
public options: any = {
generateTimeStamp: true
};
2020-08-21 14:49:48 +00:00
/**
 * Optional hook used by `setContent` to adapt and validate an incoming value.
 *
 * It receives the raw value together with the sender and the timestamp and
 * returns the value to store (`data`) plus a `valid` flag. If `valid` is
 * falsy, `setContent` neither stores nor publishes anything.
 * `null` (the default) means the incoming value is stored as it is.
 */
public setter:
  | null
  | ((
      value: S | null,
      sender: string | null,
      timeStamp: number | null,
      ...data
    ) => { data: T | null; valid: boolean }) = null;
2020-08-21 14:49:48 +00:00
/**
 * Optional hook used by `getContent` to transform the stored value before
 * it is handed out. `null` (the default) means the stored value is returned
 * unchanged.
 */
public getter: null | ((value: T | null) => G | null) = null;
2020-08-21 14:49:48 +00:00
2020-08-30 10:00:31 +00:00
/**
 * Stores a new value and publishes it to the subscribers if required.
 *
 * If a `setter` is defined it is called with the value, `options.sender`
 * and `options.timestamp`; its `data` result is stored, and the call is
 * aborted when it reports the value as invalid. Without a setter the value
 * is stored unchanged.
 *
 * The content is published only when publishing is not disabled and either
 * `options.forced` is set or the content differs (`!==`) from the value
 * currently held by the underlying subject.
 *
 * @param value The value to store.
 * @param options Additional data (e.g. `sender`, `timestamp`, `forced`).
 * @returns `false` if the setter rejected the value or nothing was
 *          published; otherwise the result of `_publish`.
 */
public setContent(
  value: S | null,
  options: IObservableAdditionalData = {}
): boolean {
  if (this.setter === null) {
    // No setter defined: take the value over as it is.
    this._value = value as any as T;
  } else {
    const result = this.setter(value as S, options.sender, options.timestamp);

    if (!result.valid) {
      return false;
    }

    this._value = result.data;
  }

  const valueToPublish = this.getContent();

  // Publishing is switched off entirely.
  if (this.disablePublishing) {
    return false;
  }

  // Publish forced updates and real changes only.
  if (options.forced || this.observable.value?.value !== valueToPublish) {
    return this._publish(valueToPublish, options);
  }

  return false;
}
2020-08-25 22:11:26 +00:00
2021-10-19 19:27:30 +00:00
/**
 * Helper that completes the additional data of a publication with a sender
 * and a timestamp.
 *
 * The given object is NOT modified; a shallow copy is returned. Otherwise an
 * options object that a caller reuses for several publications would keep
 * the timestamp (and sender) of the first publication forever.
 *
 * - `sender` defaults to the `id` of this observable.
 * - `timestamp` defaults to `Date.now()`, but only if
 *   `this.options.generateTimeStamp` is exactly `true`.
 *
 * @author M.Karkowski
 * @protected
 * @param options The additional data provided by the caller.
 * @returns A copy of `options` with the defaults filled in.
 * @memberof NopeObservable
 */
protected _updateSenderAndTimestamp(options: IObservableAdditionalData): IObservableAdditionalData {
  // Shallow copy, so the caller's object stays untouched.
  const adapted: IObservableAdditionalData = { ...options };

  // Define a sender if required.
  if (adapted.sender === undefined) {
    adapted.sender = this.id;
  }

  // Generate a timestamp if required.
  if (this.options.generateTimeStamp === true && adapted.timestamp === undefined) {
    adapted.timestamp = Date.now();
  }

  return adapted;
}
2020-08-25 22:11:26 +00:00
2020-08-30 10:00:31 +00:00
/**
 * Internal function that pushes a value into the underlying subject.
 *
 * Nothing is emitted when publishing is disabled, unless `options.forced`
 * is set. Before emitting, the additional data is completed by
 * `_updateSenderAndTimestamp`; the emitted object has the shape
 * `{ value, ...additionalData }`.
 *
 * @author M.Karkowski
 * @protected
 * @param value The value to publish.
 * @param options Additional data that is emitted together with the value.
 * @returns `true` if something was emitted and the subject currently has
 *          observers, otherwise `false`.
 * @memberof NopeObservable
 */
protected _publish(
  value: G,
  options: IObservableAdditionalData = {}
): boolean {
  const publishingAllowed = options.forced || this.disablePublishing === false;

  if (!publishingAllowed) {
    return false;
  }

  const additionalData = this._updateSenderAndTimestamp(options);

  this.observable.next({ value, ...additionalData });

  return this.hasSubscriptions;
}
2020-08-30 10:00:31 +00:00
/**
 * Publishes the current content again, even if it did not change and even
 * if publishing is disabled.
 *
 * The `forced` flag is set on a copy of `options`, so the object passed in
 * by the caller is not modified (a reused options object would otherwise
 * silently turn later `setContent` calls into forced ones).
 *
 * @author M.Karkowski
 * @param options Additional data that is emitted together with the value.
 * @returns The result of `_publish`.
 * @memberof NopeObservable
 */
public forcePublish(options: IObservableAdditionalData = {}): boolean {
  return this._publish(this.getContent(), { ...options, forced: true });
}
2020-08-21 14:49:48 +00:00
/**
 * Set of unsubscribe callbacks. Every entry is called and the set is
 * cleared when the observable is disposed (see `dispose`).
 */
public _subscriptions: Set<() => void> = new Set<() => void>();
2020-08-21 14:49:48 +00:00
2020-08-25 22:11:26 +00:00
/**
 * Flag to disable publishing.
 *
 * While `true`, `setContent` still stores the value but does not publish it;
 * `_publish` only emits if the publication is marked as `forced`.
 */
public disablePublishing = false;
2020-08-30 10:00:31 +00:00
/**
 * Disposes the observable.
 *
 * Calls every unsubscribe callback registered in `_subscriptions`, clears
 * that set and finally flags the underlying subject as closed. The subject
 * is only flagged - `complete` is not called on it.
 */
public dispose(): void {
  this._subscriptions.forEach((unsubscribe) => {
    unsubscribe();
  });
  this._subscriptions.clear();

  this.observable.closed = true;
}
2020-08-30 10:00:31 +00:00
/**
 * Returns the content of the observable.
 *
 * If a `getter` is defined, the stored value is passed through it and the
 * result is returned; otherwise the stored value is returned directly.
 */
public getContent(): G | null {
  return this.getter !== null
    ? this.getter(this._value)
    : (this._value as any as G);
}
2021-10-18 06:02:29 +00:00
// NOTE(review): neither read nor written anywhere in the visible part of this
// class - presumably used by subclasses; confirm before removing.
protected _lastDataUpdate: number;
2020-08-30 08:32:48 +00:00
/**
 * Subscribes to updates of the observable.
 *
 * The observer is either a callback or a partial observer (`next`,
 * `complete`, `error`). The `next` handler receives the published value as
 * first and the additional data (everything but `value`) as second argument.
 *
 * Behaviour of the created subscription:
 * - `undefined` emissions (the initial state of the subject) are never
 *   forwarded.
 * - With `options.skipCurrent` the first emission of the subject - i.e. the
 *   value replayed on subscription - is dropped.
 * - With `options.type === "immediate"` the handler is invoked through
 *   `callImmediate`, otherwise it is called directly.
 * - While paused (see return value) emissions are dropped, not queued.
 * - A callback observer logs errors of the subject; a partial observer
 *   without an `error` handler ignores them.
 *
 * @param observer The observer. Either a function or a partial observer.
 * @param options Options of the subscription (`type`, `mode`, `skipCurrent`).
 * @returns The RxJS subscription, extended by the used `options` and the
 *          functions `pause` / `unpause`.
 */
public subscribe(
  observer: INopePartialObserver<G> | IObservableCallback<G>,
  options: INopeSubscriptionOptions = {
    type: "sync",
    mode: ["direct", "sub", "super"],
    skipCurrent: false
  }
): INopeObserver {
  let active = true;
  let first = true;
  let _observer: IPartialObserver<IObservableType<G>>;

  // Shared gate of both observer kinds. It always consumes the "first"
  // marker, even if the emission is dropped for another reason.
  const shouldForward = (data: IObservableType<G>): boolean => {
    const skipAsCurrent = first && options.skipCurrent;
    first = false;
    return !skipAsCurrent && active && data !== undefined;
  };

  if (typeof observer === "function") {
    _observer = {
      next: (data: IObservableType<G>) => {
        if (!shouldForward(data)) {
          return;
        }

        const { value, ...rest } = data;

        if (options.type === "immediate") {
          callImmediate(observer, value, rest);
        } else {
          observer(value, rest);
        }
      },
      complete: () => {
        // Nothing to do here
      },
      error: (error) => {
        logger.error("");
        logger.error(error);
      }
    };
  } else if (typeof observer === "object") {
    _observer = {
      next: (data: IObservableType<G>) => {
        if (!shouldForward(data) || !observer.next) {
          return;
        }

        const { value, ...rest } = data;

        if (options.type === "immediate") {
          // Bind the handler to its observer. Passing the bare function
          // would drop `this`, whereas the synchronous path below keeps it.
          callImmediate(observer.next.bind(observer), value, rest);
        } else {
          observer.next(value, rest);
        }
      },
      complete: () => {
        if (observer.complete) {
          observer.complete();
        }
      },
      error: (error) => {
        if (observer.error) {
          observer.error(error);
        }
      }
    };
  }

  // Create the subscription and extend it with the pause controls.
  const subscription = this.observable.subscribe(_observer);
  const ret: INopeObserver = Object.assign(subscription, {
    options,
    pause: () => {
      active = false;
    },
    unpause: () => {
      active = true;
    }
  });

  return ret;
}
2020-08-25 10:33:33 +00:00
/**
 * Creates an enhanced subscription of the observable. Use the pipe to
 * define what should be subscribed.
 *
 * If `options.pipe` is given, it is called with `options.scope` and a
 * stream of the published values; `next` is subscribed to the observable
 * returned by the pipe. The initial `undefined` state of the underlying
 * subject is filtered out before the value is extracted - accessing
 * `.value` on it would throw and terminate the subscription right away.
 *
 * Without a pipe, `next` is subscribed through this object's own
 * `subscribe` method.
 *
 * @param next The function that receives the (piped) data.
 * @param options `scope` and `pipe` used to enhance the subscription.
 * @returns The created subscription.
 */
public enhancedSubscription<K>(
  next: (data: K) => void,
  options: {
    scope?: { [index: string]: any };
    pipe?: IPipe<T | G, K>;
  } = {}
): Subscription {
  let observable: Observable<K> = this as any as Observable<K>;

  if (options.pipe) {
    const values = this.observable.pipe(
      // Nothing has been published as long as the subject holds `undefined`.
      filter((data) => data !== undefined),
      map((data) => data.value)
    );

    observable = options.pipe(options.scope, values);
  }

  return observable.subscribe({ next });
}
2020-08-25 10:33:33 +00:00
2020-08-25 22:11:26 +00:00
/**
 * Creates a subscription that is removed again after the first delivered
 * value.
 *
 * The underlying subject replays its current value synchronously while
 * `subscribe` is still running, i.e. before the subscription object exists.
 * That case is handled explicitly: the callback is executed and the
 * subscription is removed as soon as `subscribe` has returned.
 * The callback is guaranteed to be called at most once, even if several
 * values are delivered before the unsubscription takes effect.
 *
 * @param func Function which is called with the first delivered value.
 * @param options Options of the subscription (see `subscribe`).
 * @returns The (possibly already unsubscribed) subscription.
 */
once(
  func: IObservableCallback<G>,
  options?: INopeSubscriptionOptions
): INopeObserver {
  let ret: INopeObserver = null;
  let fired = false;

  ret = this.subscribe(
    {
      next: (...args) => {
        if (fired) {
          return;
        }
        fired = true;

        // `ret` is still null if we are called during `subscribe`.
        if (ret !== null) {
          ret.unsubscribe();
        }

        func(...args);
      }
    },
    options
  );

  // The value was delivered while subscribing: unsubscribe now.
  if (fired) {
    ret.unsubscribe();
  }

  return ret;
}
/**
 * Waits until the content of the observable passes a test.
 *
 * On every delivered update `testCallback` is called with the current
 * content (`getContent()`); it may return a boolean or a promise of one.
 * As soon as the test passes, the subscription is removed and the promise
 * resolves with the delivered data.
 *
 * The promise rejects - and the subscription is removed - if the test
 * throws, if its promise rejects or if subscribing fails.
 *
 * @param testCallback The test. Defaults to a loose `== true` comparison.
 * @param options `testCurrent`: whether the value that is replayed while
 *        subscribing (the current value) is tested as well. Updates that
 *        arrive later are always tested.
 * @returns Promise of the data that passed the test.
 */
public waitFor(
  testCallback: IwaitForCallback<G> = (value) =>
    (value as any as boolean) == true,
  options: INopeWaitForOpitions = { testCurrent: true }
): Promise<G> {
  return new Promise<G>((resolve, reject) => {
    let settled = false;
    let subscription: INopeObserver = null;
    // True while `subscribe` is running. Only a value delivered in that
    // phase is the "current" one.
    let subscribing = true;

    const release = (): void => {
      if (subscription !== null) {
        subscription.unsubscribe();
        subscription = null;
      }
    };

    const succeed = (data: G): void => {
      release();
      if (!settled) {
        settled = true;
        resolve(data);
      }
    };

    const fail = (error: unknown): void => {
      release();
      if (!settled) {
        settled = true;
        reject(error);
      }
    };

    const checkData = (data: G): void => {
      if (settled || (subscribing && !options.testCurrent)) {
        return;
      }

      // The executor turns a synchronous throw of the test into a rejection.
      const outcome = new Promise<boolean>((done) => {
        done(testCallback(this.getContent()));
      });

      // One `then` with both handlers: no derived promise is left without
      // a rejection handler.
      outcome.then((passed) => {
        if (passed) {
          succeed(data);
        }
      }, fail);
    };

    try {
      subscription = this.subscribe((data) => checkData(data));
    } catch (error) {
      fail(error);
    } finally {
      subscribing = false;
    }
  });
}
/**
 * Waits for the next delivered value of the observable.
 *
 * Internally a `once` subscription is created; the returned promise
 * resolves with the content handed to its callback and rejects if creating
 * the subscription throws.
 *
 * @param options Options of the subscription (see `subscribe`).
 * @returns Promise of the delivered content.
 */
public waitForUpdate(options?: INopeSubscriptionOptions): Promise<G> {
  return new Promise<G>((resolve, reject) => {
    try {
      this.once((content) => resolve(content), options);
    } catch (error) {
      reject(error);
    }
  });
}
2020-09-08 14:58:50 +00:00
2020-11-24 14:14:56 +00:00
/**
 * `true` if the underlying subject currently has at least one observer.
 */
public get hasSubscriptions(): boolean {
  const { observers } = this.observable;
  return observers.length > 0;
}
2020-12-30 18:55:45 +00:00
/**
 * Number of observers currently registered at the underlying subject.
 */
public get observerLength(): number {
  const { observers } = this.observable;
  return observers.length;
}
}