2021-11-14 22:16:07 +00:00
|
|
|
/**
|
|
|
|
* @author Martin Karkowski
|
|
|
|
* @email m.karkowski@zema.de
|
|
|
|
* @create date 2020-11-23 08:06:30
|
|
|
|
* @modify date 2021-10-19 17:55:35
|
|
|
|
* @desc [description]
|
|
|
|
*/
|
|
|
|
|
|
|
|
import { Observable, Subject, Subscription } from "rxjs";
|
|
|
|
import { map } from "rxjs/operators";
|
|
|
|
import { getSubject, TSubjectOptions } from "../helpers/getSubject";
|
|
|
|
import { generateId } from "../helpers/idMethods";
|
|
|
|
import { callImmediate } from "../helpers/runtimeMethods";
|
|
|
|
import { getNopeLogger } from "../logger/getLogger";
|
|
|
|
import {
|
2021-12-04 07:25:26 +00:00
|
|
|
IEventAdditionalData,
|
|
|
|
IEventCallback,
|
|
|
|
INopeEventEmitter,
|
|
|
|
INopeObserver,
|
2021-11-14 22:16:07 +00:00
|
|
|
INopePartialObserver,
|
|
|
|
INopeSubscriptionOptions,
|
2021-12-04 07:25:26 +00:00
|
|
|
INopeWaitForObservableChangeOptions,
|
|
|
|
IObservableType,
|
2021-11-14 22:16:07 +00:00
|
|
|
IPartialObserver,
|
2021-12-04 07:25:26 +00:00
|
|
|
IPipe,
|
|
|
|
IWaitForCallback,
|
2021-11-14 22:16:07 +00:00
|
|
|
} from "../types/nope/index";
|
|
|
|
|
|
|
|
const logger = getNopeLogger("obervable");
|
|
|
|
|
|
|
|
/**
|
|
|
|
* RsJX based Observable.
|
|
|
|
*
|
|
|
|
* Contains additional Functionalities like:
|
|
|
|
* - property with the current value
|
|
|
|
* - function to publish values. (wrapper for next)
|
|
|
|
* - enables performing a subscription with synced call or a immediate call.
|
|
|
|
*/
|
2022-01-07 17:12:08 +00:00
|
|
|
export class NopeEventEmitter<
|
|
|
|
T = unknown,
|
|
|
|
S = T,
|
|
|
|
G = T,
|
|
|
|
AD extends IEventAdditionalData = IEventAdditionalData
|
|
|
|
> implements INopeEventEmitter<T, S, G, AD>
|
2021-12-04 07:25:26 +00:00
|
|
|
{
|
2022-01-07 17:12:08 +00:00
|
|
|
protected _emitter: Subject<IObservableType<G, AD>>;
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
public readonly id: string = generateId();
|
|
|
|
|
|
|
|
public options: any = {
|
2021-12-04 07:25:26 +00:00
|
|
|
generateTimeStamp: true,
|
2021-11-14 22:16:07 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Function to specify a Setter
|
|
|
|
*/
|
|
|
|
public setter:
|
|
|
|
| ((
|
2021-12-04 07:25:26 +00:00
|
|
|
value: S | null,
|
2022-01-07 17:12:08 +00:00
|
|
|
options?: Partial<AD>
|
2021-12-04 07:25:26 +00:00
|
|
|
) => {
|
|
|
|
data: T | null;
|
|
|
|
valid: boolean;
|
|
|
|
})
|
2021-11-14 22:16:07 +00:00
|
|
|
| null = null;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Function to specify a Getter
|
|
|
|
*/
|
|
|
|
protected _getter: ((value: T | null) => G | null) | null = null;
|
|
|
|
|
|
|
|
public get getter(): ((value: T | null) => G | null) | null {
|
|
|
|
return this._getter;
|
|
|
|
}
|
|
|
|
|
|
|
|
public set getter(_getter: ((value: T | null) => G | null) | null) {
|
|
|
|
this._getter = _getter;
|
|
|
|
}
|
|
|
|
|
2022-01-07 17:12:08 +00:00
|
|
|
public emit(value: S | null, options: Partial<AD> = {}): boolean {
|
2021-11-14 22:16:07 +00:00
|
|
|
return this._emit(value, options);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Function to set the content of the Observable
|
|
|
|
* @param value
|
|
|
|
* @param sender
|
|
|
|
* @param timeStamp
|
|
|
|
* @param data
|
|
|
|
*/
|
2022-01-07 17:12:08 +00:00
|
|
|
protected _emit(value: S | null, options: Partial<AD> = {}): boolean {
|
2021-11-14 22:16:07 +00:00
|
|
|
let _value: G = value as any as G;
|
|
|
|
|
|
|
|
// Change the Value.
|
|
|
|
if (this.setter !== null) {
|
2022-01-07 17:12:08 +00:00
|
|
|
const adapted = this.setter(value as S, options);
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
if (!adapted.valid) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
_value = adapted.data as any as G;
|
|
|
|
}
|
|
|
|
|
2021-12-04 07:25:26 +00:00
|
|
|
_value = this.getter !== null ? this.getter(_value as any as T) : _value;
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
// Publish the data.
|
|
|
|
if (options.forced || this.disablePublishing === false) {
|
|
|
|
options = this._updateSenderAndTimestamp(options);
|
|
|
|
|
|
|
|
// Define the value.
|
2022-01-07 17:12:08 +00:00
|
|
|
this._emitter.next({ value: _value, ...(options as AD) });
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
return this.hasSubscriptions;
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Helper to update the Timestamp and sender
|
|
|
|
*
|
|
|
|
* @author M.Karkowski
|
|
|
|
* @protected
|
|
|
|
* @param {IEventAdditionalData} options
|
|
|
|
* @return {*} {ISetContentOptions}
|
|
|
|
* @memberof NopeObservable
|
|
|
|
*/
|
2022-01-07 17:12:08 +00:00
|
|
|
protected _updateSenderAndTimestamp(options: Partial<AD>): Partial<AD> {
|
2021-11-14 22:16:07 +00:00
|
|
|
// Define a Sender if required
|
|
|
|
if (options.sender === undefined) {
|
|
|
|
options.sender = this.id;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Generate a Timestamp if required.
|
|
|
|
if (this.options.generateTimeStamp === true) {
|
2021-12-04 07:25:26 +00:00
|
|
|
options.timestamp =
|
|
|
|
options.timestamp === undefined ? Date.now() : options.timestamp;
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Return the adapted element.
|
|
|
|
return options;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* A Set containing the Subscriptions
|
|
|
|
*/
|
|
|
|
public _subscriptions = new Set<() => void>();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Flag to Disable Publishing
|
|
|
|
*/
|
|
|
|
public disablePublishing = false;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Function, used to dispose the observable.
|
|
|
|
* Every item will be unsubscribed.
|
|
|
|
*/
|
|
|
|
public dispose(): void {
|
|
|
|
for (const _unsubscribe of this._subscriptions) {
|
|
|
|
_unsubscribe();
|
|
|
|
}
|
|
|
|
|
|
|
|
this._subscriptions.clear();
|
|
|
|
this._emitter.closed = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
protected _lastDataUpdate: number;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* A Function to subscribe to updates of the Observable.
|
|
|
|
* @param observer The Observer. Could be a Function or a Partial Observer.
|
|
|
|
* @param mode The Mode of the Subscription
|
|
|
|
* @param options Additional Options.
|
|
|
|
*/
|
|
|
|
public subscribe(
|
2022-01-07 17:12:08 +00:00
|
|
|
observer: INopePartialObserver<G, AD> | IEventCallback<G, AD>,
|
2021-11-14 22:16:07 +00:00
|
|
|
options: INopeSubscriptionOptions = {
|
|
|
|
type: "sync",
|
2021-12-04 07:25:26 +00:00
|
|
|
mode: ["direct", "sub", "super"],
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
): INopeObserver {
|
2021-12-04 07:25:26 +00:00
|
|
|
options.skipCurrent =
|
|
|
|
!!this._options.showCurrent && !this._options.playHistory;
|
2021-11-14 22:16:07 +00:00
|
|
|
return this._subscribe(observer, options);
|
|
|
|
}
|
|
|
|
|
|
|
|
protected _subscribe(
|
2022-01-07 17:12:08 +00:00
|
|
|
observer: INopePartialObserver<G, AD> | IEventCallback<G, AD>,
|
2021-11-14 22:16:07 +00:00
|
|
|
options: INopeSubscriptionOptions = {
|
|
|
|
type: "sync",
|
2021-12-04 07:25:26 +00:00
|
|
|
mode: ["direct", "sub", "super"],
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
): INopeObserver {
|
|
|
|
const _this = this;
|
|
|
|
|
|
|
|
let active = true;
|
2022-01-07 17:12:08 +00:00
|
|
|
let _observer: IPartialObserver<IObservableType<G, AD>>;
|
2021-11-14 22:16:07 +00:00
|
|
|
let _first = true;
|
|
|
|
|
|
|
|
if (typeof observer === "object") {
|
|
|
|
_observer = {
|
2022-01-07 17:12:08 +00:00
|
|
|
next: (data: IObservableType<G, AD>) => {
|
2021-11-14 22:16:07 +00:00
|
|
|
// Make shure we are skipping the current Item, if desired
|
|
|
|
if (_first && options.skipCurrent) {
|
|
|
|
_first = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
_first = false;
|
|
|
|
|
|
|
|
if (active && data !== undefined && observer.next) {
|
|
|
|
const { value, ...rest } = data;
|
|
|
|
switch (options.type) {
|
|
|
|
case "immediate":
|
2021-12-04 07:25:26 +00:00
|
|
|
callImmediate(observer.next, value, rest);
|
2021-11-14 22:16:07 +00:00
|
|
|
break;
|
|
|
|
default:
|
2022-07-25 05:36:40 +00:00
|
|
|
observer.next(value, rest as any as Partial<AD>);
|
|
|
|
break;
|
2021-11-14 22:16:07 +00:00
|
|
|
case "sync":
|
2022-01-07 17:12:08 +00:00
|
|
|
observer.next(value, rest as any as Partial<AD>);
|
2021-11-14 22:16:07 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
complete: () => {
|
|
|
|
if (observer.complete) {
|
|
|
|
observer.complete();
|
|
|
|
}
|
|
|
|
},
|
|
|
|
error: (error) => {
|
|
|
|
if (observer.error) {
|
|
|
|
observer.error(error);
|
|
|
|
}
|
2021-12-04 07:25:26 +00:00
|
|
|
},
|
2021-11-14 22:16:07 +00:00
|
|
|
};
|
|
|
|
} else if (typeof observer === "function") {
|
|
|
|
_observer = {
|
2022-01-07 17:12:08 +00:00
|
|
|
next: (data) => {
|
2021-11-14 22:16:07 +00:00
|
|
|
// Make shure we are skipping the current Item, if desired
|
|
|
|
if (_first && options.skipCurrent) {
|
|
|
|
_first = false;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
_first = false;
|
|
|
|
|
|
|
|
if (active && data !== undefined) {
|
|
|
|
const { value, ...rest } = data;
|
|
|
|
switch (options.type) {
|
|
|
|
case "immediate":
|
2021-12-04 07:25:26 +00:00
|
|
|
callImmediate(observer, value, rest);
|
2021-11-14 22:16:07 +00:00
|
|
|
break;
|
|
|
|
default:
|
2022-07-25 05:36:40 +00:00
|
|
|
observer(value, rest as any as Partial<AD>);
|
|
|
|
break;
|
2021-11-14 22:16:07 +00:00
|
|
|
case "sync":
|
2022-01-07 17:12:08 +00:00
|
|
|
observer(value, rest as any as Partial<AD>);
|
2021-11-14 22:16:07 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
complete: () => {
|
|
|
|
// Nothing to do here
|
|
|
|
},
|
|
|
|
error: (error) => {
|
|
|
|
logger.error("");
|
|
|
|
logger.error(error);
|
2021-12-04 07:25:26 +00:00
|
|
|
},
|
2021-11-14 22:16:07 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a Subscription.
|
|
|
|
const subscription = this._emitter.subscribe(_observer);
|
|
|
|
|
|
|
|
const ret: INopeObserver = Object.assign(subscription, {
|
|
|
|
options,
|
|
|
|
pause: () => {
|
|
|
|
active = false;
|
|
|
|
},
|
|
|
|
unpause: () => {
|
|
|
|
active = true;
|
2021-12-04 07:25:26 +00:00
|
|
|
},
|
2021-11-14 22:16:07 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Create an enhanced Subscription of the Observable. Use the Pipes, to
|
|
|
|
* Define what should be subscribed.
|
|
|
|
* @param next The Next Function, used to transmit changes
|
|
|
|
* @param options The Options, used to determine the Enhancements.
|
|
|
|
*/
|
|
|
|
public enhancedSubscription<K>(
|
|
|
|
next: (data: K) => void,
|
|
|
|
options: {
|
|
|
|
scope?: { [index: string]: any };
|
|
|
|
pipe?: IPipe<T | G, K>;
|
|
|
|
} = {}
|
|
|
|
): Subscription {
|
|
|
|
let observable: Observable<K> = this as any as Observable<K>;
|
|
|
|
|
|
|
|
if (options.pipe) {
|
2021-12-04 07:25:26 +00:00
|
|
|
observable = options.pipe(
|
|
|
|
options.scope,
|
2022-07-23 05:34:38 +00:00
|
|
|
this._emitter.pipe(
|
|
|
|
map((value) => {
|
|
|
|
return value.value;
|
|
|
|
})
|
|
|
|
)
|
2021-12-04 07:25:26 +00:00
|
|
|
);
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const subscription = observable.subscribe({
|
2021-12-04 07:25:26 +00:00
|
|
|
next,
|
2021-11-14 22:16:07 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
return subscription;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Creates a Subscription for the value of the Observable. After one Update the Value will be deleted
|
|
|
|
* @param func Function which is called when new Datas are pushed
|
|
|
|
* @param mode Mode of the Subscription
|
|
|
|
* @param options Additional Options
|
|
|
|
*/
|
|
|
|
once(
|
2022-01-07 17:12:08 +00:00
|
|
|
func: IEventCallback<G, AD>,
|
2021-11-14 22:16:07 +00:00
|
|
|
options?: INopeSubscriptionOptions
|
|
|
|
): INopeObserver {
|
|
|
|
let ret: INopeObserver = null;
|
|
|
|
|
|
|
|
ret = this.subscribe(
|
|
|
|
{
|
|
|
|
next: (...args) => {
|
|
|
|
ret.unsubscribe();
|
|
|
|
func(...args);
|
2021-12-04 07:25:26 +00:00
|
|
|
},
|
2021-11-14 22:16:07 +00:00
|
|
|
},
|
|
|
|
options
|
|
|
|
);
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Async Function to Wait for an Update
|
|
|
|
* @param mode Mode of the Subscription
|
|
|
|
* @param options Additional Options for the Wait Function.
|
|
|
|
*/
|
|
|
|
public waitFor(
|
2022-07-25 05:36:40 +00:00
|
|
|
testCallback: IWaitForCallback<G, AD> = (value) => {
|
|
|
|
return (value as any as boolean) == true;
|
2022-08-03 11:18:26 +00:00
|
|
|
},
|
2021-11-14 22:16:07 +00:00
|
|
|
options: INopeWaitForObservableChangeOptions = { testCurrent: true }
|
|
|
|
): Promise<G> {
|
|
|
|
const _this = this;
|
|
|
|
|
|
|
|
let resolved = false;
|
|
|
|
let subscription: INopeObserver = null;
|
2022-11-06 19:25:01 +00:00
|
|
|
let timeout = null;
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
return new Promise<G>((resolve, reject) => {
|
|
|
|
const finish = (error: any, test: boolean, data: G) => {
|
|
|
|
// Reject the error.
|
|
|
|
if (error) {
|
|
|
|
reject(error);
|
|
|
|
}
|
|
|
|
|
2022-11-06 19:25:01 +00:00
|
|
|
if (timeout) {
|
|
|
|
clearTimeout(timeout);
|
|
|
|
}
|
|
|
|
|
2021-11-14 22:16:07 +00:00
|
|
|
// Unsubscribe the Subscription.
|
|
|
|
if (test && subscription) {
|
|
|
|
subscription.unsubscribe();
|
|
|
|
subscription = null;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (test && !resolved) {
|
|
|
|
// Mark the Task as Resolved.
|
|
|
|
resolved = true;
|
|
|
|
resolve(data);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let first = true;
|
|
|
|
|
2022-01-07 17:12:08 +00:00
|
|
|
const checkData = (data: G, opts: Partial<AD>) => {
|
2021-12-04 07:25:26 +00:00
|
|
|
if ((first && options.testCurrent) || !first) {
|
2021-11-14 22:16:07 +00:00
|
|
|
// Create a promise of the data
|
2022-01-07 17:12:08 +00:00
|
|
|
const prom = Promise.resolve(testCallback(data, opts));
|
2021-11-14 22:16:07 +00:00
|
|
|
|
|
|
|
// Now we link the element
|
2022-07-23 05:34:38 +00:00
|
|
|
prom.catch((e) => {
|
|
|
|
finish(e, false, data);
|
|
|
|
});
|
|
|
|
prom.then((r) => {
|
|
|
|
finish(false, r, data);
|
|
|
|
});
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
first = false;
|
|
|
|
};
|
|
|
|
|
|
|
|
try {
|
2022-07-23 05:34:38 +00:00
|
|
|
subscription = _this.subscribe((data, opts) => {
|
|
|
|
checkData(data, opts);
|
|
|
|
});
|
2022-11-06 19:25:01 +00:00
|
|
|
|
|
|
|
if (options?.timeout > 0) {
|
|
|
|
timeout = setTimeout(() => {
|
|
|
|
finish(Error("Timeout.!"), false, null);
|
|
|
|
}, options.timeout);
|
|
|
|
}
|
2021-11-14 22:16:07 +00:00
|
|
|
} catch (e) {
|
|
|
|
reject(e);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Async Function to Wait for an Update
|
|
|
|
* @param mode Mode of the Subscription
|
|
|
|
* @param options Additional Options for the Wait Function.
|
|
|
|
*/
|
2021-12-04 07:25:26 +00:00
|
|
|
public waitForUpdate(options?: INopeSubscriptionOptions): Promise<G> {
|
2021-11-14 22:16:07 +00:00
|
|
|
const _this = this;
|
|
|
|
return new Promise<G>((resolve, reject) => {
|
|
|
|
try {
|
2022-07-23 05:34:38 +00:00
|
|
|
_this.once((content) => {
|
|
|
|
resolve(content);
|
|
|
|
}, options);
|
2021-11-14 22:16:07 +00:00
|
|
|
} catch (e) {
|
|
|
|
reject(e);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
public get hasSubscriptions(): boolean {
|
|
|
|
return this._emitter.observed;
|
|
|
|
}
|
|
|
|
|
|
|
|
public get observerLength(): number {
|
|
|
|
return this._emitter.observers.length;
|
|
|
|
}
|
|
|
|
|
|
|
|
constructor(protected _options: TSubjectOptions = {}) {
|
2022-01-07 17:12:08 +00:00
|
|
|
this._emitter = getSubject<IObservableType<G, AD>>(_options);
|
2021-11-14 22:16:07 +00:00
|
|
|
}
|
|
|
|
}
|