/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-10-12 18:52:00 * @modify date 2021-01-03 11:06:25 * @desc [description] */ import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { generateId } from "../helpers/idMethods"; import { RUNNINGINNODE } from "../helpers/runtimeMethods"; import { getNopeLogger } from "../logger/getLogger"; import { NopeGenericModule } from "../module/GenericModule"; import { NopePromise } from "../promise/nopePromise"; import { IAvailableInstanceGeneratorsMsg, IAvailableServicesMsg, IAvailableTopicsMsg, ICallOptions, ICommunicationInterface, IExternalEventMsg, IInstanceCreationMsg, IInstanceDescriptionMsg, IInstanceRemovalMsg, INopeDispatcherOptions, IRequestTaskMsg, IResponseTaskMsg, ITaskCancelationMsg } from "../types/nope/nopeCommunication.interface"; import { ENopeDispatcherStatus, IDispatcherInfo, IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, INopeDispatcher } from "../types/nope/nopeDispatcher.interface"; import { INopeModule, INopeModuleDescription, IPropertyOptions } from "../types/nope/nopeModule.interface"; import { INopeObservable, INopeSubscriptionOptions, IObservableCallback, IPipe } from "../types/nope/nopeObservable.interface"; import { INopePromise } from "../types/nope/nopePromise.interface"; /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded * to the remote. 
* * @export * @class nopeDispatcher */ // @injectable() export class nopeDispatcher implements INopeDispatcher { public readonly id: string; protected _logger: ILogger; /** * Internal Element to store the registered Functions * * @protected * @memberof nopeDispatcher */ protected _definedFunctions: Map Promise>; protected _remotlyCalledFunctions: Set; protected _communicatorCallbacks: Map< string, { registeredId: string; type: "request" | "response"; cb: (data) => any; } >; public readonly communicator: ICommunicationInterface; protected _externalDispatchers: Map; protected _mappingOfRemoteDispatchersAndServices: Map< string, IAvailableServicesMsg >; protected _externalProvidedServices: Set; protected _mappingOfRemoteDispatchersAndGenerators: Map< string, IAvailableInstanceGeneratorsMsg >; protected _externalProvidedGenerators: Set; public methodInterfaceWithOptions: { [index: string]: (options: ICallOptions, ...args) => INopePromise; }; public methodInterface: { [index: string]: (...args) => INopePromise }; protected _mappingOfRemoteDispatchersAndPropsOrEvents: Map< string, IAvailableTopicsMsg >; protected _externalSubscribed: Set; protected _externalPublished: Set; protected _mappingOfRemoteDispatchersAndInstances: Map< string, INopeModuleDescription[] >; protected _externalInstancesNames: Set; protected _externalInstances: Map; protected _eventsToSendCurrentValueOnSubscription: Map< string, Set> >; protected _lastPublishedEvent: Map; public readonly subscribedEvents: INopeObservable; public readonly publishedEvents: INopeObservable; public readonly canceledTask: INopeObservable; public readonly ready: INopeObservable; public readonly availableInstances: INopeObservable; public readonly externalDispatchers: INopeObservable; /** * Internal Element to store the running tasks. 
*
   * @protected
   * @memberof nopeDispatcher
   */
  protected _runningInternalRequestedTasks: Map<
    string,
    {
      resolve: (value: any) => void;
      reject: (error: any) => void;
      clear: () => void;
      serviceName: string;
      timeout?: any;
    }
  >;

  protected _runningExternalRequestedTasks: Set;

  // Intervals and thresholds (compared against `Date.now()` differences)
  // used for the alive messages and the health check of other dispatchers.
  protected timeouts: {
    sendAliveInterval: number;
    checkInterval: number;
    slow: number;
    warn: number;
    dead: number;
    remove: number;
  };

  readonly _subscriptionMode: "individual" | "generic";
  readonly _publishingMode: "individual" | "generic";

  /**
   * Creates an instance of nopeDispatcher.
   *
   * Takes over the communicator, generates a fresh id, prepares the
   * timeouts, the method proxies and the observables, then resets the
   * dispatcher and starts the asynchronous initialization.
   *
   * @param {INopeDispatcherOptions} options The Options, used by the Dispatcher.
   * @param {() => INopeObservable} _generateObservable A Helper, to generate Observables.
   * @memberof nopeDispatcher
   */
  constructor(
    public options: INopeDispatcherOptions,
    protected _generateObservable: () => INopeObservable
  ) {
    this.communicator = options.communicator;
    this.id = generateId();

    // A logger is only created if the options ask for one.
    if (options.logger) {
      this._logger = getNopeLogger("dispatcher " + this.id, "info");
    }

    // Use the provided timeouts, otherwise fall back to the defaults.
    this.timeouts = options.timeouts
      ? options.timeouts
      : {
          sendAliveInterval: 500,
          checkInterval: 250,
          slow: 1000,
          warn: 2000,
          dead: 5000,
          remove: 10000
        };

    this._subscriptionMode = this.communicator.subscriptionMode || "generic";
    this._publishingMode = this.communicator.resultSharing || "generic";

    // Proxies, which turn every property access into a function that
    // performs the call with the accessed name as service name.
    this.methodInterfaceWithOptions = new Proxy(
      {},
      {
        get: (target, name) => (callOptions: ICallOptions, ...args) =>
          this.performCall(name, args, callOptions)
      }
    );
    this.methodInterface = new Proxy(
      {},
      {
        get: (target, name) => (...args) => this.performCall(name, args)
      }
    );

    // Observables provided by the dispatcher.
    this.subscribedEvents = this._generateObservable();
    this.subscribedEvents.setContent([]);

    this.publishedEvents = this._generateObservable();
    this.publishedEvents.setContent([]);

    this.canceledTask = this._generateObservable();

    // Flag showing whether the dispatcher is ready.
    this.ready = this._generateObservable();
    this.ready.setContent(false);

    // All available instances.
    this.availableInstances = this._generateObservable();
    this.availableInstances.setContent([]);

    // Information about all known dispatchers.
    this.externalDispatchers = this._generateObservable();
    this.externalDispatchers.setContent([]);

    this._logger?.info("Dispatcher online. -> Reseting and Initializing");

    this.reset();

    // The initialization runs in the background; a failure is only logged.
    this._init().catch((error) => {
      this._logger?.error("Failed to intialize the Dispatcher", error);
    });
  }

  /**
   * Registers a generator under "generateInstance_<identifier>", so that
   * other dispatchers can request instances of this type.
   *
   * On the first request for an instance identifier the instance is created
   * via `cb`, initialized with the provided parameters and stored; later
   * requests only add the requesting dispatcher to the users of the
   * instance. The registered function answers with the description of the
   * instance.
   *
   * @param {string} identifier Identifier of the generator (the type).
   * @param {IGenerateRemoteInstanceForOtherDispatcherCallback} cb Callback creating the instance.
   * @memberof nopeDispatcher
   */
  provideInstanceGeneratorForExternalDispatchers(
    identifier: string,
    cb: IGenerateRemoteInstanceForOtherDispatcherCallback
  ): void {
    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Adding instance generator for \"" +
          identifier +
          "\" to external Generators. Other Elements can now create instances of this type."
      );
    }

    const registered = this.registerFunction(
      async (data: IInstanceCreationMsg) => {
        if (this._instances.has(data.identifier)) {
          // The instance already exists => only remember the additional user.
          this._instances.get(data.identifier).usedBy.push(data.dispatcherID);
        } else {
          const created = await cb(this, data.identifier);

          // The parameters are spread below, so they have to be an array.
          if (!Array.isArray(data.params)) {
            data.params = [data.params];
          }

          await created.init(...data.params);

          // Function releasing the instance for a dispatcher. Once the
          // last user is gone, the instance is disposed and this function
          // unregisters itself.
          this.registerFunction(
            async (_data: IInstanceRemovalMsg) => {
              const entry = this._instances.get(data.identifier);
              if (!entry?.usedBy) {
                return;
              }

              const idx = entry.usedBy.indexOf(_data.dispatcherID);
              if (idx > -1) {
                entry.usedBy.splice(idx, 1);
              }

              if (entry.usedBy.length == 0) {
                this._internalInstances.delete(data.identifier);
                await created.dispose();
                this._instances.delete(data.identifier);
                this.unregisterFunction("instance_dispose_" + data.identifier);
              }
            },
            {
              deleteAfterCalling: false,
              id: "instance_dispose_" + data.identifier
            }
          );

          this._instances.set(data.identifier, {
            instance: created,
            usedBy: [data.dispatcherID]
          });
          this._internalInstances.add(data.identifier);

          // Share the updated list of instances.
          this._sendAvailableInstances();
        }

        const response: IInstanceDescriptionMsg = {
          description: this._instances
            .get(data.identifier)
            .instance.toDescription(),
          type: data.type
        };

        return response;
      },
      {
        id: "generateInstance_" + identifier
      }
    );

    this._externalGenerators.set(identifier, registered);

    // Share the updated list of generators.
    this._sendAvailableGenerators();
  }

  /**
   * Removes a generator which has been provided for other dispatchers.
   * Does nothing, if no generator is stored under the identifier.
   *
   * @param {string} identifier Identifier of the generator.
   * @memberof nopeDispatcher
   */
  unprovideInstanceGeneratorForExternalDispatchers(identifier: string): void {
    if (!this._externalGenerators.has(identifier)) {
      return;
    }

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Removing instance generator for \"" +
          identifier +
          "\" from external Generators. Other Elements cant create instances of this type anymore."
      );
    }

    this.unregisterFunction(this._externalGenerators.get(identifier));
    this._externalGenerators.delete(identifier);

    // Share the updated list of generators.
    this._sendAvailableGenerators();
  }

  /**
   * Stores a generator which is only used by this dispatcher.
   *
   * @param {string} identifier Identifier of the generator (the type).
   * @param {IGenerateRemoteInstanceCallback} cb Callback creating the instance.
   * @memberof nopeDispatcher
   */
  registerInternalInstanceGenerator(
    identifier: string,
    cb: IGenerateRemoteInstanceCallback
  ): void {
    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Adding instance generator for \"" +
          identifier +
          "\" as internal Generator. This Generator wont be used externally."
      );
    }

    this._internalGenerators.set(identifier, cb);
  }

  /**
   * Removes an internally used generator.
   *
   * @param {string} identifier Identifier of the generator (the type).
   * @memberof nopeDispatcher
   */
  unregisterInternalInstanceGenerator(identifier: string): void {
    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Rmoving instance generator for \"" +
          identifier +
          "\" from internal Generator. The sytem cant create elements of this type any more."
      );
    }

    this._internalGenerators.delete(identifier);
  }

  // Generators used by this dispatcher only.
  protected _internalGenerators: Map<
    string,
    IGenerateRemoteInstanceCallback
  >;

  // Generators offered to other dispatchers.
  protected _externalGenerators: Map<
    string,
    IGenerateRemoteInstanceCallback
  >;

  // Known instances together with the dispatchers using them.
  protected _instances: Map<
    string,
    {
      instance: INopeModule;
      usedBy: Array;
    }
  >;

  // Identifiers of the instances created by the provided generators.
  protected _internalInstances: Set;

  /**
   * Provides the instance matching the description. An already known
   * identifier returns the stored instance; otherwise the instance is
   * requested via "generateInstance_<type>" and wrapped using an internal
   * generator.
   *
   * @param description Requires at least a "type" and an "identifier".
   * @memberof nopeDispatcher
   */
  public async generateInstance(
    description: Partial
  ): Promise {
    // Define the Default Description
    // which will lead to an error.
const defaults: IInstanceCreationMsg = {
      dispatcherID: this.id,
      identifier: "error",
      params: [],
      type: "unkown"
    };

    // Object.assign fills (and returns) the defaults object. The
    // dispatcherID is always forced to the id of this dispatcher.
    const request = Object.assign(defaults, description, {
      dispatcherID: this.id
    }) as IInstanceCreationMsg;

    // If the placeholders survived, type or identifier were not provided.
    if (request.type === "unkown" || request.identifier === "error") {
      throw Error(
        "Please Provide at least a \"type\" and \"identifier\" in the paremeters"
      );
    }

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Requesting an Instance of type: \"" +
          request.type +
          "\" with the identifier: \"" +
          request.identifier +
          "\""
      );
    }

    // Known instance => register the additional usage and hand it out.
    if (this._instances.has(request.identifier)) {
      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug(
          "Already created instance with the identifiert: \"" +
            request.identifier +
            "\" => returning this instance"
        );
      }

      this._instances.get(request.identifier).usedBy.push(request.dispatcherID);

      return this._instances.get(request.identifier).instance as I;
    }

    try {
      // Use the generator of the type, otherwise the generic one ("*").
      const generatorKey = this._internalGenerators.has(request.type)
        ? request.type
        : "*";

      if (!this.generatorExists(request.type)) {
        throw Error("Generator isnt present in the network!");
      }

      if (!this._internalGenerators.has(generatorKey)) {
        throw Error("No generator Available!");
      }

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug(
          "No instance with the identifiert: \"" +
            request.identifier +
            "\" found, but an internal generator is available. Using the internal one for creating the instance and requesting the \"real\" instance externally"
        );
      }

      // Request the "real" instance; the answer contains its description.
      const result = await this.performCall(
        "generateInstance_" + request.type,
        [request],
        {
          paramsHasNoCallback: true
        }
      );

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug("Received a description for the instance");
      }

      // Create the local instance based on the received description.
      const created = (await this._internalGenerators.get(generatorKey)(
        this,
        result.description
      )) as I;

      this._instances.set(request.identifier, {
        instance: created,
        usedBy: [request.dispatcherID]
      });

      return created;
    } catch (e) {
      // Log the error and forward it to the caller.
      if (this._logger) {
        this._logger.error(
          "During creating an Instance, the following error Occurd"
        );
        this._logger.error(e);
      }
      throw e;
    }
  }

  /**
   * Releases an instance. One usage is removed; once no usage is left, the
   * "instance_dispose_<identifier>" function is called, the instance is
   * removed from the known instances and disposed.
   *
   * @param instance The instance itself or its identifier.
   * @param preventSendingUpdate If true, no update of the available instances is sent.
   * @return true if the instance was known, otherwise false.
   * @memberof nopeDispatcher
   */
  public async deleteInstance(
    instance: I | string,
    preventSendingUpdate = false
  ): Promise {
    // Find the stored entry, either by identifier or by comparing the
    // stored instances with the provided one.
    let _instance: { instance: INopeModule; usedBy: Array };

    if (typeof instance === "string") {
      _instance = this._instances.get(instance);
    } else {
      for (const data of this._instances.values()) {
        if (instance == data.instance) {
          _instance = data;
          break;
        }
      }
    }

    // Only a known instance can be released.
    if (_instance) {
      _instance.usedBy.pop();

      if (_instance.usedBy.length === 0) {
        const params: IInstanceRemovalMsg = {
          dispatcherID: this.id,
          identifier: _instance.instance.identifier
        };

        // Call the dispose function registered for the "real" instance.
        await this.performCall(
          "instance_dispose_" + _instance.instance.identifier,
          [params]
        );

        // Forget the instance.
        this._instances.delete(_instance.instance.identifier);

        // Check if an update should be emitted or not.
        if (!preventSendingUpdate) {
          // Update the Instances provided by this module.
this._sendAvailableInstances();
        }

        // Dispose the Handler;
        await _instance.instance.dispose();
      }
      return true;
    }
    return false;
  }

  /**
   * Internal Method to handle a request.
   *
   * Rebuilds the arguments (plain parameters and callbacks, which are
   * forwarded as calls), performs the function and sends the result - or
   * the error - to `data.resultSink`. If no function is found, nothing is
   * sent. While the function is running, cancelations of the task are
   * forwarded to the sub-calls and to a cancelable result.
   *
   * @protected
   * @param {IRequestTaskMsg} data The provided data of the request
   * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
   * @return {Promise}
   * @memberof nopeDispatcher
   */
  protected async _handleExternalRequest(
    data: IRequestTaskMsg,
    _function?: (...args) => Promise
  ): Promise {
    // Subscription on canceled tasks. It must be released exactly once,
    // no matter whether the task finishes, fails or is canceled.
    let observer: { unsubscribe: () => void } | null = null;
    const stopObserving = (): void => {
      if (observer !== null) {
        observer.unsubscribe();
        observer = null;
      }
    };

    try {
      // Try to get the function if not provided:
      if (typeof _function !== "function") {
        _function = this._definedFunctions.get(data.functionId);
      }

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug(
          "Dispatcher \"" +
            this.id +
            "\" received request: \"" +
            data.functionId +
            "\" -> task: \"" +
            data.taskId +
            "\""
        );
      }

      if (typeof _function === "function") {
        // Callbacks which have to be called on a cancelation.
        const cbs: Array<(reason) => void> = [];

        observer = this.canceledTask.subscribe((cancelEvent) => {
          if (cancelEvent.taskId == data.taskId) {
            // Work on a copy, the callbacks may alter the list.
            for (const cb of [...cbs]) {
              cb(cancelEvent.reason);
            }
            // The task is canceled, no further events are expected.
            stopObserving();
          }
        });

        // Only if the Function is present extract the arguments etc.
        const args = [];

        // First the plain arguments.
        for (const item of data.params) {
          args[item.idx] = item.data;
        }

        // Then the callbacks: each one is replaced by a function which
        // performs the corresponding call.
        for (const options of data.callbacks) {
          args[options.idx] = async (..._args) => {
            const servicePromise = this.performCall(
              options.functionId,
              _args,
              options
            );

            // If the main task is canceled, the sub-call is canceled too.
            const cancelCallback = (reason) => {
              servicePromise.cancel(reason);
            };
            cbs.push(cancelCallback);

            try {
              // If the sub-call is canceled, the error is thrown here.
              return await servicePromise;
            } finally {
              // The sub-call is settled => drop its cancel hook, also if
              // it failed. Never splice with -1 (would drop the last hook).
              const idx = cbs.indexOf(cancelCallback);
              if (idx !== -1) {
                cbs.splice(idx, 1);
              }
            }
          };
        }

        // Perform the Task itself.
        const _resultPromise = _function(...args);

        // A cancelable result is canceled together with the task.
        if (typeof (_resultPromise as INopePromise)?.cancel === "function") {
          cbs.push((reason) =>
            (_resultPromise as INopePromise).cancel(reason)
          );
        }

        const _result = await _resultPromise;

        // "undefined" is replaced by null in the response.
        const result: IResponseTaskMsg = {
          result: typeof _result !== "undefined" ? _result : null,
          taskId: data.taskId,
          type: "response"
        };

        // Use the communicator to publish the result.
        await this.communicator.emitRpcResponse(data.resultSink, result);
      }
    } catch (error) {
      if (this._logger) {
        this._logger.error(
          "Dispatcher \"" +
            this.id +
            "\" failed with request: \"" +
            data.taskId +
            "\""
        );
        this._logger.error(error);
      }

      // An Error occurred => forward the error to the requester.
      const result: IResponseTaskMsg = {
        error: {
          error,
          msg: error.toString()
        },
        taskId: data.taskId,
        type: "response"
      };

      await this.communicator.emitRpcResponse(data.resultSink, result);
    } finally {
      // Without this, every finished request would keep its subscription.
      stopObserving();
    }
  }

  /**
   * Internal Function to handle responses. The dispatcher checks if there
   * is an open task with the provided id. If so, the task is removed, its
   * timeout is cleared and its promise is rejected (the response contains
   * an error) or resolved.
   *
   * @protected
   * @param {IResponseTaskMsg} data The Data provided to handle the Response.
   * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
   * @memberof nopeDispatcher
   */
  protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
    try {
      // Extract and remove the Task
      const task = this._runningInternalRequestedTasks.get(data.taskId);
      this._runningInternalRequestedTasks.delete(data.taskId);

      if (!task) {
        return false;
      }

      // The task is answered => its timer is not required any more.
      if (task.timeout) {
        clearTimeout(task.timeout);
      }

      if (data.error) {
        if (this._logger) {
          this._logger.error("Failed with task " + data.taskId);
          this._logger.error("Reason: " + data.error.msg);
          this._logger.error(data.error);
        }
        task.reject(data.error);
      } else {
        task.resolve(data.result);
      }

      return true;
    } catch (e) {
      // Keep the best-effort behavior, but do not hide the problem.
      this._logger?.error(
        "Failed to handle the response of task " + data.taskId
      );
      this._logger?.error(e);
    }
    return false;
  }

  // Handles of the intervals created during the initialization.
  protected _checkInterval: any = null;
  protected _sendInterval: any = null;

  /**
   * Internal Function, used to initialize the Dispatcher.
   * It waits for the communicator to be connected, starts the intervals
   * and subscribes to the "Messages" of the communicator.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected async _init(): Promise {
    // Alias used by the callbacks of this method.
    const _this = this;

    // Wait until the communicator is connected.
    await this.communicator.connected.waitFor((value) => value, {
      testCurrent: true
    });

    // Setup the intervals. A value <= 0 disables the corresponding one.
    if (this.timeouts.checkInterval > 0) {
      // Checker, testing the status of the external Dispatchers.
      this._checkInterval = setInterval(
        () => _this._checkDispachterHealth(),
        this.timeouts.checkInterval
      );
    }

    if (this.timeouts.sendAliveInterval > 0) {
      // Timer, emitting status updates with the desired delay.
      this._sendInterval = setInterval(
        () => _this.communicator.emitStatusUpdate(_this._genAliveMessage()),
        this.timeouts.sendAliveInterval
      );
    }

    this.communicator.connected.subscribe((connected) => {
      // Announce the dispatcher whenever a connection is reported.
      if (connected) _this.communicator.emitBonjour(_this._genAliveMessage());
    });

    // Generic generator: builds the module based on its description.
    this.registerInternalInstanceGenerator(
      "*",
      async (dispather, description) => {
        const mod = new NopeGenericModule(dispather, _this._generateObservable);
        await mod.fromDescription(description, "overwrite");
        return mod;
      }
    );

    // Based on the Mode of the Subscription =>
    // either create individual topics for the methods
    // or use the generic function.
switch (this._subscriptionMode) {
      case "individual":
        // Subscribe every defined function on its own.
        for (const [id, cb] of this._definedFunctions.entries()) {
          this._subscribeToService(id, cb);
        }
        break;
      case "generic":
        // One generic subscription for all requests.
        this.communicator.onRpcRequest("request", (data: IRequestTaskMsg) => {
          if (data.type === "requestOfTask") {
            // Not awaited on purpose; a rejection (e.g. the response could
            // not be emitted) is logged instead of staying unhandled.
            this._handleExternalRequest(data).catch((error) => {
              this._logger?.error(
                "Failed to handle the request of task " + data.taskId,
                error
              );
            });
          }
        });
        break;
    }

    // If the Responses are shared using the generic mode => subscribe to
    // the generic "response" channel.
    if (this._publishingMode === "generic") {
      this.communicator.onRpcResponse("response", (data) => {
        if (data.type === "response") {
          this._handleExternalResponse(data);
        }
      });
    }

    // Services of other dispatchers: store them and update the listing.
    this.communicator.onNewServicesAvailable((data) => {
      try {
        if (data.dispatcher !== this.id) {
          this._mappingOfRemoteDispatchersAndServices.set(
            data.dispatcher,
            data
          );
          this._updateExternalServices();

          if (this._logger?.enabledFor(Logger.DEBUG)) {
            this._logger.debug(
              this.id,
              "received new services from",
              data.dispatcher
            );
          }
        }
      } catch (e) {
        // A faulty message must not break the handler, but is reported.
        this._logger?.error("Failed to handle the available services", e);
      }
    });

    // Topics of other dispatchers.
    this.communicator.onNewObservablesAvailable((data) => {
      try {
        if (data.dispatcher !== this.id) {
          this._mappingOfRemoteDispatchersAndPropsOrEvents.set(
            data.dispatcher,
            data
          );
          this._updateExternalEvents(data.dispatcher);

          if (this._logger?.enabledFor(Logger.DEBUG)) {
            this._logger.debug(
              this.id,
              "received new Observables from",
              data.dispatcher
            );
          }
        }
      } catch (e) {
        this._logger?.error("Failed to handle the available observables", e);
      }
    });

    // Generators; here the messages of every dispatcher are stored.
    this.communicator.onNewInstanceGeneratorsAvailable((data) => {
      try {
        this._mappingOfRemoteDispatchersAndGenerators.set(
          data.dispatcher,
          data
        );
        this._updateExternalGenerators();

        if (this._logger?.enabledFor(Logger.DEBUG)) {
          this._logger.debug(
            this.id,
            "received new generators from",
            data.dispatcher
          );
        }
      } catch (e) {
        this._logger?.error("Failed to handle the available generators", e);
      }
    });

    // Status updates: store the info and publish the updated list.
    this.communicator.onStatusUpdate((info) => {
      this._externalDispatchers.set(info.id, info);
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );
    });

    // A dispatcher went online: store its info and, if it is another
    // dispatcher, send it everything this dispatcher provides.
    this.communicator.onBonjour((info) => {
      this._externalDispatchers.set(info.id, info);
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );

      if (this.id !== info.id) {
        this._sendAvailableServices();
        this._sendAvailableObservables();
        this._sendAvailableGenerators();
        this._sendAvailableInstances();

        if (this._logger?.enabledFor(Logger.DEBUG)) {
          this._logger.debug(
            "Remote Dispatcher \"" + info.id + "\" went online"
          );
        }
      }
    });

    // A dispatcher went offline.
    this.communicator.onAurevoir((dispatcher: string) =>
      this._removeDispatcher(dispatcher)
    );

    // Instances of other dispatchers.
    this.communicator.onNewInstancesAvailable((message) => {
      this._mappingOfRemoteDispatchersAndInstances.set(
        message.dispatcher,
        message.instances
      );

      this._updateExternalInstances();

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug(
          "Remote Dispatcher \"" +
            message.dispatcher +
            "\" updated its available instances"
        );
      }
    });

    // Cancelations emitted by other dispatchers are published locally.
    this.communicator.onTaskCancelation((event) => {
      if (event.dispatcher !== this.id) {
        this.canceledTask.setContent(event);
      }
    });

    if (this._logger) {
      this._logger.info("initialized");
    }

    // Announce the dispatcher and mark it as ready.
    this.communicator.emitBonjour(this._genAliveMessage());
    this.ready.setContent(true);
  }

  /**
   * Compares the timestamp of every known dispatcher with the current time
   * and updates its status (or removes it) based on the timeouts.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _checkDispachterHealth(): void {
    const currentTime = Date.now();
    let changes = false;

    for (const status of this._externalDispatchers.values()) {
      // determine the Difference
      const diff = currentTime - status.timestamp;

      // Based on the Difference Determine the Status
      if (diff > this.timeouts.remove) {
        // Remove the dispatcher quietly.
// The listing is published once after the loop (see "changes").
        this._removeDispatcher(status.id, true);
        changes = true;
      } else if (
        diff > this.timeouts.dead &&
        status.status !== ENopeDispatcherStatus.DEAD
      ) {
        status.status = ENopeDispatcherStatus.DEAD;
        changes = true;
      } else if (
        diff > this.timeouts.warn &&
        diff <= this.timeouts.dead &&
        status.status !== ENopeDispatcherStatus.WARNING
      ) {
        status.status = ENopeDispatcherStatus.WARNING;
        changes = true;
      } else if (
        diff > this.timeouts.slow &&
        diff <= this.timeouts.warn &&
        status.status !== ENopeDispatcherStatus.SLOW
      ) {
        status.status = ENopeDispatcherStatus.SLOW;
        changes = true;
      } else if (
        diff <= this.timeouts.slow &&
        status.status !== ENopeDispatcherStatus.HEALTHY
      ) {
        status.status = ENopeDispatcherStatus.HEALTHY;
        changes = true;
      }
    }

    if (changes) {
      // Update the External Dispatchers
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );
    }
  }

  /**
   * Removes everything known about a dispatcher and releases the instances
   * it was using. Instances without any remaining user are removed from the
   * bookkeeping and disposed.
   *
   * @protected
   * @param {string} dispatcher Id of the dispatcher to remove.
   * @param {boolean} [quite=false] If true, the list of dispatchers is not published.
   * @memberof nopeDispatcher
   */
  protected _removeDispatcher(dispatcher: string, quite = false): void {
    // Forget everything the dispatcher has provided.
    this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);
    this._externalDispatchers.delete(dispatcher);

    // Iterate over a copy, because unused entries are removed on the way.
    for (const [key, entry] of Array.from(this._instances.entries())) {
      // Remove every usage of the dispatcher.
      let idx = entry.usedBy.indexOf(dispatcher);
      while (idx !== -1) {
        entry.usedBy.splice(idx, 1);
        idx = entry.usedBy.indexOf(dispatcher);
      }

      if (entry.usedBy.length !== 0) {
        continue;
      }

      const identifier = entry.instance.identifier;

      // The logger is optional => never access it unguarded.
      this._logger?.info(
        "Disposing instance, because it isnt used any more.",
        identifier
      );

      // Remove the entry, otherwise the disposed instance would be handed
      // out again and disposed once more with the next removal. For an
      // instance created by a provided generator, its dispose function is
      // unregistered as well (as done when the last user releases it).
      this._instances.delete(key);
      if (this._internalInstances.delete(key)) {
        this.unregisterFunction("instance_dispose_" + key);
      }

      // Success is only logged, if disposing did not fail.
      entry.instance
        .dispose()
        .then(() => {
          this._logger?.info("sucessfully disposed instance ", identifier);
        })
        .catch((e) => {
          this._logger?.error("Failed to remove the Instance.", e);
        });
    }

    this._updateExternalServices();
    this._updateExternalGenerators();
    this._updateExternalEvents();
    this._updateExternalInstances();

    if (!quite) {
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );
    }

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug("a dispatcher went offline");
    }
  }

  /**
   * Creates the status message of this dispatcher. If RUNNINGINNODE is set,
   * the host information is read from the "os" module, otherwise from the
   * `navigator`. The status is always HEALTHY.
   *
   * @protected
   * @return {IDispatcherInfo} The status message.
   * @memberof nopeDispatcher
   */
  protected _genAliveMessage(): IDispatcherInfo {
    if (!RUNNINGINNODE) {
      return {
        host: {
          cores: -1,
          cpu: "unkown",
          name: navigator.appCodeName + " " + navigator.appVersion,
          os: navigator.platform,
          ram: 1
        },
        id: this.id,
        // No process id is available here => the dispatcher id is used.
        pid: this.id,
        timestamp: Date.now(),
        status: ENopeDispatcherStatus.HEALTHY
      };
    }

    // Loaded lazily, the module is only required in this branch.
    const os = require("os");
    const cpus = os.cpus();

    return {
      id: this.id,
      host: {
        cores: cpus.length,
        cpu: `${cpus[0].model}`,
        os: os.platform(),
        // Ratio of free to total memory.
        ram: os.freemem() / os.totalmem(),
        name: os.hostname()
      },
      pid: process.pid,
      timestamp: Date.now(),
      status: ENopeDispatcherStatus.HEALTHY
    };
  }

  /**
   * Function to cancel an individual Task.
   *
   * @param {string} taskId The Id of the Task, which should be canceled.
   * @param {Error} reason The Reason, why the Task should be canceled (in general it should be something meaningful)
   * @return {*} Flag, that indicates, whether the cancelation was successful or not.
   * @memberof nopeDispatcher
   */
  public cancelTask(taskId: string, reason: Error) {
    if (this._runningInternalRequestedTasks.has(taskId)) {
      const task = this._runningInternalRequestedTasks.get(taskId);

      // Delete the task
      this._runningInternalRequestedTasks.delete(taskId);

      // Propagate the Cancellation (internally):
      task.reject(reason);

      // Propagate the Cancellation externally.
      // Therefore use the desired Mode.
// Currently every publishing mode announces the cancelation the same way.
      this.communicator.emitTaskCancelation({
        dispatcher: this.id,
        reason,
        taskId
      });

      // Indicate a successful cancelation.
      return true;
    }

    // The task hasnt been found => nothing has been canceled.
    return false;
  }

  /**
   * Helper Function, used to close all tasks of a specific service. The
   * matching tasks are removed first and rejected with the reason afterwards.
   *
   * @param {string} serviceName The Name of the Service.
   * @param {Error} reason The provided Reason, why the cancelation is required.
   * @memberof nopeDispatcher
   */
  public cancelRunningTasksOfService(serviceName: string, reason: Error) {
    // Collect the tasks requiring the service.
    const affected = Array.from(
      this._runningInternalRequestedTasks.entries()
    ).filter(([, task]) => task.serviceName === serviceName);

    // First remove all of them ...
    for (const [id] of affected) {
      this._runningInternalRequestedTasks.delete(id);
    }

    // ... then reject them, to avoid side effects.
    for (const [, task] of affected) {
      task.reject(reason);
    }
  }

  /**
   * Shuts the dispatcher down: cancels all open tasks, stops the intervals
   * and emits the aurevoir message.
   *
   * @memberof nopeDispatcher
   */
  public async dispose(): Promise {
    // Iterate over a copy of the ids; canceling removes the tasks.
    const openTasks = Array.from(this._runningInternalRequestedTasks.keys());
    for (const taskId of openTasks) {
      this.cancelTask(taskId, new Error("Client going offline"));
    }

    if (this._sendInterval) {
      clearInterval(this._sendInterval);
    }

    if (this._checkInterval) {
      clearInterval(this._checkInterval);
    }

    // Tell the other dispatchers, that this one is gone.
    this.communicator.emitAurevoir(this.id);
  }

  /**
   * Function to update the services provided by other dispatchers. Tasks
   * of services which are not available any more are canceled.
   *
   * @protected
   * @return The added and the removed services.
   * @memberof serviceRegistry
   */
  protected _updateExternalServices() {
    // Services known before the update. After the comparison below only
    // the services remain, which are not available any more.
    const previous = new Set(this._externalProvidedServices);

    // Rebuild the listing based on the stored messages.
    this._externalProvidedServices.clear();
    for (const info of this._mappingOfRemoteDispatchersAndServices.values()) {
      for (const service of info.services) {
        this._externalProvidedServices.add(service);
      }
    }

    // Split into new services and services which are still available.
    const added = new Set();
    for (const service of this._externalProvidedServices) {
      if (previous.has(service)) {
        previous.delete(service);
      } else {
        added.add(service);
      }
    }

    // Cancel the tasks of the services, which are gone.
    for (const unavailable of previous) {
      this.cancelRunningTasksOfService(
        unavailable,
        new Error("Service unavailable!")
      );
    }

    return {
      // Contains the "new" services
      addedServices: Array.from(added),
      // contains the "unavailable services"
      removedServices: Array.from(previous)
    };
  }

  /**
   * Function to update the generators provided by other dispatchers.
   *
   * @protected
   * @memberof serviceRegistry
   */
  protected _updateExternalGenerators() {
    // Rebuild the listing based on the stored messages.
    this._externalProvidedGenerators.clear();

    for (const info of this._mappingOfRemoteDispatchersAndGenerators.values()) {
      for (const generator of info.generators) {
        this._externalProvidedGenerators.add(generator);
      }
    }
  }

  /**
   * Internal Function to update the Listing of external Topics.
   * This Function creates a list containing all subscriptions
   * and publishers which are external.
*
   * For every newly subscribed topic, the lastly published event (if one
   * is stored) is emitted again.
   *
   * @protected
   * @param {string} [externalDispatcher=""] Id of a remote dispatcher. If
   * provided, all topics subscribed by this dispatcher are treated as newly
   * subscribed. Unknown ids are ignored.
   * @memberof nopeDispatcher
   */
  protected _updateExternalEvents(externalDispatcher = ""): void {
    // Collect the topics of all known remote dispatchers.
    const published = new Set<string>();
    const subscribed = new Set<string>();

    for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndPropsOrEvents.values()) {
      for (const propOrEvent of dispatcherInfo.published) {
        published.add(propOrEvent);
      }
      for (const propOrEvent of dispatcherInfo.subscribed) {
        subscribed.add(propOrEvent);
      }
    }

    // 1. Filter the new subscribed Elements (unknown before this update):
    const newlySubscribed = new Set<string>();
    for (const topic of subscribed) {
      if (!this._externalSubscribed.has(topic)) {
        newlySubscribed.add(topic);
      }
    }

    if (externalDispatcher !== "") {
      // The dispatcher might not have announced its topics (yet). In that
      // case there is nothing to add, instead of failing on `undefined`.
      const info = this._mappingOfRemoteDispatchersAndPropsOrEvents.get(
        externalDispatcher
      );
      for (const topic of info?.subscribed ?? []) {
        newlySubscribed.add(topic);
      }
    }

    // 2. Provide the last value for the new subscriptions:
    for (const topic of newlySubscribed) {
      if (this._lastPublishedEvent.has(topic)) {
        // Send the Element
        this.communicator.emitEvent(topic, this._lastPublishedEvent.get(topic));
      }
    }

    // Store the new values:
    this._externalSubscribed = subscribed;
    this._externalPublished = published;

    // Update the Elements.
    this.subscribedEvents.setContent(Array.from(this._externalSubscribed));
    this.publishedEvents.setContent(Array.from(this._externalPublished));
  }

  /**
   * Internal Function to update the Listing of external provided instances.
   *
   * The first description found for an identifier is kept. If another
   * dispatcher provides an instance with the same identifier but a
   * different type, a warning is logged.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _updateExternalInstances(): void {
    // Clear the available instances.
    this._externalInstances.clear();
    this._externalInstancesNames.clear();

    for (const instances of this._mappingOfRemoteDispatchersAndInstances.values()) {
      for (const instance of instances) {
        // Check if the Instance exists:
        if (!this._externalInstances.has(instance.identifier)) {
          this._externalInstancesNames.add(instance.identifier);
          this._externalInstances.set(instance.identifier, instance);
          continue;
        }

        const knownType = this._externalInstances.get(instance.identifier)?.type;
        if (knownType !== instance.type) {
          // A mismatch occurred.
          // That should lead to a fatal error ?
          this._logger.warn(
            "An Instance with the name \"" +
              instance.identifier +
              "\" has already been declared",
            knownType,
            "!=",
            instance.type
          );
        }
      }
    }

    // Store the Instance Mapping and Publish it.
    this.availableInstances.setContent(
      Array.from(this._externalInstances.values())
    );
  }

  /**
   * Function to test if a specific Service exists.
   *
   * @param {string} id The Id of the Service
   * @return {boolean} The result of the Test. True if the service is known either locally or remotely.
   * @memberof nopeDispatcher
   */
  public serviceExists(id: string): boolean {
    if (this._definedFunctions.has(id)) {
      return true;
    }
    return this._externalProvidedServices.has(id);
  }

  /**
   * Function to test if an external subscription exists
   * for the given topic.
   *
   * @param {string} topic The topic to test.
   * @return {boolean} The result of the test.
   * @memberof nopeDispatcher
   */
  public subscriptionExists(topic: string): boolean {
    return this._externalSubscribed.has(topic);
  }

  /**
   * Function to test if an external generator exists for
   * the given type identifier
   *
   * @param {string} typeIdentifier Identifier of the type.
   * @return {boolean} The result of the test.
   * @memberof nopeDispatcher
   */
  public generatorExists(typeIdentifier: string): boolean {
    return this._externalProvidedGenerators.has(typeIdentifier);
  }

  /**
   * Function to adapt a Request name.
   * Only used internally
   *
   * @protected
   * @param {string} id the original ID
   * @return {string} the adapted ID.
* @memberof nopeDispatcher
   */
  protected _getServiceName(id: string, type: "request" | "response"): string {
    // Ids, which already contain the prefix are returned unchanged.
    const prefix = `${type}/`;
    return id.startsWith(prefix) ? id : prefix + id;
  }

  /**
   * Internal Helper Function to subscribe to a Function element.
   * A listener is only registered, if the subscription mode is "individual"
   * and no listener has been registered for the request name before.
   *
   * @protected
   * @param {string} id the Id of the function
   * @param {(...args) => Promise<any>} _cb The Callback of the Function
   * @return {void}
   * @memberof nopeDispatcher
   */
  protected _subscribeToService(
    id: string,
    _cb: (...args) => Promise<any>
  ): void {
    const _req = this._getServiceName(id, "request");

    if (
      this._subscriptionMode !== "individual" ||
      this._communicatorCallbacks.has(_req)
    ) {
      return;
    }

    // Define a Function, which only forwards task requests.
    const cb = (data: IRequestTaskMsg) => {
      if (data.type === "requestOfTask") {
        this._handleExternalRequest(data, _cb);
      }
    };

    // Add the Callback. It is stored under the adapted request name.
    this._communicatorCallbacks.set(_req, {
      registeredId: _req,
      type: "request",
      cb
    });

    // Register Functions.
    this.communicator.onRpcRequest(_req, cb);

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      // If there is a Logger:
      this._logger.debug(
        "Dispatcher \"" + this.id + "\" listening on: \"" + _req + "\""
      );
    }
  }

  /**
   * Function, used to subscribe to results.
   * A listener is only registered, if the publishing mode is "individual"
   * and no listener has been registered for the response name before.
   *
   * @protected
   * @param {string} id The Id of the function, whose results are awaited.
   * @param {boolean} deleteAfterCalling If true, the listener is removed after a response has been handled successfully.
   * @memberof nopeDispatcher
   */
  protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
    const _res = this._getServiceName(id, "response");

    if (
      this._publishingMode !== "individual" ||
      this._communicatorCallbacks.has(_res)
    ) {
      return;
    }

    // Define a Function, which only forwards responses.
    const cb = (data: IResponseTaskMsg) => {
      if (data.type === "response") {
        if (this._handleExternalResponse(data) && deleteAfterCalling) {
          this._removeRpcSubscription(_res);
        }
      }
    };

    // Add the Callback. It is stored under the adapted response name.
    this._communicatorCallbacks.set(_res, {
      registeredId: _res,
      type: "response",
      cb
    });

    // Register Functions.
    this.communicator.onRpcResponse(_res, cb);
  }

  /**
   * Function to register a Function in the Dispatcher
   *
   * The returned function carries two additional properties: `id` (the id
   * under which it has been registered) and `unregister` (a function to
   * unregister it again).
   *
   * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
   * @param {{
   *     // Flag to enable unregistering the function after calling.
   *     deleteAfterCalling?: boolean,
   *     // Instead of generating a uuid an id could be provided
   *     id?: string;
   *     // Flag to prevent publishing the available services.
   *     preventSendingToRegistery?: boolean;
   *   }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
   * @return {*}  {(...args) => Promise<any>} The registered Function
   * @memberof nopeDispatcher
   */
  public registerFunction(
    func: (...args) => Promise<any>,
    options: {
      /** Flag to enable unregistering the function after calling. */
      deleteAfterCalling?: boolean;
      /** Instead of generating a uuid an id could be provided */
      id?: string;
      /** Flag to enable / disable sending to registery */
      preventSendingToRegistery?: boolean;
    } = {}
  ): (...args) => Promise<any> {
    // Define / Use the ID of the Function.
    const _id = options.id || generateId();

    let _func = func;

    if (options.deleteAfterCalling) {
      _func = async (...args) => {
        // Unregister the Method, before the original function is called.
        this.unregisterFunction(_id, {
          preventSendingToRegistery: options.preventSendingToRegistery
        });
        // Return the Result of the Original Function.
        return await func(...args);
      };
    }

    // Define a ID for the Function
    _func["id"] = _id;

    // Define the callback.
    _func["unregister"] = () => this.unregisterFunction(_id);

    // Register the Function
    this._definedFunctions.set(_func["id"], _func);

    // Register the Callback:
    this._subscribeToService(_id, _func);

    if (!options.preventSendingToRegistery) {
      // Publish the Available Services.
      this._sendAvailableServices();

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          "Dispatcher \"" + this.id + "\" registered: \"" + _id + "\""
        );
      }
    }

    // Return the Function.
    return _func;
  }

  /**
   * Function to unregister a Function from the Dispatcher
   *
   * @param {(((...args) => void) | string)} func The Function (or its id) to unregister
   * @param {{ preventSendingToRegistery?: boolean }} [options={}] Flag to prevent publishing the available services.
   * @return {*}  {boolean} Flag, whether the element was removed (only if found) or not.
   * @memberof nopeDispatcher
   */
  public unregisterFunction(
    func: ((...args) => void) | string,
    options: {
      /** Flag to enable / disable sending to registery */
      preventSendingToRegistery?: boolean;
    } = {}
  ): boolean {
    const _id = typeof func === "string" ? func : (func["id"] as string) || "0";

    // Callbacks keyed by the plain id (kept for backward compatibility).
    this._removeRpcSubscription(_id);
    // `_subscribeToService` stores the request listener under the adapted
    // request name. Without removing this key the listener would stay
    // registered at the communicator forever.
    this._removeRpcSubscription(this._getServiceName(_id, "request"));

    if (!options.preventSendingToRegistery) {
      // Publish the Available Services.
      this._sendAvailableServices();

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          "Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
        );
      }
    }

    return this._definedFunctions.delete(_id);
  }

  /**
   * Publishes the available observables (unless prevented) and removes
   * the entry with the id of the observable from the defined functions.
   *
   * NOTE(review): deleting from `_definedFunctions` looks like a copy of
   * `unregisterFunction`; the observable itself is neither disposed nor
   * removed from any observable map here. Confirm the intended behavior.
   *
   * @param {(INopeObservable<any> | string)} observable The observable (or its id).
   * @param {{ preventSendingToRegistery?: boolean }} [options={}] Flag to prevent publishing the available observables.
   * @return {boolean} Flag, whether an entry with the id has been removed from the defined functions.
   * @memberof nopeDispatcher
   */
  public unregisterObservable(
    observable: INopeObservable<any> | string,
    options: {
      // Flag to enable / disable sending to registery
      preventSendingToRegistery?: boolean;
    } = {}
  ): boolean {
    const _id =
      typeof observable === "string"
        ? observable
        : (observable.id as string) || "0";

    if (!options.preventSendingToRegistery) {
      // Publish the Available Observables.
      this._sendAvailableObservables();

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          "Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
        );
      }
    }

    return this._definedFunctions.delete(_id);
  }

  /**
   * Maps an event name to the observable, which receives the messages of
   * the communicator, and to the callback registered at the communicator.
   */
  protected _externallySubscribeObservables: Map<
    string,
    {
      observable: INopeObservable<any>;
      cb: (...arg) => void;
    }
  >;

  /**
   * Maps a published topic to the observables, which are registered
   * for publishing on it.
   */
  protected _internallySubscribeObservables: Map<
    string,
    Set<INopeObservable<any>>
  >;

  /**
   * Creates an Event listener (if required)
   *
   * @protected
   * @param {string} event The Event to Listen.
   * @return {nopeObservable} An Listener on the Communication Channel.
* @memberof nopeDispatcher
   */
  protected _subscribeToEvent(event: string) {
    const existing = this._externallySubscribeObservables.get(event);
    const item = existing || {
      observable: this._generateObservable(),
      cb: () => {}
    };

    if (!item.observable.hasSubscriptions) {
      if (existing) {
        // The previously registered callback would otherwise stay attached
        // to the communicator, while its reference is overwritten below.
        this.communicator.offEvent(event, existing.cb);
      }

      // Forward every message of the communicator into the observable.
      const cb = (data: IExternalEventMsg) => {
        item.observable.setContent(data, this.id);
      };

      this.communicator.onEvent(event, cb);

      item.cb = cb;
    }

    // Set the Items.
    this._externallySubscribeObservables.set(event, item);

    return item.observable;
  }

  /**
   * Function to unsubscribe from an event of the channel.
   * Detaches the callback from the communicator, disposes the related
   * observable and removes the entry. Unknown events are ignored.
   *
   * @protected
   * @param {string} event The event to unsubscribe.
   * @memberof nopeDispatcher
   */
  protected _unsubscribeEvent(event: string): void {
    const item = this._externallySubscribeObservables.get(event);

    if (!item) {
      return;
    }

    this.communicator.offEvent(event, item.cb);

    // Dispose the Observable
    item.observable.dispose();

    // Remove the Observable
    this._externallySubscribeObservables.delete(event);
  }

  /**
   * Helper Function to directly subscribe to a specific value.
   *
   * A new observable is created, registered in "subscribe" mode on the
   * event and subscribed with the callback. Calling `unsubscribe` on the
   * returned observer disposes this observable.
   *
   * @param event The Event.
   * @param callback The Callback used to subscribe to the event
   * @param options Additional Options used to specify the Subscribing.
   * @return The observer of the subscription.
   */
  public async subscribeToEvent(
    event: string,
    callback: IObservableCallback,
    options: {
      pipe?: {
        pipe?: IPipe;
        scope?: { [index: string]: any };
      };
      preventSendingToRegistery?: boolean;
      mode?: "immediate" | "sync";
      subscriptionOptions?: INopeSubscriptionOptions;
    }
  ) {
    // Create a new observable:
    const observable = this._generateObservable();

    // register the newly created observable.
    this.registerObservable(observable, {
      mode: "subscribe",
      topic: event,
      preventSendingToRegistery: options.preventSendingToRegistery,
      pipe: options.pipe
    });

    // Create an Observer by subscribing to the external source (this is directly linked to the System)
    const observer = observable.subscribe(
      callback,
      options.mode,
      options.subscriptionOptions
    );

    // Unsubscribing releases the whole observable created above.
    observer.unsubscribe = () => {
      observable.dispose();
    };

    // Return the Observer.
    return observer;
  }

  /**
   * Function to manually emit an Event.
   *
   * The event is only emitted, if `forced` is set, or if an external
   * subscription exists for it and the sender is not this dispatcher.
   * The emitted message always names this dispatcher as sender.
   *
   * @param _eventName The name (topic) of the event.
   * @param data The data to emit.
   * @param sender Id of the original sender of the data.
   * @param timestamp The timestamp, which is added to the message.
   * @param forced Emit the event, even if no subscription is known.
   * @param args Not used.
   */
  public async emit<T>(
    _eventName: string,
    data: T,
    sender?: string,
    timestamp?: number,
    forced = false,
    ...args
  ): Promise<void> {
    // Only Publish data, if there exists a Subscription.
    if (forced || (this.subscriptionExists(_eventName) && this.id !== sender)) {
      // Use the Communicator to emit the Event.
      await this.communicator.emitEvent(_eventName, {
        data: data,
        topic: _eventName,
        sender: this.id,
        type: "event",
        timestamp
      });
    }
  }

  /**
   * Links an observable with the communication layer.
   *
   * - In "subscribe" mode messages received on the subscribe-topic, which
   *   were not sent by this dispatcher, are written into the observable.
   * - In "publish" mode changes of the observable, which were not caused
   *   by this dispatcher, are emitted on the publish-topic.
   *
   * In both modes `observable.dispose` is wrapped, so that disposing the
   * observable also removes the links created here.
   *
   * @param observable The observable to register.
   * @param options The options (mode, topic, pipe, ...) of the registration.
   * @return The provided observable.
   * @memberof nopeDispatcher
   */
  public registerObservable(
    observable: INopeObservable,
    options: IPropertyOptions
  ): INopeObservable {
    // Extract the Topic, pipe and scope.
    const _subTopic =
      typeof options.topic === "string"
        ? options.topic
        : options.topic.subscribe || null;
    const _pubTopic =
      typeof options.topic === "string"
        ? options.topic
        : options.topic.publish || null;

    // NOTE(review): a pipe provided as object (`{ pipe, scope }`, as passed
    // by `subscribeToEvent`) never ends up in `_pipe`, only its scope is
    // read. Confirm whether `options.pipe.pipe` should be used as well.
    const _pipe =
      typeof options.pipe === "function" ? options.pipe || null : null;
    // `typeof null` is "object" as well => exclude null explicitly.
    const _scope =
      typeof options.pipe === "object" && options.pipe !== null
        ? options.pipe.scope || null
        : null;

    // Tests whether the given mode has been requested (as single value or
    // as element of a list of modes).
    const hasMode = (mode: "subscribe" | "publish"): boolean =>
      options.mode === mode ||
      (Array.isArray(options.mode) && options.mode.includes(mode));

    // A Flag, indicating, whether the topic is new or not.
    const newElement = !this._externallySubscribeObservables.has(_subTopic);

    const _externalSource = this._subscribeToEvent(_subTopic);

    if (hasMode("subscribe")) {
      // Writes an external message into the observable, unless the message
      // has been sent by this dispatcher itself.
      const forward = (data: IExternalEventMsg): void => {
        if (data.sender !== this.id) {
          // Externally force if required
          if (!observable.setContent(data.data, this.id, data.timestamp)) {
            observable.forcePublish();
          }
        }
      };

      const observer = _pipe
        ? _externalSource.enhancedSubscription(forward, {
          scope: _scope,
          pipe: _pipe
        })
        : _externalSource.subscribe({
          next: forward,
          complete: () => {
            observable.observable.complete();
          },
          error: (err) => {
            observable.observable.error(err);
          }
        });

      // Overwrite the Original Dispose Function.
      const dispose = observable.dispose;
      observable.dispose = () => {
        // Kill the Observer;
        observer.unsubscribe();
        // Unsubscribe the Event
        this._unsubscribeEvent(_subTopic);
        // Call the original Dispose function;
        dispose.apply(observable);
      };

      // Only if required => update the Value:
      // This is done, by using the lastly published /
      // received value
      if (!options.preventSendingUpdateOnNewSubscription) {
        // First try to get the current source-data. If this data isn't
        // available, try to use the lastly published data.
        const _sourceData = _externalSource.getContent();
        const data =
          _sourceData !== null && _sourceData !== undefined
            ? _sourceData
            : this._lastPublishedEvent.get(_subTopic);

        if (data) {
          observable.setContent(data.data, this.id, data.timestamp);
        }
      }
    }

    if (hasMode("publish")) {
      const cb = (data, sender?, timestamp?, ...args) => {
        // Only Publish data, which hasn't been set by this dispatcher.
        if (this.id === sender) {
          return;
        }

        const msg: IExternalEventMsg = {
          data: data,
          topic: _pubTopic,
          sender: this.id,
          type: "event",
          timestamp
        };

        if (this.subscriptionExists(_pubTopic)) {
          // The topic is subscribed externally:
          // Use the Communicator to emit the Event.
          this.communicator.emitEvent(_pubTopic, msg);
        } else if (_externalSource.observerLength > 0) {
          // The topic is only used internally. For this purpose, send
          // the data using the "external source"-channel. Because the
          // original id of the sender is used, endless loops are prevented:
          // Observables are always checking, whether an update has been
          // sent by themselves or not.
          _externalSource.setContent({
            data: data,
            topic: _pubTopic,
            sender: sender,
            type: "event",
            timestamp
          });
        }

        // Store the lastly published message, this will be published if
        // a new subscription is provided
        if (this._eventsToSendCurrentValueOnSubscription.has(_pubTopic)) {
          this._lastPublishedEvent.set(_pubTopic, msg);
        }
      };

      // Register the Internally Subscribed Element.
      const _set01 =
        this._internallySubscribeObservables.get(_pubTopic) || new Set();
      _set01.add(observable);
      this._internallySubscribeObservables.set(_pubTopic, _set01);

      // Mark for sending updates if required.
      if (!options.preventSendingUpdateOnNewSubscription) {
        const _set02 =
          this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
          new Set();
        _set02.add(observable);
        this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02);

        // Test if there exists content, if so => store it
        // to the Element.
        const data = observable.getContent();
        if (data !== null && data !== undefined) {
          this._lastPublishedEvent.set(_pubTopic, {
            data: data,
            topic: _pubTopic,
            sender: this.id,
            type: "event",
            timestamp: Date.now()
          });

          // Use the External Source to store the Last Value
          _externalSource.setContent({
            data: data,
            topic: _pubTopic,
            sender: observable.id,
            type: "event",
            timestamp: Date.now()
          });
        }
      }

      const observer = _pipe
        ? observable.enhancedSubscription(cb, {
          scope: _scope,
          pipe: _pipe
        })
        : observable.subscribe(cb);

      // Overwrite the Original Dispose Function.
      const dispose = observable.dispose;
      observable.dispose = () => {
        // Kill the Observer;
        observer.unsubscribe();
        // Unsubscribe the Event
        this._unsubscribeEvent(_subTopic);

        // Unregister the Internally Subscribed Element.
        const publishers =
          this._internallySubscribeObservables.get(_pubTopic) || new Set();
        publishers.delete(observable);
        if (publishers.size > 0) {
          this._internallySubscribeObservables.set(_pubTopic, publishers);
        } else {
          this._internallySubscribeObservables.delete(_pubTopic);
          // Optionally send an update.
          if (!options.preventSendingToRegistery) {
            // Publish the Available Observables.
            this._sendAvailableObservables();
          }
        }

        // Remove the mark for sending updates.
        const providers =
          this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
          new Set();
        providers.delete(observable);
        if (providers.size > 0) {
          this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, providers);
        } else {
          // Topic isn't provided any more => delete the flag and the
          // lastly published message.
          this._eventsToSendCurrentValueOnSubscription.delete(_pubTopic);
          this._lastPublishedEvent.delete(_pubTopic);
        }

        // Call the original Dispose function;
        dispose.apply(observable);
      };
    }

    if (!options.preventSendingToRegistery && newElement) {
      // Publish the Available Observables.
      this._sendAvailableObservables();
    }

    // Return the Observable.
    return observable;
  }

  /**
   * Removes the callback stored under the given key from the communicator
   * (as request or response listener, depending on the stored type) and
   * from the internal callback map. Unknown keys are ignored.
   *
   * @protected
   * @param {string} _id The key under which the callback has been stored.
   * @memberof nopeDispatcher
   */
  protected _removeRpcSubscription(_id: string): void {
    const registered = this._communicatorCallbacks.get(_id);

    // Only callbacks, which have been registered can be removed.
    if (!this._communicatorCallbacks.has(_id)) {
      return;
    }

    if (registered.type === "request") {
      // Unregister the RPC-Request-Listener
      this.communicator.offRpcRequest(registered.registeredId, registered.cb);
    } else if (registered.type === "response") {
      // Unregister the RPC-Response-Listener
      this.communicator.offRpcResponse(registered.registeredId, registered.cb);
    }

    // Remove the Callback
    this._communicatorCallbacks.delete(_id);
  }

  /**
   * Function used to update the Available Services.
*
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableServices(): void {
    // The message lists the ids of all functions defined in this dispatcher.
    const serviceIds = Array.from(this._definedFunctions.keys());
    const message: IAvailableServicesMsg = {
      dispatcher: this.id,
      services: serviceIds
    };

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug("sending available services");
    }

    // Hand the message over to the communicator.
    this.communicator.emitNewServicesAvailable(message);
  }

  /**
   * Announces the topics of this dispatcher: the keys of the internally
   * subscribed observables as "published" and the keys of the externally
   * subscribed observables as "subscribed".
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableObservables(): void {
    const publishedTopics = Array.from(
      this._internallySubscribeObservables.keys()
    );
    const subscribedTopics = Array.from(
      this._externallySubscribeObservables.keys()
    );
    const message: IAvailableTopicsMsg = {
      dispatcher: this.id,
      published: publishedTopics,
      subscribed: subscribedTopics
    };

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "sending available properties. (subscribing and publishing events)"
      );
    }

    // Hand the message over to the communicator.
    this.communicator.emitNewObersvablesAvailable(message);
  }

  /**
   * Announces the instance generators of this dispatcher. The message
   * lists the keys of `_externalGenerators`.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableGenerators(): void {
    const generatorIds = Array.from(this._externalGenerators.keys());
    const message: IAvailableInstanceGeneratorsMsg = {
      dispatcher: this.id,
      generators: generatorIds
    };

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug("sending available instance generators");
    }

    // Hand the message over to the communicator.
    this.communicator.emitNewInstanceGeneratorsAvailable(message);
  }

  /**
   * Announces the instances of this dispatcher. For every identifier in
   * `_internalInstances` the description of the related instance is sent.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableInstances(): void {
    // Generate the Module Description of every internal instance.
    const descriptions = Array.from(this._internalInstances, (identifier) =>
      this._instances.get(identifier).instance.toDescription()
    );

    this.communicator.emitNewInstancesAvailable({
      dispatcher: this.id,
      instances: descriptions
    });
  }

  /**
   * Function which is used to perform a call on the remote.
   *
   * @template T
   * @param {string} serviceName The Name / ID of the Function
   * @param {any[]} params The provided Parameters.
* @param {Partial<ICallOptions>} [options={}] Additional options for the call.
   *   Defaults applied here: `deletableCallbacks: []`,
   *   `paramsHasNoCallback: false`, `dynamicCallback: false` and a
   *   `resultSink` derived from the publishing mode. If `timeout` is
   *   greater than 0, the task is canceled after that many milliseconds.
   * @return {*} {INopePromise<T>} The result of the Operation. The promise
   *   additionally provides the `taskId` and a `cancel` function.
   * @memberof nopeDispatcher
   */
  public performCall<T>(
    serviceName: string,
    params: any[],
    options: Partial<ICallOptions> = {}
  ): INopePromise<T> {
    // Get a Call Id
    const _taskId = generateId();

    // Ids of the callbacks, which are registered for this call.
    const _registeredIdx: Array<string> = [];

    const _options = Object.assign(
      {
        deletableCallbacks: [],
        paramsHasNoCallback: false,
        dynamicCallback: false,
        resultSink:
          this._publishingMode === "generic"
            ? "response"
            : this._getServiceName(serviceName, "response")
      },
      options
    ) as ICallOptions;

    this._subscribeToResult(serviceName, _options.dynamicCallback);

    // Releases everything, that has been registered for this call.
    const clear = () => {
      // Delete all Callbacks.
      _registeredIdx.forEach((id) => this.unregisterFunction(id));

      // Remove the task:
      if (this._runningInternalRequestedTasks.has(_taskId)) {
        const task = this._runningInternalRequestedTasks.get(_taskId);

        // Remove the Timeout.
        if (task.timeout) {
          clearTimeout(task.timeout);
        }

        this._runningInternalRequestedTasks.delete(_taskId);
      }

      if (this._logger?.enabledFor(Logger.DEBUG)) {
        this._logger.debug("Clearing Callbacks from " + _taskId);
      }
    };

    if (this._logger?.enabledFor(Logger.DEBUG)) {
      this._logger.debug(
        "Dispatcher \"" +
          this.id +
          "\" requesting externally Function \"" +
          serviceName +
          "\" with task: \"" +
          _taskId +
          "\""
      );
    }

    // Define a Callback-Function, which will expect the Task.
    const ret = new NopePromise<T>(async (resolve, reject) => {
      try {
        const requestedTask: any = {
          resolve,
          reject,
          clear,
          serviceName,
          timeout: null
        };

        // Register the Handlers,
        this._runningInternalRequestedTasks.set(_taskId, requestedTask);

        // Define a Task-Request
        const taskRequest: IRequestTaskMsg = {
          functionId: serviceName,
          params: [],
          callbacks: [],
          taskId: _taskId,
          type: "requestOfTask",
          resultSink: _options.resultSink
        };

        if (_options.paramsHasNoCallback) {
          // No callbacks are expected => every parameter is plain data.
          for (const [idx, contentOfParameter] of params.entries()) {
            taskRequest.params.push({
              idx,
              data: contentOfParameter
            });
          }
        } else {
          // Iterate over all Parameters and determine Callbacks. Based on
          // the Parameter-Type assign it either to `params` (for parsable
          // Parameters) or to `callbacks` (for callback Parameters).
          for (const [idx, contentOfParameter] of params.entries()) {
            if (typeof contentOfParameter !== "function") {
              taskRequest.params.push({
                idx,
                data: contentOfParameter
              });
              continue;
            }

            // The Parameter is a Callback => store a Description of the
            // Callback and register the callback inside of the Dispatcher
            const deleteAfterCalling = _options.deletableCallbacks.includes(
              idx
            );
            const _func = this.registerFunction(contentOfParameter, {
              deleteAfterCalling,
              preventSendingToRegistery: true
            });

            _registeredIdx.push(_func["id"]);

            // Register the Callback
            taskRequest.callbacks.push({
              functionId: _func["id"],
              idx,
              deleteAfterCalling,
              dynamicCallback: true,
              deletableCallbacks: [],
              resultSink:
                this._publishingMode === "generic"
                  ? "response"
                  : this._getServiceName(_func["id"], "response")
            });
          }
        }

        if (!_options.dynamicCallback && !this.serviceExists(serviceName)) {
          // Create an Error:
          const error = new Error(
            "No Service Provider known for \"" + serviceName + "\""
          );

          if (this._logger) {
            this._logger.error(
              "No Service Provider known for \"" + serviceName + "\""
            );
            this._logger.error(error);
          }

          throw error;
        }

        // Send the Message to the specific element:
        if (this._subscriptionMode === "individual") {
          const channel = this._getServiceName(
            taskRequest.functionId,
            "request"
          );

          await this.communicator.emitRpcRequest(channel, taskRequest);

          if (this._logger?.enabledFor(Logger.DEBUG)) {
            this._logger.debug(
              "Dispatcher \"" +
                this.id +
                "\" putting task \"" +
                _taskId +
                "\" on: \"" +
                channel +
                "\""
            );
          }
        } else {
          await this.communicator.emitRpcRequest("request", taskRequest);

          if (this._logger?.enabledFor(Logger.DEBUG)) {
            this._logger.debug(
              "Dispatcher \"" +
                this.id +
                "\" putting task \"" +
                _taskId +
                "\" on: \"request\""
            );
          }
        }

        // If there is a timeout => cancel the task after it elapsed.
        if (_options.timeout > 0) {
          requestedTask.timeout = setTimeout(() => {
            this.cancelTask(
              _taskId,
              new Error(
                "TIMEOUT. The Service allowed execution time of " +
                  _options.timeout.toString() +
                  "[ms] has been excided"
              )
            );
          }, _options.timeout);
        }
      } catch (e) {
        // Clear all Elements of the Function:
        clear();

        // Forward the error.
        reject(e);
      }
    });

    ret.taskId = _taskId;
    ret.cancel = (reason) => {
      this.cancelTask(_taskId, reason);
    };

    return ret;
  }

  /**
   * Function to clear all pending tasks. The tasks are dropped without
   * being resolved or rejected; their timeout timers are stopped.
   *
   * @memberof nopeDispatcher
   */
  public clearTasks(): void {
    if (this._runningInternalRequestedTasks) {
      // A dropped task can not be canceled any more => its timer would
      // only keep running without any effect.
      for (const task of this._runningInternalRequestedTasks.values()) {
        if (task.timeout) {
          clearTimeout(task.timeout);
        }
      }
      this._runningInternalRequestedTasks.clear();
    } else {
      this._runningInternalRequestedTasks = new Map();
    }
  }

  /**
   * Function to unregister all Functions of the Dispatcher.
*
   * Afterwards all callbacks, which are still registered at the
   * communicator, are removed as well.
   *
   * @memberof nopeDispatcher
   */
  public unregisterAll(): void {
    if (this._definedFunctions) {
      // Iterate over a copy of the ids, because unregistering removes
      // the entries of the map.
      for (const id of Array.from(this._definedFunctions.keys())) {
        this.unregisterFunction(id);
      }
      this._definedFunctions.clear();
    } else {
      this._definedFunctions = new Map<string, (...args) => Promise<any>>();
    }

    // Detach the remaining callbacks from the communicator. Replacing the
    // map only would keep these listeners registered forever.
    if (this._communicatorCallbacks) {
      for (const key of Array.from(this._communicatorCallbacks.keys())) {
        this._removeRpcSubscription(key);
      }
    }

    // Reset the Callbacks.
    this._communicatorCallbacks = new Map();
  }

  /**
   * Function to reset the Dispatcher.
   *
   * All mappings of remote dispatchers, generators, instances, events and
   * observables are replaced by empty containers. Existing instances are
   * deleted (failures are logged only), the (now empty) list of instances
   * is announced, pending tasks are cleared and all functions are
   * unregistered.
   *
   * @memberof nopeDispatcher
   */
  public reset(): void {
    this._remotlyCalledFunctions = new Set();

    this._mappingOfRemoteDispatchersAndServices = new Map();
    this._mappingOfRemoteDispatchersAndPropsOrEvents = new Map();
    this._mappingOfRemoteDispatchersAndGenerators = new Map();
    this._mappingOfRemoteDispatchersAndInstances = new Map();

    this._internalGenerators = new Map();
    this._externalGenerators = new Map();

    // If Instances Exists => Delete them.
    if (this._instances) {
      // Iterate over a copy of the names; deleting is asynchronous and
      // its result is not awaited here.
      for (const name of Array.from(this._instances.keys())) {
        // Remove the Instance.
        this.deleteInstance(name, true).catch((e) => {
          if (this._logger) {
            this._logger.error("Failed Removing Instance \"" + name + "\"");
            this._logger.error(e);
          }
        });
      }
    }

    this._instances = new Map();
    this._externalInstances = new Map();
    this._internalInstances = new Set();
    this._externalInstancesNames = new Set();
    this._externalDispatchers = new Map();

    this._lastPublishedEvent = new Map();
    this._eventsToSendCurrentValueOnSubscription = new Map();

    // Update the Instances
    this._sendAvailableInstances();

    this._externalProvidedServices = new Set();
    this._externalPublished = new Set();
    this._externalSubscribed = new Set();
    this._externalProvidedGenerators = new Set();

    this._internallySubscribeObservables = new Map();
    this._externallySubscribeObservables = new Map();

    this.clearTasks();
    this.unregisterAll();
  }
}