2020-08-25 10:33:33 +00:00
|
|
|
import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Observable, Subscription } from 'rxjs';
|
2020-08-25 22:11:26 +00:00
|
|
|
import { generateId } from '../helpers/idMethods';
|
|
|
|
import { callImmediate } from '../helpers/runtimeMethods';
|
2020-08-30 08:32:48 +00:00
|
|
|
import { type } from 'os';
|
2020-08-21 14:49:48 +00:00
|
|
|
|
2020-08-25 22:11:26 +00:00
|
|
|
|
|
|
|
/**
 * Predicate handed to `nopeObservable.waitFor`.
 *
 * It is called with the content to test and, when invoked for an emission of
 * the observable, with the sender, the time stamp and any extra data of that
 * emission. Returning (or resolving to) `true` ends the wait.
 */
export type waitForCallback<T> = (content?: T | null, sender?: string, timeStamp?: number | null, ...data: any[]) => boolean | Promise<boolean>;
|
|
|
|
/**
 * Options accepted by `nopeObservable.waitFor`.
 */
export interface nopeWaitForOpitions {
    /**
     * If truthy, the predicate is evaluated against the current content
     * before a subscription is created.
     */
    testFirst?: boolean;
    /**
     * Requested subscription mode.
     * NOTE(review): `waitFor` in this file does not read this field - confirm intended use.
     */
    subscriptionMode?: 'async' | 'sync';
    /**
     * Requested trigger modes.
     * NOTE(review): `waitFor` in this file does not read this field - confirm intended use.
     */
    triggerMode?: Array<'sub' | 'super' | 'direct'>;
}
|
|
|
|
|
2020-08-25 22:11:26 +00:00
|
|
|
/**
 * Subscription handle returned by `nopeObservable.subscribe`.
 *
 * It is an RxJS `Subscription` extended with the options the subscription
 * was created with and with the ability to temporarily mute the callback.
 */
export interface nopeObserver<T> extends Subscription {
    /** The options that were passed to `subscribe`. */
    options: nopeSubscriptionOptions;
    /** Stops forwarding emitted values to the callback without unsubscribing. */
    pause(): void;
    /** Resumes forwarding emitted values to the callback. */
    unpause(): void;
}
|
2020-08-25 10:33:33 +00:00
|
|
|
|
2020-08-25 22:11:26 +00:00
|
|
|
/**
 * Options describing a subscription on a `nopeObservable`.
 */
export interface nopeSubscriptionOptions {
    /**
     * Kinds of updates the subscription is interested in.
     * NOTE(review): within this file the list is only stored on the returned
     * observer and never evaluated - confirm where it is consumed.
     */
    mode: Array<'sub' | 'super' | 'direct'>;
}
|
|
|
|
|
|
|
|
/**
 * Observer object accepted by `nopeObservable.subscribe`.
 *
 * Every handler is optional; handlers that are missing are skipped.
 */
export interface NopePartialObserver<T> {
    /** Called for every emitted value together with sender, time stamp and extra data. */
    next?: observableCallback<T>;
    /** Called when the underlying subject reports an error. */
    error?: (error: any) => void;
    /** Called when the underlying subject completes. */
    complete?: () => void;
}
|
2020-08-25 10:33:33 +00:00
|
|
|
|
2020-08-25 22:11:26 +00:00
|
|
|
/**
 * An observer that implements at least one of the RxJS observer callbacks
 * (`next`, `error` or `complete`).
 */
export declare type PartialObserver<T> =
    | NextObserver<T>
    | ErrorObserver<T>
    | CompletionObserver<T>;
|
|
|
|
|
|
|
|
/**
 * Callback invoked for every value emitted by a `nopeObservable`.
 *
 * Receives the emitted content, the sender, the time stamp and any extra
 * data that was handed to `setContent`.
 */
