2517 lines
74 KiB
TypeScript
2517 lines
74 KiB
TypeScript
/**
|
|
* @author Martin Karkowski
|
|
* @email m.karkowski@zema.de
|
|
* @create date 2020-10-12 18:52:00
|
|
* @modify date 2021-01-08 10:56:04
|
|
* @desc [description]
|
|
*/
|
|
|
|
import * as Logger from "js-logger";
|
|
import { ILogger } from "js-logger";
|
|
import { generateId } from "../helpers/idMethods";
|
|
import { RUNNINGINNODE } from "../helpers/runtimeMethods";
|
|
import { getNopeLogger } from "../logger/getLogger";
|
|
import { NopeGenericModule } from "../module/GenericModule";
|
|
import { NopePromise } from "../promise/nopePromise";
|
|
import {
|
|
IAvailableInstanceGeneratorsMsg,
|
|
IAvailableServicesMsg,
|
|
IAvailableTopicsMsg,
|
|
ICallOptions,
|
|
ICommunicationInterface,
|
|
IExternalEventMsg,
|
|
IInstanceCreationMsg,
|
|
IInstanceDescriptionMsg,
|
|
IInstanceRemovalMsg,
|
|
INopeDispatcherOptions,
|
|
IRequestTaskMsg,
|
|
IResponseTaskMsg,
|
|
ITaskCancelationMsg
|
|
} from "../types/nope/nopeCommunication.interface";
|
|
import {
|
|
ENopeDispatcherStatus,
|
|
IDispatcherInfo,
|
|
IGenerateRemoteInstanceCallback,
|
|
IGenerateRemoteInstanceForOtherDispatcherCallback,
|
|
INopeDispatcher
|
|
} from "../types/nope/nopeDispatcher.interface";
|
|
import {
|
|
INopeModule,
|
|
INopeModuleDescription,
|
|
IPropertyOptions
|
|
} from "../types/nope/nopeModule.interface";
|
|
import {
|
|
INopeObservable,
|
|
INopeSubscriptionOptions,
|
|
IObservableCallback,
|
|
IPipe
|
|
} from "../types/nope/nopeObservable.interface";
|
|
import { INopePromise } from "../types/nope/nopePromise.interface";
|
|
|
|
/**
|
|
* A Dispatcher to perform a function on a Remote
|
|
* Dispatcher. Therefore a Task is created and forwarded
|
|
* to the remote.
|
|
*
|
|
* @export
|
|
* @class nopeDispatcher
|
|
*/
|
|
// @injectable()
|
|
export class nopeDispatcher implements INopeDispatcher {
|
|
public readonly id: string;
|
|
|
|
protected _logger: ILogger;
|
|
|
|
/**
|
|
* Internal Element to store the registered Functions
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _definedFunctions: Map<string, (...args) => Promise<any>>;
|
|
protected _remotlyCalledFunctions: Set<string>;
|
|
protected _communicatorCallbacks: Map<
|
|
string,
|
|
{
|
|
registeredId: string;
|
|
type: "request" | "response";
|
|
cb: (data) => any;
|
|
}
|
|
>;
|
|
public readonly communicator: ICommunicationInterface;
|
|
protected _externalDispatchers: Map<string, IDispatcherInfo>;
|
|
|
|
protected _mappingOfRemoteDispatchersAndServices: Map<
|
|
string,
|
|
IAvailableServicesMsg
|
|
>;
|
|
protected _externalProvidedServices: Set<string>;
|
|
|
|
protected _mappingOfRemoteDispatchersAndGenerators: Map<
|
|
string,
|
|
IAvailableInstanceGeneratorsMsg
|
|
>;
|
|
protected _externalProvidedGenerators: Set<string>;
|
|
|
|
public methodInterfaceWithOptions: {
|
|
[index: string]: <T>(options: ICallOptions, ...args) => INopePromise<T>;
|
|
};
|
|
public methodInterface: { [index: string]: <T>(...args) => INopePromise<T> };
|
|
|
|
protected _mappingOfRemoteDispatchersAndPropsOrEvents: Map<
|
|
string,
|
|
IAvailableTopicsMsg
|
|
>;
|
|
protected _externalSubscribed: Set<string>;
|
|
protected _externalPublished: Set<string>;
|
|
|
|
protected _mappingOfRemoteDispatchersAndInstances: Map<
|
|
string,
|
|
INopeModuleDescription[]
|
|
>;
|
|
protected _externalInstancesNames: Set<string>;
|
|
protected _externalInstances: Map<string, INopeModuleDescription>;
|
|
|
|
protected _eventsToSendCurrentValueOnSubscription: Map<
|
|
string,
|
|
Set<INopeObservable<any>>
|
|
>;
|
|
|
|
protected _lastPublishedEvent: Map<string, IExternalEventMsg>;
|
|
|
|
public readonly subscribedEvents: INopeObservable<string[]>;
|
|
public readonly publishedEvents: INopeObservable<string[]>;
|
|
public readonly canceledTask: INopeObservable<ITaskCancelationMsg>;
|
|
public readonly ready: INopeObservable<boolean>;
|
|
public readonly availableInstances: INopeObservable<INopeModuleDescription[]>;
|
|
public readonly externalDispatchers: INopeObservable<IDispatcherInfo[]>;
|
|
|
|
/**
|
|
* Internal Element to store the running tasks.
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _runningInternalRequestedTasks: Map<
|
|
string,
|
|
{
|
|
resolve: (value: any) => void;
|
|
reject: (error: any) => void;
|
|
clear: () => void;
|
|
serviceName: string;
|
|
timeout?: any;
|
|
}
|
|
>;
|
|
|
|
protected _runningExternalRequestedTasks: Set<string>;
|
|
protected timeouts: {
|
|
sendAliveInterval: number;
|
|
checkInterval: number;
|
|
slow: number;
|
|
warn: number;
|
|
dead: number;
|
|
remove: number;
|
|
};
|
|
|
|
readonly _subscriptionMode: "individual" | "generic";
|
|
readonly _publishingMode: "individual" | "generic";
|
|
|
|
/**
|
|
* Creates an instance of nopeDispatcher.
|
|
* @param {nopeRpcDispatcherOptions} options The Options, used by the Dispatcher.
|
|
* @param {() => INopeObservable<IExternalEventMsg>} _generateObservable A Helper, to generate Observables.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
constructor(
|
|
public options: INopeDispatcherOptions,
|
|
protected _generateObservable: <T>() => INopeObservable<T>
|
|
) {
|
|
this.communicator = options.communicator;
|
|
|
|
this.id = generateId();
|
|
|
|
if (options.logger) {
|
|
this._logger = getNopeLogger("dispatcher " + this.id, "info");
|
|
}
|
|
|
|
// Define the Timeouts.
|
|
if (options.timeouts) {
|
|
this.timeouts = options.timeouts;
|
|
} else {
|
|
this.timeouts = {
|
|
sendAliveInterval: 500,
|
|
checkInterval: 250,
|
|
slow: 1000,
|
|
warn: 2000,
|
|
dead: 5000,
|
|
remove: 10000
|
|
};
|
|
}
|
|
|
|
this._subscriptionMode = this.communicator.subscriptionMode || "generic";
|
|
this._publishingMode = this.communicator.resultSharing || "generic";
|
|
|
|
/**
|
|
* Define A Proxy for accessing methods easier.
|
|
*/
|
|
const _this = this;
|
|
const _handlerWithOptions = {
|
|
get(target, name) {
|
|
return (options: ICallOptions, ...args) => {
|
|
return _this.performCall(name, args, options);
|
|
};
|
|
}
|
|
};
|
|
const _handlerWithoutOptions = {
|
|
get(target, name) {
|
|
return (...args) => {
|
|
return _this.performCall(name, args);
|
|
};
|
|
}
|
|
};
|
|
this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions);
|
|
this.methodInterface = new Proxy({}, _handlerWithoutOptions);
|
|
|
|
// Define the Observables provided by the dispatcher.
|
|
this.subscribedEvents = this._generateObservable();
|
|
this.subscribedEvents.setContent([]);
|
|
this.publishedEvents = this._generateObservable();
|
|
this.publishedEvents.setContent([]);
|
|
this.canceledTask = this._generateObservable();
|
|
|
|
// Flag to show if the system is ready or not.
|
|
this.ready = this._generateObservable();
|
|
this.ready.setContent(false);
|
|
|
|
// Holding all available instances.
|
|
this.availableInstances = this._generateObservable();
|
|
this.availableInstances.setContent([]);
|
|
|
|
// Observable containing all Dispatcher Informations.
|
|
this.externalDispatchers = this._generateObservable();
|
|
this.externalDispatchers.setContent([]);
|
|
|
|
if (this._logger) {
|
|
this._logger.info("Dispatcher online. -> Reseting and Initializing");
|
|
}
|
|
|
|
this.reset();
|
|
this._init().catch((error) => {
|
|
if (_this._logger) {
|
|
_this._logger.error("Failed to intialize the Dispatcher", error);
|
|
}
|
|
});
|
|
}
|
|
|
|
provideInstanceGeneratorForExternalDispatchers<I extends INopeModule>(
|
|
identifier: string,
|
|
cb: IGenerateRemoteInstanceForOtherDispatcherCallback<I>
|
|
): void {
|
|
const _this = this;
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Adding instance generator for \"" +
|
|
identifier +
|
|
"\" to external Generators. Other Elements can now create instances of this type."
|
|
);
|
|
}
|
|
|
|
const _cb = this.registerFunction(
|
|
async (data: IInstanceCreationMsg) => {
|
|
// Check if an instance exists or not.
|
|
// if not => create an instance an store it.
|
|
if (!_this._instances.has(data.identifier)) {
|
|
// Create an Instance
|
|
const _instance = await cb(_this, data.identifier);
|
|
|
|
// Make shure the Data is expressed as Array.
|
|
if (!Array.isArray(data.params)) {
|
|
data.params = [data.params];
|
|
}
|
|
|
|
// Initialize the instance with the parameters.
|
|
await _instance.init(...data.params);
|
|
|
|
// A Function is registered, taking care of removing
|
|
// an instances, if it isnt needed any more.
|
|
_this.registerFunction(
|
|
async (_data: IInstanceRemovalMsg) => {
|
|
if (_this._instances.get(data.identifier)?.usedBy) {
|
|
// Get the Index of the dispatcher, which is using
|
|
// the element
|
|
const idx = _this._instances
|
|
.get(data.identifier)
|
|
.usedBy.indexOf(_data.dispatcherID);
|
|
|
|
if (idx > -1) {
|
|
_this._instances.get(data.identifier).usedBy.splice(idx, 1);
|
|
}
|
|
|
|
if (_this._instances.get(data.identifier).usedBy.length == 0) {
|
|
// Unmark as internal instance
|
|
_this._internalInstances.delete(data.identifier);
|
|
|
|
// Remove the Instance.
|
|
await _instance.dispose();
|
|
|
|
// Delete the Entry.
|
|
_this._instances.delete(data.identifier);
|
|
|
|
// Remove the Function itself
|
|
_this.unregisterFunction(
|
|
"instance_dispose_" + data.identifier
|
|
);
|
|
}
|
|
}
|
|
},
|
|
{
|
|
deleteAfterCalling: false,
|
|
id: "instance_dispose_" + data.identifier
|
|
}
|
|
);
|
|
|
|
// Store the Instance.
|
|
_this._instances.set(data.identifier, {
|
|
instance: _instance,
|
|
usedBy: [data.dispatcherID]
|
|
});
|
|
|
|
_this._internalInstances.add(data.identifier);
|
|
|
|
// Update the available instances:
|
|
_this._sendAvailableInstances();
|
|
} else {
|
|
// If an Element exists => Add the Element.
|
|
_this._instances.get(data.identifier).usedBy.push(data.dispatcherID);
|
|
}
|
|
|
|
// Define the Response.
|
|
const response: IInstanceDescriptionMsg = {
|
|
description: _this._instances
|
|
.get(data.identifier)
|
|
.instance.toDescription(),
|
|
type: data.type
|
|
};
|
|
|
|
// Send the Response
|
|
return response;
|
|
},
|
|
{
|
|
id: "generateInstance_" + identifier
|
|
}
|
|
);
|
|
// Store the Generator.
|
|
this._externalGenerators.set(identifier, _cb);
|
|
|
|
// Send an update of the available Generators
|
|
this._sendAvailableGenerators();
|
|
}
|
|
|
|
unprovideInstanceGeneratorForExternalDispatchers(identifier: string): void {
|
|
if (this._externalGenerators.has(identifier)) {
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Removing instance generator for \"" +
|
|
identifier +
|
|
"\" from external Generators. Other Elements cant create instances of this type anymore."
|
|
);
|
|
}
|
|
|
|
this.unregisterFunction(this._externalGenerators.get(identifier));
|
|
this._externalGenerators.delete(identifier);
|
|
|
|
// Send an update of the available Generators
|
|
this._sendAvailableGenerators();
|
|
}
|
|
}
|
|
|
|
registerInternalInstanceGenerator<I extends INopeModule>(
|
|
identifier: string,
|
|
cb: IGenerateRemoteInstanceCallback<I>
|
|
): void {
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Adding instance generator for \"" +
|
|
identifier +
|
|
"\" as internal Generator. This Generator wont be used externally."
|
|
);
|
|
}
|
|
|
|
this._internalGenerators.set(identifier, cb);
|
|
}
|
|
|
|
unregisterInternalInstanceGenerator(identifier: string): void {
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Rmoving instance generator for \"" +
|
|
identifier +
|
|
"\" from internal Generator. The sytem cant create elements of this type any more."
|
|
);
|
|
}
|
|
|
|
this._internalGenerators.delete(identifier);
|
|
}
|
|
|
|
protected _internalGenerators: Map<
|
|
string,
|
|
IGenerateRemoteInstanceCallback<INopeModule>
|
|
>;
|
|
protected _externalGenerators: Map<
|
|
string,
|
|
IGenerateRemoteInstanceCallback<INopeModule>
|
|
>;
|
|
protected _instances: Map<
|
|
string,
|
|
{
|
|
instance: INopeModule;
|
|
usedBy: Array<string>;
|
|
}
|
|
>;
|
|
protected _internalInstances: Set<string>;
|
|
|
|
public async generateInstance<I extends INopeModule>(
|
|
description: Partial<IInstanceCreationMsg>
|
|
): Promise<I> {
|
|
// Define the Default Description
|
|
// which will lead to an error.
|
|
const _defDescription: IInstanceCreationMsg = {
|
|
dispatcherID: this.id,
|
|
identifier: "error",
|
|
params: [],
|
|
type: "unkown"
|
|
};
|
|
|
|
// Assign the provided Description
|
|
const _description = Object.assign(_defDescription, description, {
|
|
dispatcherID: this.id
|
|
}) as IInstanceCreationMsg;
|
|
if (
|
|
_defDescription.type === "unkown" ||
|
|
_description.identifier === "error"
|
|
) {
|
|
throw Error(
|
|
"Please Provide at least a \"type\" and \"identifier\" in the paremeters"
|
|
);
|
|
}
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Requesting an Instance of type: \"" +
|
|
_defDescription.type +
|
|
"\" with the identifier: \"" +
|
|
_defDescription.identifier +
|
|
"\""
|
|
);
|
|
}
|
|
|
|
if (this._instances.has(_description.identifier)) {
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"Already created instance with the identifiert: \"" +
|
|
_defDescription.identifier +
|
|
"\" => returning this instance"
|
|
);
|
|
}
|
|
|
|
// Add the Dispatcher to the Element:
|
|
this._instances
|
|
.get(_description.identifier)
|
|
.usedBy.push(_description.dispatcherID);
|
|
|
|
// Return the Instance.
|
|
return this._instances.get(_description.identifier).instance as I;
|
|
}
|
|
|
|
try {
|
|
let _type = _description.type;
|
|
|
|
if (!this._internalGenerators.has(_type)) {
|
|
// No default type is present for a remote
|
|
// => assing the default type which is "*""
|
|
_type = "*";
|
|
}
|
|
|
|
if (!this.generatorExists(_description.type)) {
|
|
throw Error("Generator isnt present in the network!");
|
|
}
|
|
|
|
if (this._internalGenerators.has(_type)) {
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"No instance with the identifiert: \"" +
|
|
_defDescription.identifier +
|
|
"\" found, but an internal generator is available. Using the internal one for creating the instance and requesting the \"real\" instance externally"
|
|
);
|
|
}
|
|
|
|
const result = await this.performCall<IInstanceDescriptionMsg>(
|
|
"generateInstance_" + _description.type,
|
|
[_description],
|
|
{
|
|
paramsHasNoCallback: true
|
|
}
|
|
);
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug("Received a description for the instance");
|
|
}
|
|
|
|
// Create the Instance
|
|
const instance = (await this._internalGenerators.get(_type)(
|
|
this,
|
|
result.description
|
|
)) as I;
|
|
|
|
// Store the Instances.
|
|
this._instances.set(_description.identifier, {
|
|
instance,
|
|
usedBy: [_description.dispatcherID]
|
|
});
|
|
|
|
return instance;
|
|
}
|
|
|
|
throw Error("No generator Available!");
|
|
} catch (e) {
|
|
if (this._logger) {
|
|
this._logger.error(
|
|
"During creating an Instance, the following error Occurd"
|
|
);
|
|
this._logger.error(e);
|
|
}
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
public async deleteInstance<I extends INopeModule>(
|
|
instance: I | string,
|
|
preventSendingUpdate = false
|
|
): Promise<boolean> {
|
|
// Block to find the instance.
|
|
// Based on the property (string or instance)
|
|
// the corresponding instance object has to be select.
|
|
let _instance: { instance: INopeModule; usedBy: Array<string> };
|
|
if (typeof instance === "string") {
|
|
_instance = this._instances.get(instance);
|
|
} else {
|
|
for (const data of this._instances.values()) {
|
|
if (instance == data.instance) {
|
|
_instance = data;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// if the instance has been found => delete the instance.
|
|
if (_instance) {
|
|
_instance.usedBy.pop();
|
|
|
|
if (_instance.usedBy.length === 0) {
|
|
const params: IInstanceRemovalMsg = {
|
|
dispatcherID: this.id,
|
|
identifier: _instance.instance.identifier
|
|
};
|
|
|
|
// Call the corresponding Dispose Function for the "real" instance
|
|
// All other elements are just accessors.
|
|
await this.performCall(
|
|
"instance_dispose_" + _instance.instance.identifier,
|
|
[params]
|
|
);
|
|
|
|
// Delete the Identifier
|
|
this._instances.delete(_instance.instance.identifier);
|
|
|
|
// Check if an update should be emitted or not.
|
|
if (!preventSendingUpdate) {
|
|
// Update the Instances provided by this module.
|
|
this._sendAvailableInstances();
|
|
}
|
|
|
|
// Dispose the Handler;
|
|
await _instance.instance.dispose();
|
|
}
|
|
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Internal Method to handle some requests.
|
|
*
|
|
* @protected
|
|
* @param {IRequestTaskMsg} data The provided data of the request
|
|
* @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
|
|
* @return {Promise<void>}
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected async _handleExternalRequest(
|
|
data: IRequestTaskMsg,
|
|
_function?: (...args) => Promise<any>
|
|
): Promise<void> {
|
|
try {
|
|
// Try to get the function if not provided:
|
|
if (typeof _function !== "function") {
|
|
_function = this._definedFunctions.get(data.functionId);
|
|
}
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
"Dispatcher \"" +
|
|
this.id +
|
|
"\" received request: \"" +
|
|
data.functionId +
|
|
"\" -> task: \"" +
|
|
data.taskId +
|
|
"\""
|
|
);
|
|
}
|
|
|
|
const _this = this;
|
|
|
|
if (typeof _function === "function") {
|
|
// Callbacks
|
|
const cbs: Array<(reason) => void> = [];
|
|
|
|
const observer = _this.canceledTask.subscribe((cancelEvent) => {
|
|
if (cancelEvent.taskId == data.taskId) {
|
|
// Call Every Callback.
|
|
cbs.map((cb) => cb(cancelEvent.reason));
|
|
|
|
// Although we are allowed to Cancel the Subscription
|
|
observer.unsubscribe();
|
|
}
|
|
});
|
|
|
|
// Only if the Function is present extract the arguments etc.
|
|
const args = [];
|
|
|
|
// First extract the basic arguments
|
|
data.params.map((item) => (args[item.idx] = item.data));
|
|
|
|
// Add the Callbacks. Therefore create a function which will
|
|
// trigger the remote.
|
|
data.callbacks.map(
|
|
(options) =>
|
|
(args[options.idx] = async (..._args) => {
|
|
// And Create the Task and its Promise.
|
|
const servicePromise = _this.performCall<any>(
|
|
options.functionId,
|
|
_args,
|
|
options
|
|
);
|
|
|
|
const cancelCallback = (reason) => {
|
|
// The Main Task has been canceled =>
|
|
// We are allowed to canel the Subtask as well.
|
|
servicePromise.cancel(reason);
|
|
};
|
|
cbs.push(cancelCallback);
|
|
|
|
// Await the Result. If an Task is canceled => The Error is Thrown.
|
|
const result = await servicePromise;
|
|
|
|
// Remove the Index
|
|
cbs.splice(cbs.indexOf(cancelCallback), 1);
|
|
|
|
return result;
|
|
})
|
|
);
|
|
|
|
// Perform the Task it self.
|
|
const _resultPromise = _function(...args);
|
|
|
|
if (
|
|
typeof (_resultPromise as INopePromise<any>).cancel === "function"
|
|
) {
|
|
// Push the Callback to the Result.
|
|
cbs.push((reason) =>
|
|
(_resultPromise as INopePromise<any>).cancel(reason)
|
|
);
|
|
}
|
|
|
|
// Wait for the Result to finish.
|
|
const _result = await _resultPromise;
|
|
|
|
// Define the Result message
|
|
const result: IResponseTaskMsg = {
|
|
result: typeof _result !== "undefined" ? _result : null,
|
|
taskId: data.taskId,
|
|
type: "response"
|
|
};
|
|
|
|
// Use the communicator to publish the result.
|
|
await this.communicator.emitRpcResponse(data.resultSink, result);
|
|
}
|
|
} catch (error) {
|
|
if (this._logger) {
|
|
// If there is a Logger:
|
|
this._logger.error(
|
|
"Dispatcher \"" +
|
|
this.id +
|
|
"\" failed with request: \"" +
|
|
data.taskId +
|
|
"\""
|
|
);
|
|
this._logger.error(error);
|
|
}
|
|
|
|
// An Error occourd => Forward the Error.
|
|
const result: IResponseTaskMsg = {
|
|
error: {
|
|
error,
|
|
msg: error.toString()
|
|
},
|
|
taskId: data.taskId,
|
|
type: "response"
|
|
};
|
|
|
|
// Send the Error via the communicator to the remote.
|
|
await this.communicator.emitRpcResponse(data.resultSink, result);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Internal Function to handle responses. In Generale,
|
|
* the dispatcher checks if there is an open task with
|
|
* the provided id. If so => finish the promise.
|
|
*
|
|
* @protected
|
|
* @param {IResponseTaskMsg} data The Data provided to handle the Response.
|
|
* @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
|
|
try {
|
|
// Extract the Task
|
|
const task = this._runningInternalRequestedTasks.get(data.taskId);
|
|
|
|
// Delete the Task:
|
|
this._runningInternalRequestedTasks.delete(data.taskId);
|
|
|
|
// Based on the Result of the Remote => proceed.
|
|
// Either throw an error or forward the result
|
|
if (task && data.error) {
|
|
if (this._logger) {
|
|
this._logger.error("Failed with task " + data.taskId);
|
|
this._logger.error("Reason: " + data.error.msg);
|
|
this._logger.error(data.error);
|
|
}
|
|
|
|
task.reject(data.error);
|
|
|
|
// Clearout the Timer
|
|
if (task.timeout) {
|
|
clearTimeout(task.timeout);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
if (task) {
|
|
task.resolve(data.result);
|
|
|
|
// Clearout the Timer
|
|
if (task.timeout) {
|
|
clearTimeout(task.timeout);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
} catch (e) {}
|
|
|
|
return false;
|
|
}
|
|
|
|
protected _checkInterval: any = null;
|
|
protected _sendInterval: any = null;
|
|
|
|
/**
|
|
* Internal Function, used to initialize the Dispatcher.
|
|
* It subscribes to the "Messages" of the communicator.
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected async _init(): Promise<void> {
|
|
const _this = this;
|
|
|
|
// Wait until the Element is connected.
|
|
await this.communicator.connected.waitFor((value) => value, {
|
|
testCurrent: true
|
|
});
|
|
|
|
// Setup Test Intervals:
|
|
if (this.timeouts.checkInterval > 0) {
|
|
// Define a Checker, which will test the status
|
|
// of the external Dispatchers.
|
|
this._checkInterval = setInterval(
|
|
() => _this._checkDispachterHealth(),
|
|
this.timeouts.checkInterval
|
|
);
|
|
}
|
|
if (this.timeouts.sendAliveInterval > 0) {
|
|
// Define a Timer, which will emit Status updates with
|
|
// the disered delay.
|
|
this._sendInterval = setInterval(
|
|
() => _this.communicator.emitStatusUpdate(_this._genAliveMessage()),
|
|
this.timeouts.sendAliveInterval
|
|
);
|
|
}
|
|
|
|
this.communicator.connected.subscribe((connected) => {
|
|
// Handle an unconnect.
|
|
if (connected) _this.communicator.emitBonjour(_this._genAliveMessage());
|
|
});
|
|
|
|
this.registerInternalInstanceGenerator(
|
|
"*",
|
|
async (dispather, description) => {
|
|
const mod = new NopeGenericModule(dispather, _this._generateObservable);
|
|
await mod.fromDescription(description, "overwrite");
|
|
return mod;
|
|
}
|
|
);
|
|
|
|
// Based on the Mode of the Subscription =>
|
|
// either create indivdual topics for the methods
|
|
// or use the generice function.
|
|
switch (this._subscriptionMode) {
|
|
case "individual":
|
|
// Iterate over the Defined Functions.
|
|
for (const [id, cb] of this._definedFunctions.entries()) {
|
|
// Subscribe the Function
|
|
this._subscribeToService(id, cb);
|
|
}
|
|
break;
|
|
case "generic":
|
|
// Add a generic Subscription for callbacks:
|
|
this.communicator.onRpcRequest("request", (data: IRequestTaskMsg) => {
|
|
if (data.type === "requestOfTask") {
|
|
_this._handleExternalRequest(data);
|
|
}
|
|
});
|
|
break;
|
|
}
|
|
|
|
// If the Responses are shared using the Generic Methods => subscribe to Responses
|
|
// On the generic "response" channel.
|
|
if (this._publishingMode === "generic") {
|
|
this.communicator.onRpcResponse("response", (data) => {
|
|
if (data.type === "response") {
|
|
_this._handleExternalResponse(data);
|
|
}
|
|
});
|
|
}
|
|
|
|
// Subscribe to the availableServices of Remotes.
|
|
// If there is a new Service => udpate the External Services
|
|
this.communicator.onNewServicesAvailable((data) => {
|
|
try {
|
|
if (data.dispatcher !== _this.id) {
|
|
_this._mappingOfRemoteDispatchersAndServices.set(
|
|
data.dispatcher,
|
|
data
|
|
);
|
|
_this._updateExternalServices();
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
this.id,
|
|
"received new services from",
|
|
data.dispatcher
|
|
);
|
|
}
|
|
}
|
|
} catch (e) {}
|
|
});
|
|
|
|
// Subscribe to new available Topics.
|
|
this.communicator.onNewObservablesAvailable((data) => {
|
|
try {
|
|
if (data.dispatcher !== _this.id) {
|
|
_this._mappingOfRemoteDispatchersAndPropsOrEvents.set(
|
|
data.dispatcher,
|
|
data
|
|
);
|
|
_this._updateExternalObservables(data.dispatcher);
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
this.id,
|
|
"received new Observables from",
|
|
data.dispatcher
|
|
);
|
|
}
|
|
}
|
|
} catch (e) {}
|
|
});
|
|
|
|
this.communicator.onNewInstanceGeneratorsAvailable((data) => {
|
|
try {
|
|
_this._mappingOfRemoteDispatchersAndGenerators.set(
|
|
data.dispatcher,
|
|
data
|
|
);
|
|
_this._updateExternalGenerators();
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
this.id,
|
|
"received new generators from",
|
|
data.dispatcher
|
|
);
|
|
}
|
|
} catch (e) {}
|
|
});
|
|
|
|
this.communicator.onStatusUpdate((info) => {
|
|
_this._externalDispatchers.set(info.id, info);
|
|
_this.externalDispatchers.setContent(
|
|
Array.from(_this._externalDispatchers.values())
|
|
);
|
|
});
|
|
|
|
this.communicator.onBonjour((info) => {
|
|
_this._externalDispatchers.set(info.id, info);
|
|
_this.externalDispatchers.setContent(
|
|
Array.from(_this._externalDispatchers.values())
|
|
);
|
|
|
|
if (_this.id !== info.id) {
|
|
_this._sendAvailableServices();
|
|
_this._sendAvailableObservables();
|
|
_this._sendAvailableGenerators();
|
|
_this._sendAvailableInstances();
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
_this._logger.debug(
|
|
"Remote Dispatcher \"" + info.id + "\" went online"
|
|
);
|
|
}
|
|
}
|
|
});
|
|
|
|
this.communicator.onAurevoir((dispatcher: string) =>
|
|
_this._removeDispatcher(dispatcher)
|
|
);
|
|
|
|
// Listen to newly created instances.
|
|
this.communicator.onNewInstancesAvailable((message) => {
|
|
// Store the instances:
|
|
_this._mappingOfRemoteDispatchersAndInstances.set(
|
|
message.dispatcher,
|
|
message.instances
|
|
);
|
|
|
|
// Update the Mapping:
|
|
_this._updateExternalInstances();
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
_this._logger.debug(
|
|
"Remote Dispatcher \"" +
|
|
message.dispatcher +
|
|
"\" updated its available instances"
|
|
);
|
|
}
|
|
});
|
|
|
|
this.communicator.onTaskCancelation((event) => {
|
|
if (event.dispatcher !== _this.id) {
|
|
_this.canceledTask.setContent(event);
|
|
}
|
|
});
|
|
|
|
if (this._logger) {
|
|
this._logger.info("initialized");
|
|
}
|
|
this.communicator.emitBonjour(this._genAliveMessage());
|
|
this.ready.setContent(true);
|
|
}
|
|
|
|
protected _checkDispachterHealth(): void {
|
|
const currentTime = Date.now();
|
|
let changes = false;
|
|
|
|
for (const status of this._externalDispatchers.values()) {
|
|
// determine the Difference
|
|
const diff = currentTime - status.timestamp;
|
|
|
|
// Based on the Difference Determine the Status
|
|
if (diff > this.timeouts.remove) {
|
|
// remove the Dispatcher. But be quite.
|
|
// Perhaps more dispatchers will be removed
|
|
this._removeDispatcher(status.id, true);
|
|
changes = true;
|
|
} else if (
|
|
diff > this.timeouts.dead &&
|
|
status.status !== ENopeDispatcherStatus.DEAD
|
|
) {
|
|
status.status = ENopeDispatcherStatus.DEAD;
|
|
changes = true;
|
|
} else if (
|
|
diff > this.timeouts.warn &&
|
|
diff <= this.timeouts.dead &&
|
|
status.status !== ENopeDispatcherStatus.WARNING
|
|
) {
|
|
status.status = ENopeDispatcherStatus.WARNING;
|
|
changes = true;
|
|
} else if (
|
|
diff > this.timeouts.slow &&
|
|
diff <= this.timeouts.warn &&
|
|
status.status !== ENopeDispatcherStatus.SLOW
|
|
) {
|
|
status.status = ENopeDispatcherStatus.SLOW;
|
|
changes = true;
|
|
} else if (
|
|
diff <= this.timeouts.slow &&
|
|
status.status !== ENopeDispatcherStatus.HEALTHY
|
|
) {
|
|
status.status = ENopeDispatcherStatus.HEALTHY;
|
|
changes = true;
|
|
}
|
|
}
|
|
|
|
if (changes) {
|
|
// Update the External Dispatchers
|
|
this.externalDispatchers.setContent(
|
|
Array.from(this._externalDispatchers.values())
|
|
);
|
|
}
|
|
}
|
|
|
|
protected _removeDispatcher(dispatcher: string, quite = false): void {
|
|
// Delete the Generators of the Instances.
|
|
this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
|
|
this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
|
|
this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
|
|
this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);
|
|
this._externalDispatchers.delete(dispatcher);
|
|
|
|
// Iterate over the available instances and remove the providers:
|
|
for (const instance of this._instances.values()) {
|
|
// Remove all Dispachers:
|
|
let idx = instance.usedBy.indexOf(dispatcher);
|
|
while (idx !== -1) {
|
|
instance.usedBy.splice(idx, 1);
|
|
idx = instance.usedBy.indexOf(dispatcher);
|
|
}
|
|
|
|
// Check if the Element isnt required any more => delete it.
|
|
if (instance.usedBy.length === 0) {
|
|
this._logger.info(
|
|
"Disposing instance, because it isnt used any more.",
|
|
instance.instance.identifier
|
|
);
|
|
instance.instance
|
|
.dispose()
|
|
.catch((e) => {
|
|
this._logger.error("Failed to remove the Instance.", e);
|
|
})
|
|
.then(() => {
|
|
this._logger.info(
|
|
"sucessfully disposed instance ",
|
|
instance.instance.identifier
|
|
);
|
|
});
|
|
}
|
|
}
|
|
|
|
this._updateExternalServices();
|
|
this._updateExternalGenerators();
|
|
this._updateExternalObservables();
|
|
this._updateExternalInstances();
|
|
|
|
if (!quite) {
|
|
this.externalDispatchers.setContent(
|
|
Array.from(this._externalDispatchers.values())
|
|
);
|
|
}
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug("a dispatcher went offline");
|
|
}
|
|
}
|
|
|
|
protected _genAliveMessage(): IDispatcherInfo {
|
|
if (RUNNINGINNODE) {
|
|
const os = require("os");
|
|
return {
|
|
id: this.id,
|
|
host: {
|
|
cores: os.cpus().length,
|
|
cpu: `${os.cpus()[0].model}`,
|
|
os: os.platform(),
|
|
ram: os.freemem() / os.totalmem(),
|
|
name: os.hostname()
|
|
},
|
|
pid: process.pid,
|
|
timestamp: Date.now(),
|
|
status: ENopeDispatcherStatus.HEALTHY
|
|
};
|
|
}
|
|
return {
|
|
host: {
|
|
cores: -1,
|
|
cpu: "unkown",
|
|
name: navigator.appCodeName + " " + navigator.appVersion,
|
|
os: navigator.platform,
|
|
ram: 1
|
|
},
|
|
id: this.id,
|
|
pid: this.id,
|
|
timestamp: Date.now(),
|
|
status: ENopeDispatcherStatus.HEALTHY
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Function to cancel an indivual Task.
|
|
*
|
|
* @param {string} taskId The Id of the Task. Which should be canceled.
|
|
* @param {Error} reason The Reason, why the Task should be canceled (In general shoudl be something meaning full)
|
|
* @return {*} Flag, that indicates, whether cancelation was sucessfull or not.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public cancelTask(taskId: string, reason: Error) {
|
|
if (this._runningInternalRequestedTasks.has(taskId)) {
|
|
const task = this._runningInternalRequestedTasks.get(taskId);
|
|
|
|
// Delete the task
|
|
this._runningInternalRequestedTasks.delete(taskId);
|
|
|
|
// Propagate the Cancellation (internally):
|
|
task.reject(reason);
|
|
|
|
// Propagate the Cancellation externally.
|
|
// Therefore use the desired Mode.
|
|
switch (this._publishingMode) {
|
|
default:
|
|
case "individual":
|
|
case "generic":
|
|
this.communicator.emitTaskCancelation({
|
|
dispatcher: this.id,
|
|
reason,
|
|
taskId
|
|
});
|
|
break;
|
|
// case 'individua2l':
|
|
// this._communicator
|
|
// break;
|
|
}
|
|
|
|
// Indicate a successful cancelation.
|
|
return true;
|
|
}
|
|
|
|
// Task hasnt been found => Cancel the Task.
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Internal Helper Function, used to close all tasks with a specific service.
|
|
*
|
|
* @protected
|
|
* @param {string} serviceName The Name of the Service.
|
|
* @param {Error} reason The provided Reason, why cancelation is reuqired.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public cancelRunningTasksOfService(serviceName: string, reason: Error) {
|
|
// Provide a List containing all Tasks, that has to be canceled
|
|
const _tasksToCancel: {
|
|
reject: (error: Error) => void;
|
|
id: string;
|
|
}[] = [];
|
|
|
|
// Filter all Tasks that shoud be canceled.
|
|
for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
|
|
// Therefore compare the reuqired Service by the Task
|
|
if (task.serviceName === serviceName) {
|
|
// if the service matches, put it to our list.
|
|
_tasksToCancel.push({
|
|
id,
|
|
reject: task.reject
|
|
});
|
|
}
|
|
}
|
|
|
|
if (_tasksToCancel.length > 0) {
|
|
// First remove all Tasks.
|
|
// Then cancel them to avoid side effects
|
|
for (const item of _tasksToCancel) {
|
|
this._runningInternalRequestedTasks.delete(item.id);
|
|
}
|
|
|
|
// Now Reject all Tasks.
|
|
for (const item of _tasksToCancel) {
|
|
item.reject(reason);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Will dispose the Dispatcher. Must be called on exit for a clean exit. Otherwise it is defined as dirty exits
|
|
*/
|
|
public async dispose(): Promise<void> {
|
|
for (const task of Array.from(this._runningInternalRequestedTasks.keys())) {
|
|
this.cancelTask(task, new Error("Client going offline"));
|
|
}
|
|
|
|
if (this._sendInterval) {
|
|
clearInterval(this._sendInterval);
|
|
}
|
|
if (this._checkInterval) {
|
|
clearInterval(this._checkInterval);
|
|
}
|
|
|
|
// Emits the aurevoir Message.
|
|
this.communicator.emitAurevoir(this.id);
|
|
}
|
|
|
|
/**
|
|
* Function to update the used Services.
|
|
*
|
|
* @protected
|
|
* @memberof serviceRegistry
|
|
*/
|
|
protected _updateExternalServices() {
|
|
const _this = this;
|
|
|
|
// Store the Availabe Services before the Update.
|
|
const _servicesBeforeUpdate = new Set(this._externalProvidedServices);
|
|
|
|
// Clear the Services
|
|
this._externalProvidedServices.clear();
|
|
for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndServices.values()) {
|
|
dispatcherInfo.services.map((service) =>
|
|
_this._externalProvidedServices.add(service)
|
|
);
|
|
}
|
|
|
|
// Create a Comparing loop.
|
|
// The Loop checks if the element doesnt exists in the known services
|
|
// before the update.
|
|
const added = new Set<string>();
|
|
for (const service of this._externalProvidedServices) {
|
|
if (!_servicesBeforeUpdate.has(service)) {
|
|
added.add(service);
|
|
} else {
|
|
// Delete the element, because it is available.
|
|
_servicesBeforeUpdate.delete(service);
|
|
}
|
|
}
|
|
|
|
// If there are unavailable tasks => cancel their tasks.
|
|
if (_servicesBeforeUpdate.size > 0) {
|
|
for (const unavailable of _servicesBeforeUpdate) {
|
|
// Cancel the Tasks
|
|
this.cancelRunningTasksOfService(
|
|
unavailable,
|
|
new Error("Service unavailable!")
|
|
);
|
|
}
|
|
}
|
|
|
|
return {
|
|
// Contains the "new" services
|
|
addedServices: Array.from(added),
|
|
// contains the "unavailable services"
|
|
removedServices: Array.from(_servicesBeforeUpdate)
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Function to update the used Generators.
|
|
*
|
|
* @protected
|
|
* @memberof serviceRegistry
|
|
*/
|
|
protected _updateExternalGenerators() {
|
|
const _this = this;
|
|
|
|
// Clear the Services
|
|
this._externalProvidedGenerators.clear();
|
|
for (const generators of this._mappingOfRemoteDispatchersAndGenerators.values()) {
|
|
generators.generators.map((gen) =>
|
|
_this._externalProvidedGenerators.add(gen)
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Internal Function to update the Listing of external Topcis.
|
|
* This Function creates a list containing all subscriptions
|
|
* and publishers which are external.
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _updateExternalObservables(externalDispatcher = "") {
|
|
const _this = this;
|
|
|
|
// Clear the Services
|
|
const _published = new Set<string>();
|
|
const _subscribed = new Set<string>();
|
|
for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndPropsOrEvents.values()) {
|
|
dispatcherInfo.published.map((propOrEvent) =>
|
|
_published.add(propOrEvent)
|
|
);
|
|
dispatcherInfo.subscribed.map((propOrEvent) =>
|
|
_subscribed.add(propOrEvent)
|
|
);
|
|
}
|
|
|
|
// Send the Values of the Requested Properties
|
|
// Therefore iterate over the new elements and
|
|
// provide the last value
|
|
|
|
// 1. Filter the new Subscribed Elements:
|
|
const _newlySubscribed = new Set(
|
|
[..._subscribed].filter((item) => !_this._externalSubscribed.has(item))
|
|
);
|
|
|
|
if (externalDispatcher !== "") {
|
|
// Make shure, that the subscribed elements of the
|
|
// new element are added correctly.
|
|
this._mappingOfRemoteDispatchersAndPropsOrEvents
|
|
.get(externalDispatcher)
|
|
.subscribed.map((item) => _newlySubscribed.add(item));
|
|
}
|
|
|
|
// 2. Iterate over the Elements and
|
|
// add the lastly available value.
|
|
// TODO: CHECK IF REQUIRED.
|
|
for (const _topic of _newlySubscribed) {
|
|
if (this._lastPublishedEvent.has(_topic)) {
|
|
// Send the Element
|
|
this.communicator.emitEvent(
|
|
_topic,
|
|
this._lastPublishedEvent.get(_topic)
|
|
);
|
|
}
|
|
}
|
|
|
|
// Store the new values:
|
|
this._externalSubscribed = _subscribed;
|
|
this._externalPublished = _published;
|
|
|
|
// Update the Elements.
|
|
this.subscribedEvents.setContent(Array.from(this._externalSubscribed));
|
|
this.publishedEvents.setContent(Array.from(this._externalPublished));
|
|
}
|
|
|
|
/**
|
|
* Internal Function to update the Listing of external provided instances
|
|
*
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _updateExternalInstances() {
|
|
const _this = this;
|
|
|
|
// Clear the available instances.
|
|
this._externalInstances.clear();
|
|
this._externalInstancesNames.clear();
|
|
|
|
for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndInstances.values()) {
|
|
dispatcherInfo.map((instance) => {
|
|
// Check if the Instance exists:
|
|
if (!_this._externalInstances.has(instance.identifier)) {
|
|
_this._externalInstancesNames.add(instance.identifier);
|
|
_this._externalInstances.set(instance.identifier, instance);
|
|
} else if (
|
|
_this._externalInstances.get(instance.identifier)?.type !==
|
|
instance.type
|
|
) {
|
|
// A Miss Matching occurd.
|
|
// That should lead to a fatal error ?
|
|
_this._logger.warn(
|
|
"An Instance with the name \"" +
|
|
instance.identifier +
|
|
"\" has already been declared",
|
|
_this._externalInstances.get(instance.identifier)?.type,
|
|
"!=",
|
|
instance.type
|
|
);
|
|
}
|
|
});
|
|
}
|
|
|
|
// Store the Instance Mapping and Publish it.
|
|
this.availableInstances.setContent(
|
|
Array.from(this._externalInstances.values())
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Function to test if a specific Service exists.
|
|
*
|
|
* @param {string} id The Id of the Serivce
|
|
* @return {boolean} The result of the Test. True if either local or remotly a service is known.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public serviceExists(id: string) {
|
|
return (
|
|
this._definedFunctions.has(id) || this._externalProvidedServices.has(id)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Function to test if an subscription exists,
|
|
* for the given topic.
|
|
*
|
|
* @param {string} topic The topic to test.
|
|
* @return {boolean} The result of the test. True if an external subscription exsits
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public subscriptionExists(topic: string) {
|
|
return this._externalSubscribed.has(topic);
|
|
}
|
|
|
|
/**
|
|
* Function to test if a generator exists for
|
|
* the given type idenfitier
|
|
*
|
|
* @param {string} typeIdentifier Identifier of the type.
|
|
* @return {boolean} The result of the test. True if an external generator exsits.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public generatorExists(typeIdentifier: string) {
|
|
return this._externalProvidedGenerators.has(typeIdentifier);
|
|
}
|
|
|
|
/**
|
|
* Function to adapt a Request name.
|
|
* Only used internally
|
|
*
|
|
* @protected
|
|
* @param {string} id the original ID
|
|
* @return {string} the adapted ID.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _getServiceName(id: string, type: "request" | "response"): string {
|
|
return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
|
|
}
|
|
|
|
/**
|
|
* Internal Helper Function to subscribe to a Function element.
|
|
*
|
|
* @protected
|
|
* @param {string} id the Id of the function
|
|
* @param {(...args) => Promise<any>} _cb The Callback of the Function
|
|
* @return {void} the adapted ID.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _subscribeToService(
|
|
id: string,
|
|
_cb: (...args) => Promise<any>
|
|
): void {
|
|
const _req = this._getServiceName(id, "request");
|
|
if (
|
|
this._subscriptionMode === "individual" &&
|
|
!this._communicatorCallbacks.has(_req)
|
|
) {
|
|
const _this = this;
|
|
|
|
// Define a Function.
|
|
const cb = (data: IRequestTaskMsg) => {
|
|
if (data.type === "requestOfTask") {
|
|
_this._handleExternalRequest(data, _cb);
|
|
}
|
|
};
|
|
|
|
// Add the Callback.
|
|
this._communicatorCallbacks.set(_req, {
|
|
registeredId: _req,
|
|
type: "request",
|
|
cb
|
|
});
|
|
|
|
// Register Functions.
|
|
this.communicator.onRpcRequest(_req, cb);
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
"Dispatcher \"" + this.id + "\" listening on: \"" + _req + "\""
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Function, used to subscribe to results.
|
|
*
|
|
* @protected
|
|
* @param {string} id
|
|
* @param {boolean} deleteAfterCalling
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
|
|
const _res = this._getServiceName(id, "response");
|
|
if (
|
|
this._publishingMode === "individual" &&
|
|
!this._communicatorCallbacks.has(_res)
|
|
) {
|
|
const _this = this;
|
|
|
|
// Define a Function.
|
|
const cb = (data: IResponseTaskMsg) => {
|
|
if (data.type === "response") {
|
|
if (_this._handleExternalResponse(data) && deleteAfterCalling) {
|
|
_this._removeRpcSubscription(_res);
|
|
}
|
|
}
|
|
};
|
|
|
|
// Add the Callback.
|
|
this._communicatorCallbacks.set(_res, {
|
|
registeredId: _res,
|
|
type: "response",
|
|
cb
|
|
});
|
|
|
|
// Register Functions.
|
|
this.communicator.onRpcResponse(_res, cb);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates an Event listener (if required)
|
|
*
|
|
* @protected
|
|
* @param {string} event The Event to Listen.
|
|
* @return {nopeObservable<IExternalEventMsg>} An Listener on the Communication Channel.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _subscribeToEvent(event: string) {
|
|
const item = this._externallySubscribeObservables.get(event) || {
|
|
observable: this._generateObservable<IExternalEventMsg>(),
|
|
cb: () => {}
|
|
};
|
|
|
|
if (!item.observable.hasSubscriptions) {
|
|
const _this = this;
|
|
const cb = (data: IExternalEventMsg) => {
|
|
item.observable.setContent(data, _this.id);
|
|
};
|
|
|
|
this.communicator.onEvent(event, cb);
|
|
item.cb = cb;
|
|
}
|
|
|
|
// Set the Items.
|
|
this._externallySubscribeObservables.set(event, item);
|
|
|
|
return item.observable;
|
|
}
|
|
|
|
/**
|
|
* Function to register a Function in the Dispatcher
|
|
*
|
|
* @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
|
|
* @param {{
|
|
* // Flag to enable unregistering the function after calling.
|
|
* deleteAfterCalling?: boolean,
|
|
* // Instead of generating a uuid an id could be provided
|
|
* id?: string;
|
|
* }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
|
|
* @return {*} {(...args) => Promise<any>} The registered Function
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public registerFunction(
|
|
func: (...args) => Promise<any>,
|
|
options: {
|
|
/** Flag to enable unregistering the function after calling. */
|
|
deleteAfterCalling?: boolean;
|
|
/** Instead of generating a uuid an id could be provided */
|
|
id?: string;
|
|
/** Flag to enable / disable sending to registery */
|
|
preventSendingToRegistery?: boolean;
|
|
} = {}
|
|
): (...args) => Promise<any> {
|
|
const _this = this;
|
|
// Define / Use the ID of the Function.
|
|
const _id = options.id || generateId();
|
|
|
|
let _func = func;
|
|
|
|
if (options.deleteAfterCalling) {
|
|
_func = async (...args) => {
|
|
// Unregister the Method
|
|
_this.unregisterFunction(_id, {
|
|
preventSendingToRegistery: options.preventSendingToRegistery
|
|
});
|
|
// Return the Result of the Original Function.
|
|
return await func(...args);
|
|
};
|
|
}
|
|
|
|
// Define a ID for the Function
|
|
_func["id"] = _id;
|
|
|
|
// Define the callback.
|
|
_func["unregister"] = () => _this.unregisterFunction(_id);
|
|
|
|
// Reister the Function
|
|
this._definedFunctions.set(_func["id"], _func);
|
|
|
|
// Register the Callback:
|
|
this._subscribeToService(_id, _func);
|
|
|
|
if (!options.preventSendingToRegistery) {
|
|
// Publish the Available Services.
|
|
this._sendAvailableServices();
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
"Dispatcher \"" + this.id + "\" registered: \"" + _id + "\""
|
|
);
|
|
}
|
|
}
|
|
|
|
// Return the Function.
|
|
return _func;
|
|
}
|
|
|
|
/**
|
|
* Function to unregister a Function from the Dispatcher
|
|
* @param {(((...args) => void) | string | number)} func The Function to unregister
|
|
* @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public unregisterFunction(
|
|
func: ((...args) => void) | string,
|
|
options: {
|
|
/** Flag to enable / disable sending to registery */
|
|
preventSendingToRegistery?: boolean;
|
|
} = {}
|
|
): boolean {
|
|
const _id = typeof func === "string" ? func : (func["id"] as string) || "0";
|
|
|
|
this._removeRpcSubscription(_id);
|
|
|
|
if (!options.preventSendingToRegistery) {
|
|
// Publish the Available Services.
|
|
this._sendAvailableServices();
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
"Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
|
|
);
|
|
}
|
|
}
|
|
|
|
return this._definedFunctions.delete(_id);
|
|
}
|
|
|
|
public registerObservable<T, K, S = T, G = T>(
|
|
observable: INopeObservable<T, S, G>,
|
|
options: IPropertyOptions
|
|
): INopeObservable<T, S, G> {
|
|
// Reference to itself
|
|
const _this = this;
|
|
|
|
// Extract the Topic, pipe and scope.
|
|
const _subTopic =
|
|
typeof options.topic === "string"
|
|
? options.topic
|
|
: options.topic.subscribe || null;
|
|
const _pubTopic =
|
|
typeof options.topic === "string"
|
|
? options.topic
|
|
: options.topic.publish || null;
|
|
const _pipe =
|
|
typeof options.pipe === "function" ? options.pipe || null : null;
|
|
const _scope =
|
|
typeof options.pipe === "object" ? options.pipe.scope || null : null;
|
|
|
|
// A Flag, indicating, whether the topic is new or not.
|
|
const newElement = !this._externallySubscribeObservables.has(_subTopic);
|
|
const _externalSource = this._subscribeToEvent(_subTopic);
|
|
|
|
// Test if the Item should be subscribe or not.
|
|
if (
|
|
options.mode == "subscribe" ||
|
|
(Array.isArray(options.mode) && options.mode.includes("subscribe"))
|
|
) {
|
|
if (_pipe) {
|
|
const observer = _externalSource.enhancedSubscription(
|
|
(data: IExternalEventMsg) => {
|
|
// Test if the Content, which has been forwared in here inst the own dispathcer.
|
|
if (data.sender != _this.id) {
|
|
observable.setContent(data.data, _this.id, data.timestamp);
|
|
}
|
|
},
|
|
{
|
|
scope: _scope,
|
|
pipe: _pipe
|
|
}
|
|
);
|
|
|
|
const dispose = observable.dispose;
|
|
observable.dispose = () => {
|
|
// Kill the Observer;
|
|
observer.unsubscribe();
|
|
|
|
// Unsubscribe the Event
|
|
_this._unsubscribeObservable(_subTopic);
|
|
|
|
// Call the original Dispose function;
|
|
dispose.apply(observable);
|
|
};
|
|
} else {
|
|
const observer = _externalSource.subscribe({
|
|
next(data: IExternalEventMsg) {
|
|
if (_this.id !== data.sender) {
|
|
observable.setContent(data.data, _this.id, data.timestamp);
|
|
}
|
|
},
|
|
complete() {
|
|
observable.observable.complete();
|
|
},
|
|
error(err) {
|
|
observable.observable.error(err);
|
|
}
|
|
});
|
|
|
|
// Overwrite the Original Dispose Function.
|
|
const dispose = observable.dispose;
|
|
observable.dispose = () => {
|
|
// Kill the Observer;
|
|
observer.unsubscribe();
|
|
|
|
// Unsubscribe the Event
|
|
_this._unsubscribeObservable(_subTopic);
|
|
|
|
// Call the original Dispose function;
|
|
dispose.apply(observable);
|
|
};
|
|
}
|
|
|
|
// Only if required => update the Value:
|
|
// This is done, by using the lastly published /
|
|
// received value
|
|
if (!options.preventSendingUpdateOnNewSubscription) {
|
|
// Get the available data.
|
|
// First try to get current source-data (which in theory should be)
|
|
// Up-to-date.
|
|
// If this data isnt available try to use the lastly published data.
|
|
const _sourceData = this._lastPublishedEvent.get(_subTopic);
|
|
const data =
|
|
_sourceData !== null && _sourceData !== undefined
|
|
? _sourceData
|
|
: _externalSource.getContent();
|
|
if (data) {
|
|
if (!observable.setContent(data.data, _this.id, data.timestamp)) {
|
|
observable.forcePublish();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (
|
|
options.mode == "publish" ||
|
|
(Array.isArray(options.mode) && options.mode.includes("publish"))
|
|
) {
|
|
const cb = (data, sender?, timestamp?, ...args) => {
|
|
// Only Publish data, if there exists a Subscription.
|
|
if (_this.id !== sender) {
|
|
const msg: IExternalEventMsg = {
|
|
data: data,
|
|
topic: _pubTopic,
|
|
sender: _this.id,
|
|
type: "event",
|
|
timestamp
|
|
};
|
|
|
|
// Test if the Message is subscribed externally:
|
|
// if so, use the socket to send the data to other
|
|
// dispatchers.
|
|
if (_this.subscriptionExists(_pubTopic)) {
|
|
// Use the Communicator to emit the Event.
|
|
_this.communicator.emitEvent(_pubTopic, msg);
|
|
}
|
|
|
|
// Store the lastly published message, this will be published if
|
|
// a new subscription is provided
|
|
if (_this._eventsToSendCurrentValueOnSubscription.has(_pubTopic)) {
|
|
_this._lastPublishedEvent.set(_pubTopic, msg);
|
|
|
|
_externalSource.setContent({
|
|
data: data,
|
|
topic: _pubTopic,
|
|
// Watchout => We are using the provided sender here.
|
|
// It is used
|
|
sender: sender,
|
|
type: "event",
|
|
timestamp
|
|
});
|
|
} else if (_externalSource.observerLength > 0) {
|
|
// The Observable it is used multiple times internally.
|
|
// For this purpose, send the data using the "external source"
|
|
// -channel. because we are using the original id of the sender,
|
|
// we prevent endless loops. Observables are always checking
|
|
// if an update has been send by them self or not.
|
|
_externalSource.setContent({
|
|
data: data,
|
|
topic: _pubTopic,
|
|
// Watchout => We are using the provided sender here.
|
|
// It is used
|
|
sender: sender,
|
|
type: "event",
|
|
timestamp
|
|
});
|
|
}
|
|
}
|
|
};
|
|
|
|
// Register the Internally Subscribed Element.
|
|
const _set01 =
|
|
this._internallySubscribeObservables.get(_pubTopic) || new Set();
|
|
_set01.add(observable);
|
|
this._internallySubscribeObservables.set(_pubTopic, _set01);
|
|
|
|
// Mark for sending updates if required.
|
|
if (!options.preventSendingUpdateOnNewSubscription) {
|
|
const _set02 =
|
|
this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
|
|
new Set();
|
|
_set02.add(observable);
|
|
this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02);
|
|
|
|
// Test if there exists content, if so => store it
|
|
// to the Element.
|
|
const data = observable.getContent();
|
|
if (data !== null && data !== undefined) {
|
|
this._lastPublishedEvent.set(_pubTopic, {
|
|
data: data,
|
|
topic: _pubTopic,
|
|
sender: _this.id,
|
|
type: "event",
|
|
timestamp: Date.now()
|
|
});
|
|
|
|
// Use the External Source to store the Last Value
|
|
_externalSource.setContent({
|
|
data: data,
|
|
topic: _pubTopic,
|
|
sender: observable.id,
|
|
type: "event",
|
|
timestamp: Date.now()
|
|
});
|
|
}
|
|
}
|
|
|
|
if (_pipe) {
|
|
const observer = observable.enhancedSubscription(cb, {
|
|
scope: _scope,
|
|
pipe: _pipe
|
|
});
|
|
|
|
// Overwrite the Original Dispose Function.
|
|
const dispose = observable.dispose;
|
|
observable.dispose = () => {
|
|
// Kill the Observer;
|
|
observer.unsubscribe();
|
|
|
|
// Unsubscribe the Event
|
|
_this._unsubscribeObservable(_subTopic);
|
|
|
|
// Unregister the Internally Subscribed Element.
|
|
const _set01 =
|
|
_this._internallySubscribeObservables.get(_pubTopic) || new Set();
|
|
_set01.delete(observable);
|
|
if (_set01.size > 0) {
|
|
_this._internallySubscribeObservables.set(_pubTopic, _set01);
|
|
} else {
|
|
_this._internallySubscribeObservables.delete(_pubTopic);
|
|
|
|
// Optionally send an update.
|
|
if (!options.preventSendingToRegistery) {
|
|
// Publish the Available Services.
|
|
_this._sendAvailableObservables();
|
|
}
|
|
}
|
|
|
|
// Remove the Element.
|
|
const _set02 =
|
|
this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
|
|
new Set();
|
|
_set02.delete(observable);
|
|
|
|
if (_set02.size > 0) {
|
|
this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02);
|
|
} else {
|
|
// Topic isnt provided any more.
|
|
// delete the flag
|
|
_this._eventsToSendCurrentValueOnSubscription.delete(_pubTopic);
|
|
_this._lastPublishedEvent.delete(_pubTopic);
|
|
}
|
|
|
|
// Call the original Dispose function;
|
|
dispose.apply(observable);
|
|
};
|
|
} else {
|
|
const observer = observable.subscribe(cb);
|
|
|
|
// Overwrite the Original Dispose Function.
|
|
const dispose = observable.dispose;
|
|
observable.dispose = () => {
|
|
// Kill the Observer;
|
|
observer.unsubscribe();
|
|
|
|
// Unsubscribe the Event
|
|
_this._unsubscribeObservable(_subTopic);
|
|
|
|
// Unregister the Internally Subscribed Element.
|
|
const _set01 =
|
|
_this._internallySubscribeObservables.get(_pubTopic) || new Set();
|
|
_set01.delete(observable);
|
|
if (_set01.size > 0) {
|
|
_this._internallySubscribeObservables.set(_pubTopic, _set01);
|
|
} else {
|
|
_this._internallySubscribeObservables.delete(_pubTopic);
|
|
|
|
// Optionally send an update.
|
|
if (!options.preventSendingToRegistery) {
|
|
// Publish the Available Services.
|
|
_this._sendAvailableObservables();
|
|
}
|
|
}
|
|
|
|
// Remove the Element.
|
|
const _set02 =
|
|
this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
|
|
new Set();
|
|
_set02.delete(observable);
|
|
|
|
if (_set02.size > 0) {
|
|
this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02);
|
|
} else {
|
|
// Topic isnt provided any more.
|
|
// delete the flag
|
|
_this._eventsToSendCurrentValueOnSubscription.delete(_pubTopic);
|
|
_this._lastPublishedEvent.delete(_pubTopic);
|
|
}
|
|
|
|
// Call the original Dispose function;
|
|
dispose.apply(observable);
|
|
};
|
|
}
|
|
}
|
|
|
|
if (!options.preventSendingToRegistery && newElement) {
|
|
// Publish the Available Services.
|
|
this._sendAvailableObservables();
|
|
}
|
|
|
|
// Return the Function.
|
|
return observable;
|
|
}
|
|
|
|
public unregisterObservable(
|
|
observable: INopeObservable<any> | string,
|
|
options: {
|
|
// Flag to enable / disable sending to registery
|
|
preventSendingToRegistery?: boolean;
|
|
} = {}
|
|
): boolean {
|
|
const _id =
|
|
typeof observable === "string"
|
|
? observable
|
|
: (observable.id as string) || "0";
|
|
|
|
if (!options.preventSendingToRegistery) {
|
|
// Publish the Available Services.
|
|
this._sendAvailableObservables();
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
// If there is a Logger:
|
|
this._logger.debug(
|
|
"Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
|
|
);
|
|
}
|
|
}
|
|
|
|
return this._definedFunctions.delete(_id);
|
|
}
|
|
|
|
protected _externallySubscribeObservables: Map<
|
|
string,
|
|
{
|
|
observable: INopeObservable<IExternalEventMsg>;
|
|
cb: (...arg) => void;
|
|
}
|
|
>;
|
|
|
|
protected _internallySubscribeObservables: Map<
|
|
string,
|
|
Set<INopeObservable<any>>
|
|
>;
|
|
|
|
/**
|
|
* Function to unsubscribe from an event of the channel.
|
|
*
|
|
* @protected
|
|
* @param {string} path
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _unsubscribeObservable(path: string) {
|
|
const item = this._externallySubscribeObservables.get(path);
|
|
|
|
if (item) {
|
|
this.communicator.offEvent(path, item.cb);
|
|
// Dispose the Observable
|
|
const obs = this._externallySubscribeObservables.get(path).observable;
|
|
obs.dispose();
|
|
// Remove the Observable
|
|
this._externallySubscribeObservables.delete(path);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Helper Function to directly subscribe to a specific value.
|
|
* @param event The Event.
|
|
* @param callback The Callback used to subscribe to the event
|
|
* @param options Additional Options used to specify the Subscribing.
|
|
*/
|
|
public async subscribeToEvent<G = any, K = any>(
|
|
event: string,
|
|
callback: IObservableCallback<G>,
|
|
options: {
|
|
pipe?: {
|
|
pipe?: IPipe<IExternalEventMsg, K>;
|
|
scope?: { [index: string]: any };
|
|
};
|
|
preventSendingToRegistery?: boolean;
|
|
mode?: "immediate" | "sync";
|
|
subscriptionOptions?: INopeSubscriptionOptions;
|
|
}
|
|
) {
|
|
// Create a new observable:
|
|
const observable = this._generateObservable<G>();
|
|
|
|
// register the newly created observable.
|
|
this.registerObservable(observable, {
|
|
mode: "subscribe",
|
|
topic: event,
|
|
preventSendingToRegistery: options.preventSendingToRegistery,
|
|
pipe: options.pipe
|
|
});
|
|
|
|
// Create an Observer by susbcribing to the external source (this is directly linked to the System)
|
|
const observer = observable.subscribe(
|
|
callback,
|
|
options.mode,
|
|
options.subscriptionOptions
|
|
);
|
|
|
|
observer.unsubscribe = () => {
|
|
observable.dispose();
|
|
};
|
|
|
|
// Return the Observer.
|
|
return observer;
|
|
}
|
|
|
|
/**
|
|
* Function to manually emit an Event.
|
|
* @param _eventName
|
|
* @param data
|
|
* @param sender
|
|
* @param timestamp
|
|
* @param forced
|
|
* @param args
|
|
*/
|
|
public async emit<T>(
|
|
_eventName: string,
|
|
data: T,
|
|
sender?: string,
|
|
timestamp?: number,
|
|
forced = false,
|
|
...args
|
|
) {
|
|
// Only Publish data, if there exists a Subscription.
|
|
if (forced || (this.subscriptionExists(_eventName) && this.id !== sender)) {
|
|
// Use the Communicator to emit the Event or its forced
|
|
await this.communicator.emitEvent(_eventName, {
|
|
data: data,
|
|
topic: _eventName,
|
|
sender: this.id,
|
|
type: "event",
|
|
timestamp
|
|
});
|
|
}
|
|
}
|
|
|
|
protected _removeRpcSubscription(_id: string) {
|
|
// Try to unregister the Callback from the communcator:
|
|
if (this._communicatorCallbacks.has(_id)) {
|
|
const _callbacks = this._communicatorCallbacks.get(_id);
|
|
|
|
switch (_callbacks.type) {
|
|
case "request":
|
|
// Unregister the RPC-Request-Listener
|
|
this.communicator.offRpcRequest(
|
|
_callbacks.registeredId,
|
|
_callbacks.cb
|
|
);
|
|
break;
|
|
case "response":
|
|
// Unregister the RPC-Response-Listener
|
|
this.communicator.offRpcResponse(
|
|
_callbacks.registeredId,
|
|
_callbacks.cb
|
|
);
|
|
break;
|
|
}
|
|
|
|
// Remove the Callback
|
|
this._communicatorCallbacks.delete(_id);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Function used to update the Available Services.
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _sendAvailableServices() {
|
|
// Define the Message
|
|
const message: IAvailableServicesMsg = {
|
|
dispatcher: this.id,
|
|
services: Array.from(this._definedFunctions.keys())
|
|
};
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug("sending available services");
|
|
}
|
|
|
|
// Send the Message.
|
|
this.communicator.emitNewServicesAvailable(message);
|
|
}
|
|
|
|
/**
|
|
* Function to emit the available topics.
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _sendAvailableObservables() {
|
|
// Define the Message
|
|
const message: IAvailableTopicsMsg = {
|
|
dispatcher: this.id,
|
|
published: Array.from(this._internallySubscribeObservables.keys()),
|
|
subscribed: Array.from(this._externallySubscribeObservables.keys())
|
|
};
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug(
|
|
"sending available properties. (subscribing and publishing events)"
|
|
);
|
|
}
|
|
|
|
// Send the Message.
|
|
this.communicator.emitNewObersvablesAvailable(message);
|
|
}
|
|
|
|
protected _sendAvailableGenerators() {
|
|
// Define the Message
|
|
const message: IAvailableInstanceGeneratorsMsg = {
|
|
dispatcher: this.id,
|
|
generators: Array.from(this._externalGenerators.keys())
|
|
};
|
|
|
|
if (this._logger?.enabledFor(Logger.DEBUG)) {
|
|
this._logger.debug("sending available instance generators");
|
|
}
|
|
|
|
// Send the Message.
|
|
this.communicator.emitNewInstanceGeneratorsAvailable(message);
|
|
}
|
|
|
|
/**
|
|
* Update the Available Instances
|
|
*
|
|
* @protected
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
protected _sendAvailableInstances(): void {
|
|
const _this = this;
|
|
// Update the Instances provided by this module.
|
|
this.communicator.emitNewInstancesAvailable({
|
|
dispatcher: this.id,
|
|
instances: Array.from(this._internalInstances).map((identifier) =>
|
|
// Generate the Module Description
|
|
_this._instances.get(identifier).instance.toDescription()
|
|
)
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Function which is used to perform a call on the remote.
|
|
*
|
|
* @template T
|
|
* @param {string} serviceName The Name / ID of the Function
|
|
* @param {any[]} params The provided Parameters.
|
|
* @param {({
|
|
* deletableCallbacks: Array<number>;
|
|
* })} [options={
|
|
* deletableCallbacks: [],
|
|
* paramsHasNoCallback: false,
|
|
* preventErrorTest: false
|
|
* }] You could additiona Optionf for the callback.
|
|
* @return {*} {Promise<T>} The result of the Operation
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public performCall<T>(
|
|
serviceName: string,
|
|
params: any[],
|
|
options: Partial<ICallOptions> = {}
|
|
): INopePromise<T> {
|
|
// Get a Call Id
|
|
const _taskId = generateId();
|
|
const _registeredIdx: Array<string> = [];
|
|
const _this = this;
|
|
|
|
const _options = Object.assign(
|
|
{
|
|
deletableCallbacks: [],
|
|
paramsHasNoCallback: false,
|
|
dynamicCallback: false,
|
|
resultSink:
|
|
this._publishingMode === "generic"
|
|
? "response"
|
|
: this._getServiceName(serviceName, "response")
|
|
},
|
|
options
|
|
) as ICallOptions;
|
|
|
|
this._subscribeToResult(serviceName, _options.dynamicCallback);
|
|
|
|
const clear = () => {
|
|
// Delete all Callbacks.
|
|
_registeredIdx.map((id) => _this.unregisterFunction(id));
|
|
|
|
// Remove the task:
|
|
if (_this._runningInternalRequestedTasks.has(_taskId)) {
|
|
const task = _this._runningInternalRequestedTasks.get(_taskId);
|
|
|
|
// Remove the Timeout.
|
|
if (task.timeout) {
|
|
clearTimeout(task.timeout);
|
|
}
|
|
|
|
// Remove the Task itself
|
|
_this._runningInternalRequestedTasks.delete(_taskId);
|
|
}
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
_this._logger.debug("Clearing Callbacks from " + _taskId);
|
|
}
|
|
};
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
_this._logger.debug(
|
|
"Dispatcher \"" +
|
|
this.id +
|
|
"\" requesting externally Function \"" +
|
|
serviceName +
|
|
"\" with task: \"" +
|
|
_taskId +
|
|
"\""
|
|
);
|
|
}
|
|
|
|
// Define a Callback-Function, which will expect the Task.
|
|
const ret = new NopePromise<T>(async (resolve, reject) => {
|
|
try {
|
|
const requestedTask: any = {
|
|
resolve,
|
|
reject,
|
|
clear,
|
|
serviceName,
|
|
timeout: null
|
|
};
|
|
|
|
// Register the Handlers,
|
|
_this._runningInternalRequestedTasks.set(_taskId, requestedTask);
|
|
|
|
// Define a Task-Request
|
|
const taskRequest: IRequestTaskMsg = {
|
|
functionId: serviceName,
|
|
params: [],
|
|
callbacks: [],
|
|
taskId: _taskId,
|
|
type: "requestOfTask",
|
|
resultSink: _options.resultSink
|
|
};
|
|
|
|
// Test if there is no Callback integrated
|
|
if (!options.paramsHasNoCallback) {
|
|
// If so, the parameters has to be detailled:
|
|
|
|
// Iterate over all Parameters and
|
|
// Determin Callbacks. Based on the Parameter-
|
|
// Type assign it either to packet.params (
|
|
// for parsable Parameters) and packet.callbacks
|
|
// (for callback Parameters)
|
|
for (const [idx, contentOfParameter] of params.entries()) {
|
|
// Test if the parameter is a Function
|
|
if (typeof contentOfParameter !== "function") {
|
|
taskRequest.params.push({
|
|
idx,
|
|
data: contentOfParameter
|
|
});
|
|
} else {
|
|
// The Parameter is a Callback => store a
|
|
// Description of the Callback and register
|
|
// the callback inside of the Dispatcher
|
|
|
|
const deleteAfterCalling = _options.deletableCallbacks.includes(
|
|
idx
|
|
);
|
|
const _func = _this.registerFunction(contentOfParameter, {
|
|
deleteAfterCalling,
|
|
preventSendingToRegistery: true
|
|
});
|
|
|
|
_registeredIdx.push(_func["id"]);
|
|
|
|
// Register the Callback
|
|
taskRequest.callbacks.push({
|
|
functionId: _func["id"],
|
|
idx,
|
|
deleteAfterCalling,
|
|
dynamicCallback: true,
|
|
deletableCallbacks: [],
|
|
resultSink:
|
|
_this._publishingMode === "generic"
|
|
? "response"
|
|
: _this._getServiceName(_func["id"], "response")
|
|
});
|
|
}
|
|
}
|
|
} else {
|
|
for (const [idx, contentOfParameter] of params.entries()) {
|
|
taskRequest.params.push({
|
|
idx,
|
|
data: contentOfParameter
|
|
});
|
|
}
|
|
}
|
|
|
|
if (!_options.dynamicCallback && !_this.serviceExists(serviceName)) {
|
|
// Create an Error:
|
|
const error = new Error(
|
|
"No Service Provider known for \"" + serviceName + "\""
|
|
);
|
|
|
|
if (_this._logger) {
|
|
_this._logger.error(
|
|
"No Service Provider known for \"" + serviceName + "\""
|
|
);
|
|
_this._logger.error(error);
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
|
|
// Send the Message to the specific element:
|
|
if (_this._subscriptionMode === "individual") {
|
|
await _this.communicator.emitRpcRequest(
|
|
_this._getServiceName(taskRequest.functionId, "request"),
|
|
taskRequest
|
|
);
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
_this._logger.debug(
|
|
"Dispatcher \"" +
|
|
this.id +
|
|
"\" putting task \"" +
|
|
_taskId +
|
|
"\" on: \"" +
|
|
_this._getServiceName(taskRequest.functionId, "request") +
|
|
"\""
|
|
);
|
|
}
|
|
} else {
|
|
await _this.communicator.emitRpcRequest("request", taskRequest);
|
|
|
|
if (_this._logger?.enabledFor(Logger.DEBUG)) {
|
|
_this._logger.debug(
|
|
"Dispatcher \"" +
|
|
this.id +
|
|
"\" putting task \"" +
|
|
_taskId +
|
|
"\" on: \"request\""
|
|
);
|
|
}
|
|
}
|
|
|
|
// If there is a timeout =>
|
|
if (options.timeout > 0) {
|
|
requestedTask.timeout = setTimeout(() => {
|
|
_this.cancelTask(
|
|
_taskId,
|
|
new Error(
|
|
"TIMEOUT. The Service allowed execution time of " +
|
|
options.timeout.toString() +
|
|
"[ms] has been excided"
|
|
)
|
|
);
|
|
}, options.timeout);
|
|
}
|
|
} catch (e) {
|
|
// Clear all Elements of the Function:
|
|
clear();
|
|
|
|
// Throw the error.
|
|
reject(e);
|
|
}
|
|
});
|
|
|
|
ret.taskId = _taskId;
|
|
ret.cancel = (reason) => {
|
|
_this.cancelTask(_taskId, reason);
|
|
};
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* Function to clear all pending tasks
|
|
*
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public clearTasks(): void {
|
|
if (this._runningInternalRequestedTasks)
|
|
this._runningInternalRequestedTasks.clear();
|
|
else this._runningInternalRequestedTasks = new Map();
|
|
}
|
|
|
|
/**
|
|
* Function to unregister all Functions of the Dispatcher.
|
|
*
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public unregisterAll(): void {
|
|
if (this._definedFunctions) {
|
|
for (const id of this._definedFunctions.keys()) {
|
|
this.unregisterFunction(id);
|
|
}
|
|
this._definedFunctions.clear();
|
|
} else {
|
|
this._definedFunctions = new Map<string, (...args) => Promise<any>>();
|
|
}
|
|
|
|
// Reset the Callbacks.
|
|
this._communicatorCallbacks = new Map();
|
|
}
|
|
|
|
/**
|
|
* Function to reset the Dispatcher.
|
|
*
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public reset(): void {
|
|
this._remotlyCalledFunctions = new Set();
|
|
|
|
this._mappingOfRemoteDispatchersAndServices = new Map();
|
|
this._mappingOfRemoteDispatchersAndPropsOrEvents = new Map();
|
|
this._mappingOfRemoteDispatchersAndGenerators = new Map();
|
|
this._mappingOfRemoteDispatchersAndInstances = new Map();
|
|
|
|
this._internalGenerators = new Map();
|
|
this._externalGenerators = new Map();
|
|
|
|
// If Instances Exists => Delete them.
|
|
if (this._instances) {
|
|
const _this = this;
|
|
|
|
// Dispose all Instances.
|
|
for (const [name, instance] of this._instances.entries()) {
|
|
// Remove the Instance.
|
|
this.deleteInstance(name, true).catch((e) => {
|
|
if (_this._logger) {
|
|
_this._logger.error("Failed Removing Instance \"" + name + "\"");
|
|
_this._logger.error(e);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
this._instances = new Map();
|
|
this._externalInstances = new Map();
|
|
this._internalInstances = new Set();
|
|
this._externalInstancesNames = new Set();
|
|
this._externalDispatchers = new Map();
|
|
this._lastPublishedEvent = new Map();
|
|
this._eventsToSendCurrentValueOnSubscription = new Map();
|
|
|
|
if (this.communicator.connected.getContent()) {
|
|
// Update the Instances
|
|
this._sendAvailableInstances();
|
|
}
|
|
|
|
this._externalProvidedServices = new Set();
|
|
this._externalPublished = new Set();
|
|
this._externalSubscribed = new Set();
|
|
this._externalProvidedGenerators = new Set();
|
|
|
|
this._internallySubscribeObservables = new Map();
|
|
this._externallySubscribeObservables = new Map();
|
|
this.clearTasks();
|
|
this.unregisterAll();
|
|
}
|
|
}
|