2020-09-01 14:56:26 +00:00
|
|
|
import { injectable } from 'inversify';
|
2020-10-13 16:22:04 +00:00
|
|
|
import { BehaviorSubject, Observable } from 'rxjs';
|
2020-08-25 22:11:26 +00:00
|
|
|
import { generateId } from '../helpers/idMethods';
|
|
|
|
import { callImmediate } from '../helpers/runtimeMethods';
|
2020-11-06 08:10:30 +00:00
|
|
|
import { INopeObservable, INopeObserver, INopePartialObserver, INopeSubscriptionOptions, INopeWaitForOpitions, IObservableCallback, IPartialObserver, IPipe, IwaitForCallback } from '../types/nope/nopeObservable.interface';
|
2020-08-21 14:49:48 +00:00
|
|
|
|
|
|
|
/**
 * RxJS based Observable.
 *
 * Contains additional Functionalities like:
 *  - property with the current value
 *  - function to publish values. (wrapper for next)
 *  - enables performing a subscription with synced call or a immediate call.
 */
@injectable()
export class NopeObservable<T, S = T, G = T> implements INopeObservable<T,S,G> {

  /**
   * The underlying RxJS subject. It is created with `undefined` as initial
   * value; `subscribe` filters `undefined` so that this placeholder is never
   * forwarded to observers.
   */
  public observable: BehaviorSubject<G> = new BehaviorSubject<G>(undefined);

  /** Identifier of this observable. Used as default sender when publishing. */
  public readonly id: string = generateId();

  /** The raw (setter-adapted) value currently held by the observable. */
  public _value: T;

  /** Options of the observable. `generateTimeStamp === true` enables automatic timestamps. */
  public options: any = {};

  /**
   * Function to specify a Setter. It receives the value handed to
   * `setContent` (plus sender, timestamp and the additional data) and
   * returns the adapted value together with a flag stating whether the
   * value is valid. Invalid values are ignored by `setContent`.
   */
  public setter: ((value: S | null, sender: string | null, timeStamp: number | null, ...data) => {
    data: T | null,
    valid: boolean,
  }) | null = null;

  /**
   * Function to specify a Getter. If defined, `getContent` returns the
   * result of this function instead of the raw value.
   */
  public getter: ((value: T | null) => G | null) | null = null;

  /** Additional arguments of the most recent publication. */
  protected _args: any[] = [];

  /** Sender of the most recent publication. */
  protected _sender: string;

  /** Timestamp of the most recent publication. */
  protected _timeStamp: number;

  /**
   * Function to update the content of the observable. The value is adapted
   * by the `setter` (if one is defined) and published afterwards, provided
   * that publishing is enabled and the value to publish differs (`!==`) from
   * the value currently held by the underlying subject.
   *
   * @param value The new value.
   * @param sender The sender of the update. Forwarded to the setter and the subscribers.
   * @param timeStamp The timestamp of the update. Forwarded to the setter and the subscribers.
   * @param data Additional arguments. Forwarded to the setter and the subscribers.
   */
  public setContent(value: S | null, sender: string | null = null, timeStamp: number | null = null, ...data): void {
    if (this.setter !== null) {
      // The additional data must reach the setter as well; its signature declares `...data`.
      const adapted = this.setter(value as S, sender, timeStamp, ...data);

      if (!adapted.valid) {
        // The setter rejected the value: keep the old content, publish nothing.
        return;
      }

      this._value = adapted.data;
    } else {
      // Without a setter the value is stored as it is.
      this._value = value as any as T;
    }

    const valueToPublish = this.getContent();

    /** Publish the Data */
    if (!this.disablePublishing && this.observable.value !== valueToPublish) {
      // Forward the additional data, otherwise subscribers would never receive it.
      this._publish(valueToPublish, sender, timeStamp, ...data);
    }
  }

  /**
   * Helper to fill in the defaults for sender and timestamp.
   *
   * @param sender The sender. `null` is replaced by the id of this observable.
   * @param timestamp The timestamp. `null` is replaced by `Date.now()`, but only
   * if `options.generateTimeStamp === true`.
   * @returns The resulting sender and timestamp.
   */
  protected _updateSenderAndTimestamp(sender: string | null = null, timestamp: number | null = null,) {
    // Define a Sender if required
    if (sender === null) {
      sender = this.id;
    }

    // Generate a Timestamp if required.
    if (this.options.generateTimeStamp === true) {
      timestamp = (timestamp === null) ? Date.now() : timestamp;
    }

    return {
      sender,
      timestamp,
    }
  }