export type observableCallback<T> = (content: T | null, sender: string, timeStamp: number | null, ...data: any[]) => void;
|
|
|
|
|
|
|
|
/**
 * Function that derives a new RxJS observable from the given one.
 * It receives a scope object and the source observable and returns the
 * observable that should be subscribed to instead.
 */
export type pipe<T, K> = (
    scope: Record<string, any>,
    observable: Observable<T>,
) => Observable<K>;
|
2020-08-21 14:49:48 +00:00
|
|
|
|
|
|
|
/**
 * RxJS based Observable.
 *
 * Contains additional functionalities like:
 *  - a property with the current value
 *  - a function to publish values (wrapper for next)
 *  - subscriptions whose callback is invoked either synchronously or via
 *    `callImmediate`.
 */
export class nopeObservable<T> {

    /** Underlying RxJS subject; it is seeded with `null`. */
    public observable: BehaviorSubject<T> = new BehaviorSubject<T>(null);

    /** Identifier of this observable; used as default sender. */
    public readonly id: string = generateId();

    /** The stored (raw) value, i.e. the value before `getter` is applied. */
    public _value: T;

    /** Free-form options. `setContent` reads `options.generateTimeStamp`. */
    public options: any = {};

    /**
     * Element containing the value.
     * Assigning forwards to `setContent` with this observable's id as sender
     * and `'direct'` as extra data.
     */
    public set value(value: T | null) {
        this.setContent(value, this.id, null, 'direct');
    }

    public get value(): T | null {
        return this.getContent();
    }

    /**
     * Function to specify a setter.
     * If defined it receives every value passed to `setContent` and returns
     * the value to store (`data`) and whether the update is accepted (`valid`).
     */
    public setter: ((value: T | null, sender: string | null, timeStamp: number | null, ...data: any[]) => {
        data: T | null,
        valid: boolean,
    }) | null = null;

    /**
     * Function to specify a getter.
     * If defined it is applied to the stored value whenever the content is read.
     */
    public getter: ((value: T | null) => T | null) | null = null;

    /** Extra data of the most recent publication. */
    protected _args: any[] = [];
    /** Sender of the most recent publication. */
    protected _sender: string;
    /** Time stamp of the most recent publication. */
    protected _timeStamp: number;

    /**
     * Stores a new value and publishes it to the subscribers.
     *
     * @param value The new value.
     * @param sender Identifier of the sender; defaults to the id of this observable.
     * @param timeStamp Time stamp of the update. If `null` and
     *                  `options.generateTimeStamp === true`, `Date.now()` is used.
     * @param data Extra data forwarded to the setter and to the subscribers.
     */
    public setContent(value: T | null, sender: string | null = null, timeStamp: number | null = null, ...data: any[]): void {
        // Define a sender if required.
        if (sender === null) {
            sender = this.id;
        }

        // Generate a time stamp if required.
        if (this.options.generateTimeStamp === true && timeStamp === null) {
            timeStamp = Date.now();
        }

        // Adapt the value if required. The stored value is only touched once
        // the setter has accepted the update, so a rejected value never
        // replaces the previous one.
        let adapted = value;
        if (this.setter !== null) {
            const result = this.setter(value, sender, timeStamp, ...data);
            if (!result.valid) {
                return;
            }
            adapted = result.data;
        }
        this._value = adapted;

        // Publish the data, but only if publishing is enabled and the value changed.
        if (!this.disablePublishing && this.observable.value !== this._value) {
            this._args = data;
            this._sender = sender;
            this._timeStamp = timeStamp;
            this.observable.next(this.getContent());
        }
    }

    /**
     * A set containing one unsubscribe function per active subscription.
     */
    protected _subscriptions = new Set<() => void>();

    /**
     * Flag to disable publishing.
     */
    public disablePublishing = false;

    /**
     * Registers a subscription so that `dispose` can release it. The entry
     * removes itself as soon as the subscription is unsubscribed.
     */
    protected _registerSubscription(subscription: Subscription): void {
        const unsubscribe = (): void => subscription.unsubscribe();
        this._subscriptions.add(unsubscribe);
        subscription.add(() => {
            this._subscriptions.delete(unsubscribe);
        });
    }

