Adding Comments, shifiting Methods

This commit is contained in:
Martin Karkowski 2021-01-03 16:57:58 +01:00
parent 8f92b8c491
commit 2b6d90ffdc

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:52:00
* @modify date 2021-01-03 12:49:44
* @modify date 2021-01-03 16:17:40
* @desc [description]
*/
@ -1194,6 +1194,9 @@ export class nopeDispatcher implements INopeDispatcher {
}
}
/**
* Will dispose the Dispatcher. Must be called on exit for a clean exit. Otherwise it is defined as dirty exits
*/
public async dispose(): Promise<void> {
for (const task of Array.from(this._runningInternalRequestedTasks.keys())) {
this.cancelTask(task, new Error("Client going offline"));
@ -1519,6 +1522,36 @@ export class nopeDispatcher implements INopeDispatcher {
}
}
/**
* Creates an Event listener (if required)
*
* @protected
* @param {string} event The Event to Listen.
* @return {nopeObservable<IExternalEventMsg>} An Listener on the Communication Channel.
* @memberof nopeDispatcher
*/
protected _subscribeToEvent(event: string) {
const item = this._externallySubscribeObservables.get(event) || {
observable: this._generateObservable<IExternalEventMsg>(),
cb: () => {}
};
if (!item.observable.hasSubscriptions) {
const _this = this;
const cb = (data: IExternalEventMsg) => {
item.observable.setContent(data, _this.id);
};
this.communicator.onEvent(event, cb);
item.cb = cb;
}
// Set the Items.
this._externallySubscribeObservables.set(event, item);
return item.observable;
}
/**
* Function to register a Function in the Dispatcher
*
@ -1620,171 +1653,6 @@ export class nopeDispatcher implements INopeDispatcher {
return this._definedFunctions.delete(_id);
}
public unregisterObservable(
observable: INopeObservable<any> | string,
options: {
// Flag to enable / disable sending to registery
preventSendingToRegistery?: boolean;
} = {}
): boolean {
const _id =
typeof observable === "string"
? observable
: (observable.id as string) || "0";
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
this._sendAvailableObservables();
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug(
"Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
);
}
}
return this._definedFunctions.delete(_id);
}
protected _externallySubscribeObservables: Map<
string,
{
observable: INopeObservable<IExternalEventMsg>;
cb: (...arg) => void;
}
>;
protected _internallySubscribeObservables: Map<
string,
Set<INopeObservable<any>>
>;
/**
* Creates an Event listener (if required)
*
* @protected
* @param {string} event The Event to Listen.
* @return {nopeObservable<IExternalEventMsg>} An Listener on the Communication Channel.
* @memberof nopeDispatcher
*/
protected _subscribeToEvent(event: string) {
const item = this._externallySubscribeObservables.get(event) || {
observable: this._generateObservable<IExternalEventMsg>(),
cb: () => {}
};
if (!item.observable.hasSubscriptions) {
const _this = this;
const cb = (data: IExternalEventMsg) => {
item.observable.setContent(data, _this.id);
};
this.communicator.onEvent(event, cb);
item.cb = cb;
}
// Set the Items.
this._externallySubscribeObservables.set(event, item);
return item.observable;
}
/**
* Function to unsubscribe from an event of the channel.
*
* @protected
* @param {string} event
* @memberof nopeDispatcher
*/
protected _unsubscribeEvent(event: string) {
const item = this._externallySubscribeObservables.get(event);
if (item) {
this.communicator.offEvent(event, item.cb);
// Dispose the Observable
const obs = this._externallySubscribeObservables.get(event).observable;
obs.dispose();
// Remove the Observable
this._externallySubscribeObservables.delete(event);
}
}
/**
* Helper Function to directly subscribe to a specific value.
* @param event The Event.
* @param callback The Callback used to subscribe to the event
* @param options Additional Options used to specify the Subscribing.
*/
public async subscribeToEvent<G = any, K = any>(
event: string,
callback: IObservableCallback<G>,
options: {
pipe?: {
pipe?: IPipe<IExternalEventMsg, K>;
scope?: { [index: string]: any };
};
preventSendingToRegistery?: boolean;
mode?: "immediate" | "sync";
subscriptionOptions?: INopeSubscriptionOptions;
}
) {
// Create a new observable:
const observable = this._generateObservable<G>();
// register the newly created observable.
this.registerObservable(observable, {
mode: "subscribe",
topic: event,
preventSendingToRegistery: options.preventSendingToRegistery,
pipe: options.pipe
});
// Create an Observer by susbcribing to the external source (this is directly linked to the System)
const observer = observable.subscribe(
callback,
options.mode,
options.subscriptionOptions
);
observer.unsubscribe = () => {
observable.dispose();
};
// Return the Observer.
return observer;
}
/**
* Function to manually emit an Event.
* @param _eventName
* @param data
* @param sender
* @param timestamp
* @param forced
* @param args
*/
public async emit<T>(
_eventName: string,
data: T,
sender?: string,
timestamp?: number,
forced = false,
...args
) {
// Only Publish data, if there exists a Subscription.
if (forced || (this.subscriptionExists(_eventName) && this.id !== sender)) {
// Use the Communicator to emit the Event.
await this.communicator.emitEvent(_eventName, {
data: data,
topic: _eventName,
sender: this.id,
type: "event",
timestamp
});
}
}
public registerObservable<T, K, S = T, G = T>(
observable: INopeObservable<T, S, G>,
options: IPropertyOptions
@ -1837,7 +1705,7 @@ export class nopeDispatcher implements INopeDispatcher {
observer.unsubscribe();
// Unsubscribe the Event
_this._unsubscribeEvent(_subTopic);
_this._unsubscribeObservable(_subTopic);
// Call the original Dispose function;
dispose.apply(observable);
@ -1867,7 +1735,7 @@ export class nopeDispatcher implements INopeDispatcher {
observer.unsubscribe();
// Unsubscribe the Event
_this._unsubscribeEvent(_subTopic);
_this._unsubscribeObservable(_subTopic);
// Call the original Dispose function;
dispose.apply(observable);
@ -1888,7 +1756,9 @@ export class nopeDispatcher implements INopeDispatcher {
? _sourceData
: this._lastPublishedEvent.get(_subTopic);
if (data) {
observable.setContent(data.data, _this.id, data.timestamp);
if (!observable.setContent(data.data, _this.id, data.timestamp)) {
observable.forcePublish();
}
}
}
}
@ -1987,7 +1857,7 @@ export class nopeDispatcher implements INopeDispatcher {
observer.unsubscribe();
// Unsubscribe the Event
_this._unsubscribeEvent(_subTopic);
_this._unsubscribeObservable(_subTopic);
// Unregister the Internally Subscribed Element.
const _set01 =
@ -2033,7 +1903,7 @@ export class nopeDispatcher implements INopeDispatcher {
observer.unsubscribe();
// Unsubscribe the Event
_this._unsubscribeEvent(_subTopic);
_this._unsubscribeObservable(_subTopic);
// Unregister the Internally Subscribed Element.
const _set01 =
@ -2081,6 +1951,141 @@ export class nopeDispatcher implements INopeDispatcher {
return observable;
}
public unregisterObservable(
observable: INopeObservable<any> | string,
options: {
// Flag to enable / disable sending to registery
preventSendingToRegistery?: boolean;
} = {}
): boolean {
const _id =
typeof observable === "string"
? observable
: (observable.id as string) || "0";
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
this._sendAvailableObservables();
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug(
"Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
);
}
}
return this._definedFunctions.delete(_id);
}
protected _externallySubscribeObservables: Map<
string,
{
observable: INopeObservable<IExternalEventMsg>;
cb: (...arg) => void;
}
>;
protected _internallySubscribeObservables: Map<
string,
Set<INopeObservable<any>>
>;
/**
* Function to unsubscribe from an event of the channel.
*
* @protected
* @param {string} path
* @memberof nopeDispatcher
*/
protected _unsubscribeObservable(path: string) {
const item = this._externallySubscribeObservables.get(path);
if (item) {
this.communicator.offEvent(path, item.cb);
// Dispose the Observable
const obs = this._externallySubscribeObservables.get(path).observable;
obs.dispose();
// Remove the Observable
this._externallySubscribeObservables.delete(path);
}
}
/**
* Helper Function to directly subscribe to a specific value.
* @param event The Event.
* @param callback The Callback used to subscribe to the event
* @param options Additional Options used to specify the Subscribing.
*/
public async subscribeToEvent<G = any, K = any>(
event: string,
callback: IObservableCallback<G>,
options: {
pipe?: {
pipe?: IPipe<IExternalEventMsg, K>;
scope?: { [index: string]: any };
};
preventSendingToRegistery?: boolean;
mode?: "immediate" | "sync";
subscriptionOptions?: INopeSubscriptionOptions;
}
) {
// Create a new observable:
const observable = this._generateObservable<G>();
// register the newly created observable.
this.registerObservable(observable, {
mode: "subscribe",
topic: event,
preventSendingToRegistery: options.preventSendingToRegistery,
pipe: options.pipe
});
// Create an Observer by susbcribing to the external source (this is directly linked to the System)
const observer = observable.subscribe(
callback,
options.mode,
options.subscriptionOptions
);
observer.unsubscribe = () => {
observable.dispose();
};
// Return the Observer.
return observer;
}
/**
* Function to manually emit an Event.
* @param _eventName
* @param data
* @param sender
* @param timestamp
* @param forced
* @param args
*/
public async emit<T>(
_eventName: string,
data: T,
sender?: string,
timestamp?: number,
forced = false,
...args
) {
// Only Publish data, if there exists a Subscription.
if (forced || (this.subscriptionExists(_eventName) && this.id !== sender)) {
// Use the Communicator to emit the Event or its forced
await this.communicator.emitEvent(_eventName, {
data: data,
topic: _eventName,
sender: this.id,
type: "event",
timestamp
});
}
}
protected _removeRpcSubscription(_id: string) {
// Try to unregister the Callback from the communcator:
if (this._communicatorCallbacks.has(_id)) {
@ -2235,11 +2240,12 @@ export class nopeDispatcher implements INopeDispatcher {
if (_this._runningInternalRequestedTasks.has(_taskId)) {
const task = _this._runningInternalRequestedTasks.get(_taskId);
// Remove the Timeout.
if (task.timeout) {
clearTimeout(task.timeout);
}
// Remove the Timeout.
// Remove the Task itself
_this._runningInternalRequestedTasks.delete(_taskId);
}