  /**
   * Internal Function to Publish content. Stores sender, timestamp and the
   * additional arguments (they are handed to the observers in `subscribe`)
   * and emits the value on the underlying subject. Does nothing if
   * publishing is disabled.
   *
   * @param value The value to emit.
   * @param sender The sender of the update. Defaults to the id of this observable.
   * @param timestamp The timestamp of the update. Generated if `options.generateTimeStamp === true`.
   * @param data Additional arguments for the observers.
   */
  protected _publish(value: G, sender: string | null = null, timestamp: number | null = null, ...data) {
    // Only Proceed if Publishing is required.
    if (this.disablePublishing === false) {
      // Use the shared helper instead of duplicating the default handling.
      const meta = this._updateSenderAndTimestamp(sender, timestamp);

      // Store the meta data before emitting: the observers read it during `next`.
      this._args = data;
      this._sender = meta.sender;
      this._timeStamp = meta.timestamp;

      this.observable.next(value);
    }
  }

  /**
   * Function to Force an Update. Publishes the current content, even if it
   * did not change.
   *
   * @param sender Sender, which initiated the Update
   * @param timestamp The Timestamp of the Update
   * @param data Additional Args.
   */
  public forcePublish(sender: string | null = null, timestamp: number | null = null, ...data) {
    return this._publish(this.getContent(), sender, timestamp, ...data);
  }

  /**
   * A Set containing the unsubscribe functions of the open Subscriptions.
   * `subscribe` registers every subscription here; `dispose` calls them.
   */
  public _subscriptions = new Set<() => void>();

  /**
   * Flag to Disable Publishing
   */
  public disablePublishing = false;

  /**
   * Function, used to dispose the observable.
   * Every item will be unsubscribed.
   */
  public dispose(): void {
    // Each function removes itself from the set; deleting during iteration is safe for a Set.
    for (const _unsubscribe of this._subscriptions) {
      _unsubscribe();
    }

    this._subscriptions.clear();
    this.observable.closed = true;
  }

  /**
   * Function to extract the Content.
   * If a Getter is provided, the Getter will be used
   * to Transform the item.
   *
   * @returns The (transformed) content.
   */
  public getContent(): G | null {
    if (this.getter !== null) {
      return this.getter(this._value);
    }

    return this._value as any as G;
  }

  /**
   * A Function to subscribe to updates of the Observable. `undefined`
   * values are never forwarded to the observer.
   *
   * @param observer The Observer. Could be a Function or a Partial Observer.
   * @param mode The Mode of the Subscription. `'sync'` calls the observer directly,
   * `'immediate'` hands the call to `callImmediate`.
   * @param options Additional Options. They are attached to the returned observer.
   * @returns The subscription, extended with `options`, `pause` and `unpause`.
   */
  public subscribe(observer: INopePartialObserver<G> | IObservableCallback<G>,
    mode: 'immediate' | 'sync' = 'sync',
    options: INopeSubscriptionOptions = {
      mode: ['direct', 'sub', 'super']
    }): INopeObserver {

    // While paused (`active === false`) values are dropped, not buffered.
    let active = true;

    let _observer: IPartialObserver<G>;

    if (typeof observer === 'object') {
      _observer = {
        next: (data: G) => {
          if (active && data !== undefined && observer.next) {
            if (mode === 'immediate') {
              callImmediate(observer.next, data, this._sender, this._timeStamp, ...this._args);
            } else {
              observer.next(data, this._sender, this._timeStamp, ...this._args);
            }
          }
        },
        complete: () => {
          if (observer.complete) {
            observer.complete();
          }
        },
        error: (error) => {
          if (observer.error) {
            observer.error(error);
          }
        },
      }
    } else if (typeof observer === 'function') {
      _observer = {
        next: (data: G) => {
          if (active && data !== undefined) {
            if (mode === 'immediate') {
              callImmediate(observer, data, this._sender, this._timeStamp, ...this._args);
            } else {
              observer(data, this._sender, this._timeStamp, ...this._args);
            }
          }
        },
        complete: () => { },
        error: (error) => { },
      }
    }

    // Create a Subscription.
    const subscription = this.observable.subscribe(_observer);

    // Register the subscription, so that `dispose` is able to unsubscribe it,
    // and drop the entry again as soon as the subscription is closed.
    const unsubscribe = (): void => {
      subscription.unsubscribe();
    };
    this._subscriptions.add(unsubscribe);
    subscription.add(() => {
      this._subscriptions.delete(unsubscribe);
    });

    const ret: INopeObserver = Object.assign(subscription, {
      options,
      pause: () => {
        active = false;
      },
      unpause: () => {
        active = true;
      }
    });

    return ret;
  }