    /**
     * Unsubscribes every registered subscription and marks the subject as closed.
     */
    public dispose(): void {
        // Iterate over a copy: unsubscribing removes the entry from the set.
        for (const unsubscribe of Array.from(this._subscriptions)) {
            unsubscribe();
        }
        this._subscriptions.clear();

        // NOTE(review): only the `closed` flag is set; the subject is neither
        // completed nor unsubscribed - confirm this is the intended way to
        // shut the subject down.
        this.observable.closed = true;
    }

    /**
     * Returns the current content, i.e. the stored value passed through
     * `getter` if one is defined.
     */
    public getContent(): T | null {
        if (this.getter !== null) {
            return this.getter(this._value);
        }
        return this._value;
    }

    /**
     * A function to subscribe to updates of the observable.
     *
     * The underlying `BehaviorSubject` replays its current value, so the
     * observer is called once for the current value right after subscribing.
     *
     * @param observer The observer. Could be a function or a partial observer.
     * @param mode `'sync'` calls the observer directly, `'immediate'` hands
     *             the call to `callImmediate`.
     * @param options Additional options; they are stored on the returned observer.
     * @returns The subscription, extended with `options`, `pause` and `unpause`.
     */
    public subscribe(observer: NopePartialObserver<T> | observableCallback<T>,
        mode: 'immediate' | 'sync' = 'sync',
        options: nopeSubscriptionOptions = {
            mode: ['direct', 'sub', 'super']
        }): nopeObserver<T> {

        let active = true;

        // Invokes a callback with the meta data of the latest publication.
        const deliver = (callback: observableCallback<T>, owner: NopePartialObserver<T> | undefined, data: T): void => {
            if (mode === 'immediate') {
                callImmediate(callback, data, this._sender, this._timeStamp, ...this._args);
            } else {
                callback.call(owner, data, this._sender, this._timeStamp, ...this._args);
            }
        };

        let _observer: PartialObserver<T>;

        if (typeof observer === 'function') {
            _observer = {
                next: (data: T) => {
                    if (active) {
                        deliver(observer, undefined, data);
                    }
                },
                complete: () => { },
                error: () => { },
            };
        } else if (typeof observer === 'object' && observer !== null) {
            _observer = {
                next: (data: T) => {
                    if (active && observer.next) {
                        deliver(observer.next, observer, data);
                    }
                },
                complete: () => {
                    if (observer.complete) {
                        observer.complete();
                    }
                },
                error: (error) => {
                    if (observer.error) {
                        observer.error(error);
                    }
                },
            };
        } else {
            // `null` (typeof 'object') or anything else: nothing to notify.
            _observer = { next: () => { } };
        }

        // Create the subscription and remember it for `dispose`.
        const subscription = this.observable.subscribe(_observer);
        this._registerSubscription(subscription);

        const ret: nopeObserver<T> = Object.assign(subscription, {
            options,
            pause: () => {
                active = false;
            },
            unpause: () => {
                active = true;
            }
        });

        return ret;
    }

    /**
     * Additional function for the observable. Subscribes either to this
     * observable directly or, if `options.pipe` is given, to the observable
     * returned by the pipe.
     *
     * @param next Callback receiving the emitted (or piped) data.
     * @param options Optional `pipe` and the `scope` object handed to it.
     * @returns The created subscription.
     */
    public enhancedSubscription<K>(next: (data: K) => void, options: {
        scope?: { [index: string]: any },
        pipe?: pipe<T, K>
    } = {}): Subscription {
        if (!options.pipe) {
            // Without a pipe the callback is attached through `subscribe` of
            // this class, so it receives the unmodified content.
            return this.subscribe({ next: next as unknown as observableCallback<T> });
        }

        // The pipe is typed to receive a scope object, so never pass `undefined`.
        const scope = options.scope !== undefined ? options.scope : {};
        const subscription = options.pipe(scope, this.observable).subscribe({
            next
        });
        this._registerSubscription(subscription);

        return subscription;
    }

