nope/lib/observables/nopeObservable.ts
Martin Karkowski f32ac4ea7d using 'immediate' instead of 'async'
prevent publishing undefined messages.
2020-09-01 16:56:26 +02:00

379 lines
10 KiB
TypeScript

import { injectable } from 'inversify';
import { BehaviorSubject, CompletionObserver, ErrorObserver, NextObserver, Observable, Subscription } from 'rxjs';
import { generateId } from '../helpers/idMethods';
import { callImmediate } from '../helpers/runtimeMethods';
export type waitForCallback<T> = (content?: T | null, sender?: string, timeStamp?: number | null, ...data) => boolean | Promise<boolean>;
export interface nopeWaitForOpitions {
testFirst?: boolean;
subscriptionMode?: 'immediate' | 'sync';
triggerMode?: Array<'sub' | 'super' | 'direct'>;
}
export interface nopeObserver<T> extends Subscription {
options: nopeSubscriptionOptions
pause(): void;
unpause(): void;
}
export interface nopeSubscriptionOptions {
mode: Array<'sub' | 'super' | 'direct'>
}
export interface NopePartialObserver<T> {
next?: observableCallback<T>,
error?: (error: any) => void,
complete?: () => void,
}
export declare type PartialObserver<T> = (NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>);
export type observableCallback<T> = (content: T | null, sender: string, timeStamp: number | null, ...data) => void;
export type pipe<T, K> = (scope: { [index: string]: any }, observable: Observable<T>) => Observable<K>
/**
* RsJX based Observable.
*
* Contains additional Functionalities like:
* - property with the current value
* - function to publish values. (wrapper for next)
* - enables performing a subscription with synced call or a immediate call.
*/
@injectable()
export class nopeObservable<T, S = T, G = T> {
public observable: BehaviorSubject<G> = new BehaviorSubject<G>(undefined);
public readonly id: string = generateId();
public _value: T;
public options: any = {};
/**
* Function to specify a Setter
*/
public setter: ((value: S | null, sender: string | null, timeStamp: number | null, ...data) => {
data: T | null,
valid: boolean,
}) | null = null;
/**
* Function to specify a Getter
*/
public getter: ((value: T | null) => G | null) | null = null;
protected _args: any[] = [];
protected _sender: string;
protected _timeStamp: number;
/**
* Function to
* @param value
* @param sender
* @param timeStamp
* @param data
*/
public setContent(value: S | null, sender: string | null = null, timeStamp: number | null = null, ...data): void {
// Change the Value.
if (this.setter !== null) {
const adapted = this.setter(value as S, sender, timeStamp);
if (!adapted.valid) {
return;
}
this._value = adapted.data;
} else {
// Adapt the Value if required.
this._value = value as any as T;
}
const valueToPublish = this.getContent();
/** Publish the Data */
if (!this.disablePublishing && this.observable.value !== valueToPublish) {
this._publish(valueToPublish, sender, timeStamp);
}
}
protected _updateSenderAndTimestamp(sender: string | null = null, timestamp: number | null = null,) {
// Define a Sender if required
if (sender === null) {
sender = this.id;
}
// Generate a Timestamp if required.
if (this.options.generateTimeStamp === true) {
timestamp = (timestamp === null) ? Date.now() : timestamp;
}
//
return {
sender,
timestamp,
}
}
/**
* Internal Function to Publish content
* @param value
* @param sender
* @param timestamp
* @param data
*/
protected _publish(value: G, sender: string | null = null, timestamp: number | null = null, ...data) {
// Only Proceed if Publishing is required.
if (this.disablePublishing === false) {
// Define a Sender if required
if (sender === null) {
sender = this.id;
}
// Generate a Timestamp if required.
if (this.options.generateTimeStamp === true) {
timestamp = (timestamp === null) ? Date.now() : timestamp;
}
// Get the Value.
this._args = data;
this._sender = sender;
this._timeStamp = timestamp;
this.observable.next(value);
}
}
/**
* Function to Force an Update.
* @param sender Sender, which initiated the Update
* @param timestamp The Timestamp of the Updaet
* @param data Additional Args.
*/
public forcePublish(sender: string | null = null, timestamp: number | null = null, ...data) {
return this._publish(this.getContent(), sender, timestamp, ...data);
}
/**
* A Set containing the Subscriptions
*/
public _subscriptions = new Set<() => void>();
/**
* Flag to Disable Publishing
*/
public disablePublishing = false;
/**
* Function, used to dispose the observable.
* Every item will be unsubscribed.
*/
public dispose(): void {
for (const _unsubscribe of this._subscriptions) {
_unsubscribe();
}
this._subscriptions.clear();
this.observable.closed = true;
}
/**
* Function to extract the Content.
* If a Getter is provided, the Getter will be used
* to Transform the item.
*/
public getContent(): G | null {
if (this.getter !== null)
return this.getter(this._value);
return this._value as any as G;
}
/**
* A Function to subscribe to updates of the Observable.
* @param observer The Observer. Could be a Function or a Partial Observer.
* @param mode The Mode of the Subscription
* @param options Additional Options.
*/
public subscribe(observer: NopePartialObserver<G> | observableCallback<G>,
mode: 'immediate' | 'sync' = 'sync',
options: nopeSubscriptionOptions = {
mode: ['direct', 'sub', 'super']
}): nopeObserver<G> {
const _this = this;
let active = true;
let _observer: PartialObserver<G>;
if (typeof observer === 'object') {
_observer = {
next: (data: G) => {
if (active && data !== undefined && observer.next) {
if (mode === 'immediate') {
callImmediate(observer.next, data, this._sender, this._timeStamp, ..._this._args);
} else {
observer.next(data, this._sender, this._timeStamp, ..._this._args);
}
}
},
complete: () => {
if (observer.complete) {
observer.complete();
}
},
error: (error) => {
if (observer.error) {
observer.error(error);
}
},
}
} else if (typeof observer === 'function') {
_observer = {
next: (data: G) => {
if (active && data !== undefined) {
if (mode === 'immediate') {
callImmediate(observer, data, this._sender, this._timeStamp, ..._this._args);
} else {
observer(data, this._sender, this._timeStamp, ..._this._args);
}
}
},
complete: () => { },
error: (error) => { },
}
}
// Create a Subscription.
const subscription = this.observable.subscribe(_observer);
const ret: nopeObserver<T> = Object.assign(subscription, {
options,
pause: () => {
active = false;
},
unpause: () => {
active = true;
}
});
return ret;
}
/**
* Create an enhanced Subscription of the Observable. Use the Pipes, to
* Define what should be subscribed.
* @param next The Next Function, used to transmit changes
* @param options The Options, used to determine the Enhancements.
*/
public enhancedSubscription<K>(next: (data: K) => void, options: {
scope?: { [index: string]: any },
pipe?: pipe<T | G, K>
} = {}) {
let observable: Observable<K> = this as any as Observable<K>;
if (options.pipe) {
observable = options.pipe(options.scope, this.observable);
}
const subscription = observable.subscribe({
next
});
return subscription;
}
/**
* Creates a Subscription for the value of the Observable. After one Update the Value will be deleted
* @param func Function which is called when new Datas are pushed
* @param mode Mode of the Subscription
* @param options Additional Options
*/
once(func: observableCallback<G>, mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): nopeObserver<G> {
let ret: nopeObserver<G> = null;
ret = this.subscribe({
next: (...args) => {
ret.unsubscribe();
func(...args);
}
}, mode, options);
return ret;
}
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
waitFor(testCallback: waitForCallback<G>, options: nopeWaitForOpitions = { testFirst: true }): Promise<G> {
const _this = this;
const _isAsync = testCallback[Symbol.toStringTag] === 'AsyncFunction';
if (_isAsync) {
let called = false;
return new Promise<G>(async (resolve, reject) => {
try {
if (options.testFirst && await testCallback(_this.getContent())) {
resolve(_this.getContent());
} else {
let subscription: nopeObserver<T> = null;
subscription = _this.subscribe({
next: async (content, ...args) => {
if (!called && await testCallback(content, ...args)) {
subscription.unsubscribe();
called = true;
resolve(content);
}
}
});
}
} catch (e) {
reject(e);
}
});
} else {
return new Promise<G>((resolve, reject) => {
try {
if (options.testFirst && testCallback(_this.getContent())) {
resolve(_this.getContent());
} else {
let subscription: nopeObserver<T> = null;
subscription = _this.subscribe({
next: (content, ...args) => {
if (testCallback(content, ...args)) {
subscription.unsubscribe();
resolve(content);
}
}
});
}
} catch (e) {
reject(e);
}
});
}
}
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
waitForUpdate(mode?: 'sync' | 'immediate', options?: nopeSubscriptionOptions): Promise<G> {
const _this = this;
return new Promise<G>((resolve, reject) => {
try {
_this.once((content) => resolve(content), mode, options);
} catch (e) {
reject(e);
}
})
}
}