  /**
   * Create an enhanced Subscription of the Observable. Use the Pipes, to
   * Define what should be subscribed.
   *
   * If a pipe is given, it receives the scope and the underlying subject and
   * the subscription is created on its result. Without a pipe the
   * subscription is created on this instance itself, i.e. via `subscribe`.
   *
   * @param next The Next Function, used to transmit changes
   * @param options The Options, used to determine the Enhancements.
   * @returns The created subscription.
   */
  public enhancedSubscription<K>(next: (data: K) => void, options: {
    scope?: { [index: string]: any },
    pipe?: IPipe<T | G, K>
  } = {}) {
    let observable: Observable<K> = this as any as Observable<K>;

    if (options.pipe) {
      observable = options.pipe(options.scope, this.observable);
    }

    const subscription = observable.subscribe({
      next
    });

    return subscription;
  }

  /**
   * Creates a Subscription for the value of the Observable. After one Update
   * the subscription is removed again, so `func` is called at most once.
   *
   * The underlying subject delivers its current value synchronously while
   * subscribing. In that case the callback runs before the subscription
   * object exists, therefore the unsubscription is caught up afterwards.
   *
   * @param func Function which is called when new Data is pushed
   * @param mode Mode of the Subscription
   * @param options Additional Options
   * @returns The subscription.
   */
  once(func: IObservableCallback<G>, mode?: 'sync' | 'immediate', options?: INopeSubscriptionOptions): INopeObserver {
    let fired = false;
    let subscription: INopeObserver | null = null;

    const release = (): void => {
      if (subscription !== null) {
        subscription.unsubscribe();
      }
    };

    subscription = this.subscribe({
      next: (...args) => {
        if (fired) {
          // Calls queued before the unsubscription took effect are ignored.
          return;
        }
        fired = true;

        // Unsubscribe first (if possible), then notify, as callers expect.
        release();
        func(...args);
      }
    }, mode, options);

    if (fired) {
      // The callback ran during `subscribe`, where nothing could be released yet.
      release();
    }

    return subscription;
  }

  /**
   * Async Function to Wait until the content fulfils a test.
   *
   * The test may return a boolean or a Promise of a boolean; the returned
   * value is inspected, so transpiled async functions work as well. If the
   * test throws or rejects, the returned Promise is rejected.
   *
   * @param testCallback The test. Called with the content (and, for updates,
   * the additional arguments of the subscription).
   * @param options Additional Options for the Wait Function. With `testFirst`
   * the current content is tested before subscribing to updates.
   * @returns A Promise, resolved with the content that passed the test.
   */
  public waitFor(testCallback: IwaitForCallback<G>, options: INopeWaitForOpitions = { testFirst: true }): Promise<G> {
    return new Promise<G>((resolve, reject) => {
      let settled = false;
      let subscription: INopeObserver | null = null;

      const release = (): void => {
        if (subscription !== null) {
          subscription.unsubscribe();
        }
      };

      const succeed = (content: G): void => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        resolve(content);
      };

      const fail = (error: unknown): void => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        reject(error);
      };

      // Runs the test and reports the verdict, no matter whether the test
      // answers directly or with a Promise. Errors reject the wait.
      const judge = (invoke: () => unknown, onResult: (passed: boolean) => void): void => {
        try {
          const outcome = invoke();
          const isThenable = outcome !== null
            && typeof outcome === 'object'
            && typeof (outcome as { then?: unknown }).then === 'function';

          if (isThenable) {
            Promise.resolve(outcome)
              .then((passed) => onResult(Boolean(passed)))
              .catch(fail);
          } else {
            onResult(Boolean(outcome));
          }
        } catch (e) {
          fail(e);
        }
      };

      const listen = (): void => {
        subscription = this.subscribe({
          next: (content, ...args) => {
            if (settled) {
              return;
            }
            judge(() => testCallback(content, ...args), (passed) => {
              if (passed) {
                succeed(content);
              }
            });
          }
        });

        if (settled) {
          // The current value was delivered (and accepted) while subscribing,
          // when there was no subscription to release yet.
          release();
        }
      };

      if (options.testFirst) {
        judge(() => testCallback(this.getContent()), (passed) => {
          if (passed) {
            succeed(this.getContent());
          } else {
            listen();
          }
        });
      } else {
        judge(() => undefined, () => listen());
      }
    });
  }

  /**
   * Async Function to Wait for an Update. Resolves with the first value
   * delivered by `once`.
   *
   * NOTE(review): because the underlying subject replays its current value
   * on subscription, this may resolve with the value that is already held
   * instead of a future one - confirm that this is intended.
   *
   * @param mode Mode of the Subscription
   * @param options Additional Options for the Subscription.
   * @returns A Promise, resolved with the received content.
   */
  public waitForUpdate(mode?: 'sync' | 'immediate', options?: INopeSubscriptionOptions): Promise<G> {
    return new Promise<G>((resolve, reject) => {
      try {
        this.once((content) => resolve(content), mode, options);
      } catch (e) {
        reject(e);
      }
    })
  }

  /**
   * Flag, indicating whether the underlying subject currently has observers.
   */
  public get hasSubscriptions() {
    return this.observable.observers.length > 0;
  }
}
|