    /**
     * Creates a subscription that is removed after its first notification.
     *
     * Because the underlying `BehaviorSubject` replays its current value, the
     * first notification is the replay of the current value.
     *
     * @param func Function which is called with the first notification.
     * @param mode Mode of the subscription.
     * @param options Additional options.
     * @returns The (possibly already closed) subscription.
     */
    public once(func: observableCallback<T>, mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): nopeObserver<T> {
        let fired = false;
        let ret: nopeObserver<T> | null = null;

        const created = this.subscribe({
            next: (...args) => {
                if (fired) {
                    return;
                }
                fired = true;
                // In 'sync' mode the replayed value arrives while `subscribe`
                // is still running, i.e. before `ret` has been assigned.
                if (ret !== null) {
                    ret.unsubscribe();
                }
                func(...args);
            }
        }, mode, options);

        ret = created;
        if (fired) {
            // The callback already ran during `subscribe`; release it now.
            created.unsubscribe();
        }

        return created;
    }

    /**
     * Async function that waits until the content satisfies a predicate.
     *
     * @param testCallback Predicate to satisfy. May return a boolean or a
     *                     promise of a boolean.
     * @param options Additional options. With `testFirst` the current content
     *                is tested before a subscription is created.
     * @returns A promise resolving with the content that satisfied the
     *          predicate. It rejects if the predicate throws or rejects.
     */
    public waitFor(testCallback: waitForCallback<T>, options: nopeWaitForOpitions = { testFirst: true }): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            let settled = false;
            let subscription: nopeObserver<T> | null = null;

            const isThenable = (candidate: boolean | Promise<boolean>): candidate is Promise<boolean> => {
                return typeof candidate === 'object' && candidate !== null && typeof candidate.then === 'function';
            };

            // Marks the wait as finished and drops the subscription, if any.
            const release = (): void => {
                settled = true;
                if (subscription !== null) {
                    subscription.unsubscribe();
                }
            };

            const succeed = (content: T | null): void => {
                if (settled) {
                    return;
                }
                release();
                resolve(content);
            };

            const fail = (error: unknown): void => {
                if (settled) {
                    return;
                }
                release();
                reject(error);
            };

            // Runs the predicate and reports its result. Boolean results are
            // handled synchronously, promises once they settle.
            const runTest = (onResult: (passed: boolean) => void, content: T | null, ...args: any[]): void => {
                if (settled) {
                    return;
                }
                try {
                    const outcome = testCallback(content, ...args);
                    if (isThenable(outcome)) {
                        outcome.then(onResult).then(undefined, fail);
                    } else {
                        onResult(outcome);
                    }
                } catch (error) {
                    fail(error);
                }
            };

            const listen = (): void => {
                const created = this.subscribe({
                    next: (content, ...args) => {
                        runTest((passed) => {
                            if (passed) {
                                succeed(content);
                            }
                        }, content, ...args);
                    }
                });
                subscription = created;
                if (settled) {
                    // The replayed value already satisfied the predicate
                    // while `subscribe` was still running.
                    created.unsubscribe();
                }
            };

            if (options.testFirst) {
                runTest((passed) => {
                    if (passed) {
                        succeed(this.getContent());
                    } else {
                        listen();
                    }
                }, this.getContent());
            } else {
                listen();
            }
        });
    }

    /**
     * Async function resolving with the first notification of a `once`
     * subscription (see `once` for the replay of the current value).
     *
     * @param mode Mode of the subscription.
     * @param options Additional options for the subscription.
     */
    public waitForUpdate(mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            try {
                this.once((content) => resolve(content), mode, options);
            } catch (e) {
                reject(e);
            }
        });
    }
}
|