/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-10-12 18:52:00 * @modify date 2021-10-19 09:15:25 * @desc [description] */ import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { avgOfArray, minOfArray } from "../helpers/arrayMethods"; import { isAsyncFunction, sleep, waitFor } from "../helpers/async"; import { generateId } from "../helpers/idMethods"; import { RUNNINGINNODE } from "../helpers/runtimeMethods"; import { determineDifference } from "../helpers/setMethods"; import { getNopeLogger } from "../logger/getLogger"; import { NopeGenericWrapper } from "../module/GenericWrapper"; import { NopePromise } from "../promise/nopePromise"; import { ENopeDispatcherStatus, IAvailableInstanceGeneratorsMsg, IAvailableServicesMsg, IAvailableTopicsMsg, ICallOptions, ICommunicationBridge, IDataPubSubSystem, IDispatcherInfo, IEventAdditionalData, IEventCallback, IExternalEventMsg, IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, IInstanceCreationMsg, IInstanceDescriptionMsg, IInstanceRemovalMsg, INopeDispatcher, INopeDispatcherOptions, INopeEventEmitter, INopeModule, INopeModuleDescription, INopeObservable, INopeObserver, INopePromise, INopeSubscriptionOptions, INopeTimeOptions, INopeTopic, IPipe, IPropertyOptions, IPubSubSystem, IRequestTaskMsg, IResponseTaskMsg, ITaskCancelationMsg, ValidDefaultSelectors, ValidSelectorFunction, } from "../types/nope/index"; /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded * to the remote. 
*
 * @export
 * @class nopeDispatcher
 */
export class nopeDispatcher implements INopeDispatcher {
  // NOTE(review): several declarations of this class use generic types without
  // type arguments (e.g. `Map;`, `Set;`, `Promise;`). Presumably the arguments
  // were lost before this text reached review -- TODO confirm against version
  // control and restore them.

  /**
   * Identifier of this dispatcher. Assigned once in the constructor via
   * `generateId()`.
   */
  public readonly id: string;

  /**
   * Logger of the dispatcher. The constructor uses `options.logger` if given
   * and otherwise creates one with `getNopeLogger`.
   */
  protected _logger: ILogger;

  /**
   * Internal Element to store the registered Functions.
   * `_handleExternalRequest` looks entries up by `data.functionId` and
   * executes the stored `func`.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _definedFunctions: Map<
    string,
    {
      options: {
        preventSendingToRegistery: boolean;
      };
      func: (...args) => Promise;
    }
  >;

  // NOTE(review): presumably the ids of the functions this dispatcher has
  // called remotely -- not used in the visible part of the file; confirm.
  protected _remotlyCalledFunctions: Set;

  // NOTE(review): presumably the callbacks registered at the communicator,
  // separated into "request" and "response" handlers -- confirm against the
  // code that fills this map.
  protected _communicatorCallbacks: Map<
    string,
    {
      registeredId: string;
      type: "request" | "response";
      cb: (data) => any;
    }
  >;

  /**
   * The used Communication interface. Taken from `options.communicator`
   * in the constructor.
   *
   * @type {ICommunicationBridge}
   * @memberof nopeDispatcher
   */
  public readonly communicator: ICommunicationBridge;

  /**
   * A Map holding the current Status of external dispatchers.
   * Key = Dispatcher-ID
   * Value = Last Known status of the dispatcher
   *
   * @protected
   * @type {Map}
   * @memberof nopeDispatcher
   */
  protected _externalDispatchers: Map;

  /**
   * A Mapping of the Services a dispatcher is hosting.
   * Key = Dispatcher-ID
   * Value = Available Services
   *
   * @protected
   * @type {Map<
   *     string,
   *     IAvailableServicesMsg
   *   >}
   * @memberof nopeDispatcher
   */
  protected _mappingOfRemoteDispatchersAndServices: Map<
    string,
    IAvailableServicesMsg
  >;

  /**
   * List with Services, which are available and will
   * be performed by different dispatchers.
   *
   * @protected
   * @type {Set}
   * @memberof nopeDispatcher
   */
  protected _externalProvidedServices: Set;

  /**
   * Element holding the information of how-many times,
   * the service has been provided. `_generateSelector` reads it to decide
   * how many selector calls to expect for a service.
   *
   * Key = Service Name
   * Value = Times of being provided
   *
   * @example example:
   * service01: 2
   * service02: 1
   * ...
*
   * @protected
   * @type {Map}
   * @memberof nopeDispatcher
   */
  protected _amountOfServicesProvidedByName: Map;

  // NOTE(review): several declarations below use generic types without type
  // arguments (`Map;`, `Set;`, `Promise`, `Array;`) and two are syntactically
  // damaged (`Map, IPropertyOptions>` and `Set> >`). Presumably the arguments
  // were lost before this text reached review. They are reproduced unchanged,
  // because the real types are not visible here -- TODO restore from version
  // control.

  /**
   * Element holding the Mapping of the Dispatcher and its instance
   * generators
   *
   * Key = Dispatcher-ID
   * Value = Available Instance Generator of that Dispatcher
   *
   * @protected
   * @type {Map<
   *     string,
   *     IAvailableInstanceGeneratorsMsg
   *   >}
   * @memberof nopeDispatcher
   */
  protected _mappingOfRemoteDispatchersAndGenerators: Map<
    string,
    IAvailableInstanceGeneratorsMsg
  >;

  /**
   * Summary of the external provided generators.
   *
   * @protected
   * @type {Set}
   * @memberof nopeDispatcher
   */
  protected _externalProvidedGenerators: Set;

  /**
   * Element holding the information of how-many times,
   * the instance-generator has been provided. `_generateSelector` reads it
   * to decide how many selector calls to expect for a generator.
   *
   * Key = Generator-Name
   * Value = Times of being provided
   *
   * @example example:
   * service01: 2
   * service02: 1
   * ...
   *
   * @protected
   * @type {Map}
   * @memberof nopeDispatcher
   */
  protected _amountOfGeneratorsProvidedByName: Map;

  /**
   * Proxy created in the constructor: accessing any property returns a
   * function `(options, ...args)` that forwards to
   * `performCall(name, args, options)`.
   */
  public methodInterfaceWithOptions: {
    [index: string]: (options: ICallOptions, ...args) => INopePromise;
  };

  /**
   * Proxy created in the constructor: accessing any property returns a
   * function `(...args)` that forwards to `performCall(name, args)`.
   */
  public methodInterface: { [index: string]: (...args) => INopePromise };

  protected _mappingOfRemoteDispatchersAndPropsOrEvents: Map<
    string,
    IAvailableTopicsMsg
  >;
  protected _externalSubscribed: Set;
  protected _externalPublished: Set;
  protected _internalSubscribed: Set;
  protected _internalPublished: Set;
  protected _registeredObservables: Map, IPropertyOptions>;

  /**
   * Instances offered by the other dispatchers.
   * `getInstanceInfo` and `getDispatcherForInstance` iterate it as
   * Dispatcher-ID => descriptions of the modules hosted there.
   */
  protected _mappingOfRemoteDispatchersAndInstances: Map<
    string,
    INopeModuleDescription[]
  >;
  protected _externalInstancesNames: Set;
  protected _externalInstances: Map;
  protected _eventsToSendCurrentValueOnSubscription: Map<
    string,
    Set> >;
  protected _lastPublishedEvent: Map;

  // The observables below are created in the constructor with
  // `_generateObservable()`. All of them except `canceledTask` and `ready`
  // start with an empty array as content.
  public readonly externallySubscribedProperties: INopeObservable;
  public readonly externallyPublishedProperties: INopeObservable;
  public readonly subscribedProperties: INopeObservable;
  public readonly publishedProperties: INopeObservable;
  public readonly externallySubscribedEvents: INopeObservable;
  public readonly externallyPublishedEvents: INopeObservable;
  public readonly subscribedEvents: INopeObservable;
  public readonly publishedEvents: INopeObservable;
  public readonly externalProvidedServices: INopeObservable;
  /** Emits cancel events; `_handleExternalRequest` matches them by `taskId`. */
  public readonly canceledTask: INopeObservable;
  /** Flag to show if the system is ready or not. Starts with `false`. */
  public readonly ready: INopeObservable;
  public readonly availableInstances: INopeObservable;
  public readonly externalDispatchers: INopeObservable;

  /**
   * Internal Element to store the running tasks.
   * `_handleExternalResponse` looks the entries up by task id and settles
   * them with `resolve` / `reject`.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _runningInternalRequestedTasks: Map<
    string,
    {
      resolve: (value: any) => void;
      reject: (error: any) => void;
      clear: () => void;
      serviceName: string;
      timeout?: any;
    }
  >;

  /** Taken from `options.forceEmittingUpdates`; `false` unless a boolean is given. */
  protected _forceEmittingUpdates: boolean;
  protected _runningExternalRequestedTasks: Set;
  /** Timing configuration: the constructor's defaults merged with `options.timeouts`. */
  protected _timeouts: INopeTimeOptions;
  private __warned: boolean;

  /**
   * Creates an instance of nopeDispatcher.
   *
   * Besides assigning the options the constructor
   *  - starts (only when running in node) an interval sampling the cpu load
   *    every 100 ms,
   *  - creates the method proxies and all observables,
   *  - calls `reset()` and starts `_init()` without awaiting it. A failing
   *    initialization is only logged.
   *
   * @param {nopeRpcDispatcherOptions} options The Options, used by the Dispatcher.
   * @param {() => INopeObservable} _generateObservable A Helper, to generate Observables.
   * @memberof nopeDispatcher
   */
  constructor(
    public options: INopeDispatcherOptions,
    protected _generateObservable: () => INopeObservable
  ) {
    // Declared first, so that every closure below can rely on it.
    const _this = this;

    this.communicator = options.communicator;
    this.id = generateId();

    if (!options.logger) {
      this._logger = getNopeLogger("dispatcher " + this.id, "info");
    } else {
      this._logger = options.logger;
    }

    // Define the Timeouts. User-provided values overwrite the defaults.
    this._timeouts = {
      sendAliveInterval: 500,
      checkInterval: 250,
      slow: 1000,
      warn: 2000,
      dead: 5000,
      remove: 10000,
      selectorTimeout: 1000,
    };
    if (options.timeouts) {
      this._timeouts = Object.assign(this._timeouts, options.timeouts);
    }

    this.__warned = false;

    if (RUNNINGINNODE) {
      // eslint-disable-next-line
      const os = require("os");

      // Sums up the cpu times of all cores.
      const getLoad = () => {
        let totalTime = 0;
        let idleTime = 0;

        for (const cpu of os.cpus()) {
          for (const name in cpu.times) {
            totalTime += cpu.times[name];
          }
          idleTime += cpu.times.idle;
        }

        return {
          totalTime,
          idleTime,
        };
      };

      // Initally store the load
      let oldTimes = getLoad();

      this._cpuInterval = setInterval(() => {
        // Get the current CPU Times.
        const currentTimes = getLoad();
        const elapsedTotal = currentTimes.totalTime - oldTimes.totalTime;

        // Load = share of the elapsed cpu time that was not idle.
        // If the counters did not advance, the division would yield NaN,
        // therefore the last known value is kept in that case.
        // (`_cpuLoad` is declared outside of the visible part of the file.)
        if (elapsedTotal > 0) {
          _this._cpuLoad =
            1 - (currentTimes.idleTime - oldTimes.idleTime) / elapsedTotal;
        }

        // Store the current CPU-Times
        oldTimes = currentTimes;
      }, 100);
    }

    // Use the default selector.
    this.options.defaultSelector = this.options.defaultSelector
      ? this.options.defaultSelector
      : "first";

    // Define the flag, which will be used to force sending updates.
    this._forceEmittingUpdates =
      typeof options.forceEmittingUpdates === "boolean"
        ? options.forceEmittingUpdates
        : false;

    // Define A Proxy for accessing methods easier.
    const _handlerWithOptions = {
      get(target, name) {
        return (options: ICallOptions, ...args) => {
          return _this.performCall(name, args, options);
        };
      },
    };
    const _handlerWithoutOptions = {
      get(target, name) {
        return (...args) => {
          return _this.performCall(name, args);
        };
      },
    };

    this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions);
    this.methodInterface = new Proxy({}, _handlerWithoutOptions);

    // Define the Observables provided by the dispatcher.
    this.externallySubscribedProperties = this._generateObservable();
    this.externallySubscribedProperties.setContent([]);
    this.externallyPublishedProperties = this._generateObservable();
    this.externallyPublishedProperties.setContent([]);
    this.publishedProperties = this._generateObservable();
    this.publishedProperties.setContent([]);
    this.subscribedProperties = this._generateObservable();
    this.subscribedProperties.setContent([]);

    this.externallyPublishedEvents = this._generateObservable();
    this.externallyPublishedEvents.setContent([]);
    this.externallySubscribedEvents = this._generateObservable();
    this.externallySubscribedEvents.setContent([]);
    this.subscribedEvents = this._generateObservable();
    this.subscribedEvents.setContent([]);
    this.publishedEvents = this._generateObservable();
    this.publishedEvents.setContent([]);

    this.externalProvidedServices = this._generateObservable();
    this.externalProvidedServices.setContent([]);

    this.canceledTask = this._generateObservable();

    // Flag to show if the system is ready or not.
    this.ready = this._generateObservable();
    this.ready.setContent(false);

    // Holding all available instances.
    this.availableInstances = this._generateObservable();
    this.availableInstances.setContent([]);

    // Observable containing all Dispatcher Informations.
    this.externalDispatchers = this._generateObservable();
    this.externalDispatchers.setContent([]);

    if (this._logger) {
      this._logger.info(
        "Dispatcher online. -> Reseting and Initializing: ",
        this.id
      );
    }

    this.reset();
    this._init().catch((error) => {
      if (_this._logger) {
        _this._logger.error("Failed to intialize the Dispatcher", error);
      }
    });
  }

  eventDistributor: IPubSubSystem<
    INopeEventEmitter,
    INopeTopic
  >;
  propertyDistributor: IDataPubSubSystem;
  info: IDispatcherInfo;

  /**
   * Identifiers of the instances, which are currently being created by a
   * generator of `provideInstanceGeneratorForExternalDispatchers`.
   * Key = identifier of the instance, Value = hash of the request.
   */
  protected _initializingInstance: Map;

  /**
   * Helper to provide an instance-generator for other dispatchers.
   *
   * Registers the function "generateInstance_<identifier>". When it is
   * called, it creates the instance with `cb` (if the instance does not
   * exist yet), initializes it with `data.params`, registers the matching
   * "instance_dispose_<instance identifier>" function and answers with the
   * description of the instance. Requests for an existing instance only add
   * the requesting dispatcher to `usedBy`.
   *
   * @template I
   * @param {string} identifier Name of the generator (the type of the instances)
   * @param {IGenerateRemoteInstanceForOtherDispatcherCallback} cb Callback creating the instance.
   * @memberof nopeDispatcher
   */
  async provideInstanceGeneratorForExternalDispatchers(
    identifier: string,
    cb: IGenerateRemoteInstanceForOtherDispatcherCallback
  ): Promise {
    const _this = this;

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        'Adding instance generator for "' +
          identifier +
          '" to external Generators. Other Elements can now create instances of this type.'
      );
    }

    const _cb = await this.registerFunction(
      async (data: IInstanceCreationMsg) => {
        // Check if an instance exists or not.
        // if not => create an instance an store it.
        if (!_this._instances.has(data.identifier)) {
          // NOTE(review): the hash is a constant placeholder. Therefore the
          // "different Parameters" error below can never be thrown. A real
          // hash of identifier, params and type seems to be intended -- confirm.
          const hash = "hashedSettings";

          // It might happen, that an instance is requested multiple times.
          // therefore we have to make shure, we wont create them multiple times:
          // We will test it by using the "_initializingInstance" map.
          if (!_this._initializingInstance.has(data.identifier)) {
            // Mark the Instance as being initialized.
            _this._initializingInstance.set(data.identifier, hash);

            try {
              // Create an Instance
              const _instance = await cb(_this, data.identifier);

              // Make shure the Data is expressed as Array.
              if (!Array.isArray(data.params)) {
                data.params = [data.params];
              }

              // Initialize the instance with the parameters.
              await _instance.init(...data.params);

              // A Function is registered, taking care of removing
              // an instances, if it isnt needed any more.
              _this.registerFunction(
                async (_data: IInstanceRemovalMsg) => {
                  const entry = _this._instances.get(data.identifier);

                  if (entry?.usedBy) {
                    // Get the Index of the dispatcher, which is using
                    // the element
                    const idx = entry.usedBy.indexOf(_data.dispatcherId);

                    if (idx > -1) {
                      entry.usedBy.splice(idx, 1);
                    }

                    if (entry.usedBy.length == 0) {
                      // Unmark as internal instance
                      _this._internalInstances.delete(data.identifier);

                      // Remove the Instance.
                      await _instance.dispose();

                      // Delete the Entry.
                      _this._instances.delete(data.identifier);

                      // Remove the Function itself
                      _this.unregisterFunction(
                        "instance_dispose_" + data.identifier
                      );
                    }
                  }
                },
                {
                  deleteAfterCalling: false,
                  id: "instance_dispose_" + data.identifier,
                }
              );

              // Store the Instance.
              _this._instances.set(data.identifier, {
                instance: _instance,
                usedBy: [data.dispatcherId],
              });

              _this._internalInstances.add(data.identifier);

              // Update the available instances:
              _this._sendAvailableInstances();
            } finally {
              // Always remove the marker. Otherwise a failing generator or
              // `init` would block the identifier forever.
              _this._initializingInstance.delete(data.identifier);
            }
          } else if (_this._initializingInstance.get(data.identifier) != hash) {
            throw Error(
              "Providing different Parameters for the same Identifier"
            );
          } else {
            // Check if the Instance is ready.
            let firstHint = true;

            await waitFor(
              () => {
                if (firstHint) {
                  _this._logger.warn(
                    `Parallel request for the same Instance "${data.identifier}" => Waiting until the Instance has been initialized`
                  );
                  firstHint = false;
                }
                // Stop waiting as well, if the parallel initialization
                // has been aborted (the marker is gone without an instance).
                return (
                  _this._instances.has(data.identifier) ||
                  !_this._initializingInstance.has(data.identifier)
                );
              },
              {
                testFirst: true,
                delay: 100,
              }
            );

            if (!_this._instances.has(data.identifier)) {
              throw Error(
                `Initializing the Instance "${data.identifier}" failed in a parallel request`
              );
            }

            // The waiting dispatcher uses the instance as well. Without this
            // entry the instance would be disposed while it is still in use.
            _this._instances
              .get(data.identifier)
              .usedBy.push(data.dispatcherId);
          }
        } else {
          // If an Element exists => Add the Element.
          _this._instances.get(data.identifier).usedBy.push(data.dispatcherId);
        }

        // Define the Response.
        const response: IInstanceDescriptionMsg = {
          description: _this._instances
            .get(data.identifier)
            .instance.toDescription(),
          type: data.type,
        };

        // Send the Response
        return response;
      },
      {
        id: "generateInstance_" + identifier,
      }
    );

    // Store the Generator.
    this._externalGenerators.set(identifier, _cb);

    this._updateAmountOf("generators");

    // Send an update of the available Generators
    this._sendAvailableGenerators();
  }

  /**
   * Removes a generator, which has been provided with
   * `provideInstanceGeneratorForExternalDispatchers`. Unknown identifiers
   * are ignored.
   *
   * @param {string} identifier Name of the generator.
   * @memberof nopeDispatcher
   */
  async unprovideInstanceGeneratorForExternalDispatchers(
    identifier: string
  ): Promise {
    if (this._externalGenerators.has(identifier)) {
      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        this._logger.debug(
          'Removing instance generator for "' +
            identifier +
            '" from external Generators. Other Elements cant create instances of this type anymore.'
        );
      }

      this.unregisterFunction(this._externalGenerators.get(identifier));
      this._externalGenerators.delete(identifier);

      this._updateAmountOf("generators");

      // Send an update of the available Generators
      this._sendAvailableGenerators();
    }
  }

  /**
   * Stores a generator, which is used by `generateInstance` to create the
   * local accessor for an instance of the given type. An existing generator
   * with the same identifier is replaced.
   *
   * @param {string} identifier The type, the generator is responsible for.
   * @param {IGenerateRemoteInstanceCallback} cb The generator.
   * @memberof nopeDispatcher
   */
  public registerInternalWrapperGenerator(
    identifier: string,
    cb: IGenerateRemoteInstanceCallback
  ): void {
    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        'Adding instance generator for "' +
          identifier +
          '" as internal Generator. This Generator wont be used externally.'
      );
    }

    this._internalGenerators.set(identifier, cb);
  }

  /**
   * Removes a generator stored with `registerInternalWrapperGenerator`.
   *
   * @param {string} identifier The type of the generator.
   * @memberof nopeDispatcher
   */
  public unregisterInternalWrapperGenerator(identifier: string): void {
    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        'Rmoving instance generator for "' +
          identifier +
          '" from internal Generator. The sytem cant create elements of this type any more.'
      );
    }

    this._internalGenerators.delete(identifier);
  }

  /** Generators registered with `registerInternalWrapperGenerator` (Key = type). */
  protected _internalGenerators: Map<
    string,
    IGenerateRemoteInstanceCallback
  >;

  /**
   * Registered "generateInstance_..." functions of the generators provided
   * for other dispatchers (Key = identifier of the generator).
   */
  protected _externalGenerators: Map<
    string,
    IGenerateRemoteInstanceCallback
  >;

  /**
   * Instances known to this dispatcher (Key = identifier of the instance).
   * `usedBy` lists the ids of the dispatchers using the instance, `manual`
   * is set by `registerInstance`.
   */
  protected _instances: Map<
    string,
    {
      instance: INopeModule;
      usedBy: Array;
      manual?: boolean;
    }
  >;

  /** Identifiers of the instances, which are hosted by this dispatcher. */
  protected _internalInstances: Set;

  /**
   * Function used to unregister a function at the communicator.
   *
   * @param {(string | ((...args) => any))} func Either the id of the function or the function itself. The function must provide the property "id".
   * @return {*} {Promise}
   * @throws {Error} "Wrong type provided", if no id can be determined.
   * @memberof nopeDispatcher
   */
  public async unregisterCallback(
    func: string | ((...args) => any)
  ): Promise {
    const candidate = func as any;
    let identifier: string;

    if (typeof candidate === "string") {
      identifier = candidate;
    } else if (
      // `typeof` of a function is "function" and never "object". Testing for
      // "object" only rejected every function passed to this method.
      (typeof candidate === "function" || typeof candidate === "object") &&
      candidate !== null &&
      candidate["id"] !== undefined
    ) {
      identifier = candidate["id"];
    } else {
      throw Error("Wrong type provided");
    }

    await this.communicator.emitUnregisterRpc({
      dispatcherId: this.id,
      identifier,
    });
  }

  /**
   * Helper Function, that will create a Selector Function for Generators or Service Calls.
   * Therefore the "testCallback" will be used to determine the whether a dispatcher is
   * allowed to execute the service or not.
   *
   * The selector is registered as function (without sending it to the
   * registery). The first call of the selector starts a timer. The selector
   * is unregistered as soon as either the timer has elapsed or the selector
   * has been called as often as the service / generator is provided.
   *
   * NOTE(review): `selectionSucessPromise` is rejected only if the timer
   * elapses without a selected executor. If all dispatchers answered and
   * none of them has been selected, it stays pending -- confirm whether
   * that is intended.
   *
   * @param {string} name Name of the service / generator
   * @param {("service" | "generator")} type
   * @param {ValidSelectorFunction} testCallback Decides, whether the given dispatcher is allowed to execute.
   * @param {{ timeout?: number }} [options={}] `timeout` defaults to `_timeouts.selectorTimeout`.
   * @return {*} The call options referencing the selector and the promise, which is rejected if the selection failed.
   * @memberof nopeDispatcher
   */
  protected async _generateSelector(
    name: string,
    type: "service" | "generator",
    testCallback: ValidSelectorFunction,
    options: { timeout?: number } = {}
  ) {
    const _this = this;

    const _options = Object.assign(
      {
        timeout: this._timeouts.selectorTimeout,
      },
      options
    );

    // Create a Variable, which will count the amout, the
    // selection function has been called
    let called = 0;

    // Create a callback and Promise, that will be rejected
    // if a timer has been elapsed.
    let timerFinishedCallback: (...args) => void;
    const timerFinishedPromise = new Promise(
      (resolve, reject) => (timerFinishedCallback = reject)
    );
    // Handle of the timer, to be able to stop it again.
    let selectionTimer: any = null;

    // Create a callback and a Promise, that will be finished,
    // if all elements has called the selection function.
    let checksFinishedCallback: (...args) => void;
    const checksFinishedPromise = new Promise(
      (resolve) => (checksFinishedCallback = resolve)
    );

    // An finally we need to kown, if the selection was fine.
    // Therefore we create a variable.
    let executorSelected = false;
    let firstCall = true;

    // Ids of the dispatchers, which already called the selector.
    const responses = new Set();

    // A Default callback, which should be called inside of a
    // selection function. This function will ensure, that we
    // trigger the corresponding timers to unregister the
    // selection function.
    const defaultCallbackHandler = () => {
      const first = firstCall;

      if (firstCall) {
        firstCall = false;
        // Now we start a timer to make shure to function will be unregistered after a specific amount of time.
        selectionTimer = setTimeout(
          timerFinishedCallback,
          _options.timeout,
          new Error("Timeout: Selection timedout")
        );
      }

      // Determine how many calles are allowed before we have to remove the callback
      const amountOfCallsBeforeUnregister =
        type == "generator"
          ? _this._amountOfGeneratorsProvidedByName.get(name) || 1
          : _this._amountOfServicesProvidedByName.get(name) || 1;

      // Now we make shure, we count this call
      called++;

      // If we have call the more items than required,
      // we asume that we are taking care of the last
      // element.
      if (amountOfCallsBeforeUnregister <= called) {
        checksFinishedCallback();
        return {
          last: true,
          first,
        };
      }

      return {
        last: false,
        first,
      };
    };

    let selectorFunc = async (externalDispatcherId: string) => {
      // Prevent taking care of the same dispatcher twice.
      if (responses.has(externalDispatcherId)) {
        return false;
      }

      // Mark this dispatcher, that a request hast been considered
      responses.add(externalDispatcherId);

      // Make shure we start the timers to unregister the function etc.
      // We receive an info, whether we are checking the last call:
      const options = Object.assign(defaultCallbackHandler(), {
        timeout: _options.timeout - 50,
      });

      if (
        !executorSelected &&
        (await testCallback(externalDispatcherId, options))
      ) {
        // The value of "executorSelected" may changed during calling the
        // testCallback. Therefore we check this again.
        if (executorSelected) {
          return false;
        }

        executorSelected = true;
        return true;
      }

      // Return the result
      return false;
    };

    selectorFunc = await this.registerFunction(selectorFunc, {
      preventSendingToRegistery: true,
    });

    const selectorFuncOptions: ICallOptions & { functionId: string } = {
      resultSink: this._getServiceName(selectorFunc["id"], "response"),
      timeout: _options.timeout,
      paramsHasNoCallback: true,
      functionId: selectorFunc["id"],
      dynamicCallback: true,
    };

    let rejectSelectionSucess: () => void;

    // Our final Promise, that will be used to show, that
    // The Selection has failed.
    const selectionSucessPromise = new Promise((resolve, reject) => {
      rejectSelectionSucess = reject;
    });

    // Wait until either the timer has elapsed or all checks are done.
    Promise.race([timerFinishedPromise, checksFinishedPromise])
      .catch((err) => {
        // Now based on the result of the selection we may
        // reject the "selectionSucess"
        if (!executorSelected) {
          rejectSelectionSucess();
        }
      })
      .finally(async () => {
        // The timer is not needed any more. Stopping it prevents a pending
        // timer after all checks have been finished.
        if (selectionTimer !== null) {
          clearTimeout(selectionTimer);
          selectionTimer = null;
        }

        // Either all Elements has been checked or the time has elapsed
        // => remove the selector.
        _this.unregisterFunction(selectorFunc, {
          preventSendingToRegistery: true,
        });
      });

    return {
      selectorFuncOptions,
      selectionSucessPromise,
    };
  }

  /**
   * Generates a selector function for services/generators that have been provided multiple-Times.
   * Shortcut for `_generateSelector` using one of the default selectors.
   *
   * @protected
   * @param {string} name Name of the service / generator
   * @param {("service" | "generator")} type
   * @param {ValidDefaultSelectors} selector Name of the default selector (see `_generateSelectorFunction`)
   * @memberof nopeDispatcher
   */
  protected _generateAndRegisterSelectorFunction(
    name: string,
    type: "service" | "generator",
    selector: ValidDefaultSelectors
  ) {
    return this._generateSelector(
      name,
      type,
      this._generateSelectorFunction(selector)
    );
  }

  /**
   * Creates the test-function for a default selector:
   *
   *  - "first" (and every unknown value): accepts every dispatcher.
   *  - "dispatcher": accepts only this dispatcher.
   *  - "host": accepts dispatchers running on the host of this dispatcher.
   *  - "cpu-usage": collects the answers of the dispatchers and accepts the
   *    one with the lowest `host.cpu.usage`.
   *  - "free-ram": collects the answers of the dispatchers and accepts the
   *    one with the lowest `host.ram.free`.
   *    NOTE(review): choosing the dispatcher with the *least* free ram looks
   *    like a leftover of copying the "cpu-usage" selector -- confirm,
   *    whether the maximum is intended. The behavior has been kept.
   *
   * The collecting selectors decide as soon as they are called with
   * `options.last` or after `options.timeout` has elapsed (timer started by
   * the call with `options.first`). Calls with `options.initalTest` are
   * always accepted.
   *
   * @param {ValidDefaultSelectors} selector Name of the selector
   * @return {*} The test-function
   * @memberof nopeDispatcher
   */
  protected _generateSelectorFunction(selector: ValidDefaultSelectors) {
    const _this = this;
    let helperData: any;

    // Creates a selector, which accepts the dispatcher with the smallest
    // value. `key` is the name used for the value in the collected items,
    // `readValue` extracts the value of the dispatcher info.
    const generateMinSelector = (
      key: string,
      readValue: (info: any) => number
    ) => {
      const state = {
        arr: [] as any[],
        timeout: null as any,
        done: false,
      };

      const finish = () => {
        if (!state.done) {
          // Now we find the item with the smallest value:
          const bestOption = minOfArray(state.arr, key).index;

          for (const [index, { resolve }] of state.arr.entries()) {
            resolve(index === bestOption);
          }

          state.done = true;
        }

        if (state.timeout !== null) {
          clearTimeout(state.timeout);
          state.timeout = null;
        }
      };

      return async (externalDispatcherId, options) => {
        if (options.initalTest) {
          return true;
        }

        // The decision has already been made. Nobody would resolve a
        // promise added now, therefore we directly deny the request.
        if (state.done) {
          return false;
        }

        // Unknown dispatchers and unknown values (-1) are rated as worst
        // option.
        let value = readValue(
          _this._externalDispatchers.get(externalDispatcherId)
        );
        value = value === undefined || value === null || value === -1
          ? Infinity
          : value;

        // For every Call, we store the callback to answer the request.
        const promise = new Promise((resolve) =>
          state.arr.push({ resolve, [key]: value })
        );

        // If we are calling the Element at the firsttime,
        // we ensure, that we have callback, which will
        // be called after the specified amount of time to
        // assing the selection.
        if (options.first) {
          state.timeout = setTimeout(finish, options.timeout);
        }

        if (options.last) {
          finish();
        }

        return await promise;
      };
    };

    switch (selector) {
      default:
      case "first":
        return async (dispatcher: string) => {
          return true;
        };
      case "dispatcher":
        // Our selector compares the dispatcher - id
        return async (externalDispatcher, options) => {
          return externalDispatcher == _this.id;
        };
      case "host":
        // Our selector compares the host-name:
        // 1. Get the current Host name of our dispatcher
        helperData = this._genAliveMessage().host.name;
        return async (externalDispatcher) => {
          const host = _this._externalDispatchers.get(externalDispatcher);
          return host?.host?.name == helperData;
        };
      case "cpu-usage":
        return generateMinSelector(
          "cpuUsage",
          (info) => info?.host?.cpu?.usage
        );
      case "free-ram":
        return generateMinSelector("freeRam", (info) => info?.host?.ram?.free);
    }
  }

  /**
   * Function, that will extract the information of the instance and the
   * provider.
   *
   * The description is taken from the local instance, if there is one.
   * Otherwise the first matching description announced by the other
   * dispatchers is used (the same match `getDispatcherForInstance` uses).
   *
   * @author M.Karkowski
   * @param {string} identifier The identifier of instance
   * @return {*}  {(INopeModuleDescription & { dispatcher: IDispatcherInfo })} `undefined`, if the instance does not exist.
   * @memberof nopeDispatcher
   */
  public getInstanceInfo(
    identifier: string
  ):
    | { description: INopeModuleDescription; dispatcher: IDispatcherInfo }
    | undefined {
    // First check if the instance exists.
    if (!this.instanceExists(identifier, false)) {
      return undefined;
    }

    // Define the return type
    const ret: {
      description: INopeModuleDescription;
      dispatcher: IDispatcherInfo;
    } = {} as any;

    // First we check if we are taking care of an internal instance, if so
    // we will use this instance to enrich the description, otherwise, we
    // will look in the external instances.
    if (this._instances.has(identifier)) {
      ret.description = this._instances
        .get(identifier)
        .instance.toDescription();
    } else {
      // Now extract the description of the first matching external instance.
      // The label is required to leave both loops. Otherwise a later
      // dispatcher would overwrite the match.
      search: for (const modules of this._mappingOfRemoteDispatchersAndInstances.values()) {
        for (const mod of modules) {
          if (mod.identifier == identifier) {
            ret.description = mod;
            break search;
          }
        }
      }
    }

    // Additionally add some information about the used dispatcher.
    ret.dispatcher = this.getDispatcherForInstance(identifier);

    return ret;
  }

  /**
   * Helper, which will return the corresponding dispathcer, which hosts
   * the instance.
   *
   * @author M.Karkowski
   * @param {string} identifier The identifier of the instance
   * @return {*}  {(IDispatcherInfo | undefined)} `undefined`, if the instance or its host is unknown.
   * @memberof nopeDispatcher
   */
  public getDispatcherForInstance(
    identifier: string
  ): IDispatcherInfo | undefined {
    // Check if the instance exists in general
    if (!this.instanceExists(identifier, false)) {
      return undefined;
    }

    // First we will check if the instance is available internally.
    if (this._internalInstances.has(identifier)) {
      return this._genAliveMessage();
    }

    // If that isnt the case, we will check all dispatchers and search the instance.
    for (const [
      dispatcher,
      modules,
    ] of this._mappingOfRemoteDispatchersAndInstances.entries()) {
      for (const mod of modules) {
        if (mod.identifier == identifier) {
          return this._externalDispatchers.get(dispatcher);
        }
      }
    }

    return undefined;
  }

  /**
   * Function, that will create an instance and will return an interface
   * for that instance.
*
   * If the instance is already known locally, the existing accessor is
   * returned and this dispatcher is added once more to its users.
   * Otherwise the "real" instance is requested with the service
   * "generateInstance_<type>" and the accessor is created with the internal
   * generator of the type (or the generator "*", if there is no generator
   * for the type).
   *
   * The provided `options` are not modified.
   *
   * NOTE(review): the return type and the type of `description` have lost
   * their type arguments (`Promise`, `Partial`). They are reproduced as found.
   *
   * @author M.Karkowski
   * @template I
   * @param {Partial} description Requires at least "type" and "identifier".
   * @param {({
   *       selector?: ValidDefaultSelectors | ValidSelectorFunction;
   *     })} [options={}] An optional selector, to restrict the dispatchers allowed to host the instance.
   * @return {*}  {Promise} The accessor of the instance.
   * @throws {Error} If the description is incomplete, the type does not match an existing instance, the selector rejects the host of an existing instance or no generator is available.
   * @memberof nopeDispatcher
   */
  public async generateInstance(
    description: Partial,
    options: {
      selector?: ValidDefaultSelectors | ValidSelectorFunction;
    } = {}
  ): Promise {
    // Work on a copy. The selector may be replaced below and the options of
    // the caller must not be changed by that.
    const _options = Object.assign({}, options);

    // Define the Default Description
    // which will lead to an error.
    const _defDescription: IInstanceCreationMsg = {
      dispatcherId: this.id,
      identifier: "error",
      params: [],
      type: "unkown",
    };

    // Assign the provided Description. (`Object.assign` returns its target,
    // therefore `_description` and `_defDescription` are the same object.)
    const _description = Object.assign(_defDescription, description, {
      dispatcherId: this.id,
    }) as IInstanceCreationMsg;

    // Check if the description is complete
    if (
      _defDescription.type === "unkown" ||
      _description.identifier === "error"
    ) {
      throw Error(
        'Please Provide at least a "type" and "identifier" in the paremeters'
      );
    }

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        'Requesting an Instance of type: "' +
          _defDescription.type +
          '" with the identifier: "' +
          _defDescription.identifier +
          '"'
      );
    }

    // Throws, if an instance with the requested identifier exists, but
    // uses a different type.
    const assertMatchingType = (details: any) => {
      if (
        details !== undefined &&
        details?.description.type !== _description.type
      ) {
        throw Error(
          "There exists an Instance named: '" +
            _description.identifier +
            "' but it uses a different type. Requested type: '" +
            _description.type +
            "', given type: '" +
            details?.description.type +
            "'"
        );
      }
    };

    // Converts the selector of the options to a test-function.
    // Returns `undefined`, if there is no usable selector.
    const toSelectorFunction = (selector: any): any => {
      if (typeof selector === "function") {
        return selector;
      }
      if (typeof selector === "string") {
        return this._generateSelectorFunction(selector as ValidDefaultSelectors);
      }
      return undefined;
    };

    if (this._instances.has(_description.identifier)) {
      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        this._logger.debug(
          'Already created instance with the identifiert: "' +
            _defDescription.identifier +
            '" => returning this instance'
        );
      }

      const _instanceDetails = this.getInstanceInfo(_description.identifier);

      // There is already an instance, now lets check if the types matches
      assertMatchingType(_instanceDetails);

      // Now we need to assign the default selector.
      // options.selector = options.selector ? options.selector : this.options.defaultSelector;

      // If there exists an specific selector which we want to use.
      if (_options.selector) {
        // The dispatcher may be unknown => use optional chaining.
        const dispatcherToUse = _instanceDetails?.dispatcher?.id;
        const test = toSelectorFunction(_options.selector);

        if (test && !(await test(dispatcherToUse, { initalTest: true }))) {
          throw Error(
            "Instance is available, but selector prevents using the provider."
          );
        }
      }

      // Add the Dispatcher to the Element:
      const existing = this._instances.get(_description.identifier);
      existing.usedBy.push(_description.dispatcherId);

      // Return the Instance.
      return existing.instance as I;
    }

    try {
      let _type = _description.type;

      if (!this._internalGenerators.has(_type)) {
        // No default type is present for a remote
        // => assing the default type which is "*""
        _type = "*";
      }

      if (!this.generatorExists(_description.type)) {
        throw Error(
          'Generator "' + _description.type + '" isnt present in the network!'
        );
      }

      if (this._internalGenerators.has(_type)) {
        if (this._logger?.enabledFor((Logger as any).DEBUG)) {
          this._logger.debug(
            'No instance with the identifiert: "' +
              _defDescription.identifier +
              '" found, but an internal generator is available. Using the internal one for creating the instance and requesting the "real" instance externally'
          );
        }

        // Now test if there is allready an instance with this name and type.
        // If so, we check if we have the correct type etc. Additionally we
        // try to extract its dispatcher-id and will use that as selector
        // to allow the function be called.
        const _instanceDetails = this.getInstanceInfo(_description.identifier);

        assertMatchingType(_instanceDetails);

        // The dispatcher may be unknown => use optional chaining.
        const dispatcherToUse = _instanceDetails?.dispatcher?.id;

        // NOTE(review): the request is bound to the host of the existing
        // instance only if a selector has been provided -- confirm whether
        // it should be bound without a selector as well.
        if (dispatcherToUse) {
          const test = toSelectorFunction(_options.selector);

          if (test) {
            if (!(await test(dispatcherToUse, { initalTest: true }))) {
              throw Error(
                "Instance is available, but selector prevents using the host."
              );
            }

            // Now we generate a specific method to check against the dispatcher
            _options.selector = async (id: string) => {
              return dispatcherToUse === id;
            };
          }
        }

        const result = await this.performCall(
          "generateInstance_" + _description.type,
          [_description],
          Object.assign({}, _options, {
            paramsHasNoCallback: true,
          })
        );

        if (this._logger?.enabledFor((Logger as any).DEBUG)) {
          this._logger.debug("Received a description for the instance");
        }

        // Create the Instance
        const instance = (await this._internalGenerators.get(_type)(
          this,
          result.description
        )) as I;

        // Store the Instances.
        this._instances.set(_description.identifier, {
          instance,
          usedBy: [_description.dispatcherId],
        });

        return instance;
      }

      throw Error("No internal generator Available!");
    } catch (e) {
      if (this._logger) {
        this._logger.error(
          "During creating an Instance, the following error Occurd"
        );
        this._logger.error(e);
      }
      throw e;
    }
  }

  /**
   * Stores a manually created instance under its identifier. The entry is
   * marked as `manual` and has no users. An existing entry with the same
   * identifier is replaced.
   *
   * @param {I} instance The instance to store
   * @return {*}  {Promise} The provided instance.
   * @memberof nopeDispatcher
   */
  public async registerInstance(
    instance: I
  ): Promise {
    // Store the Instances.
    this._instances.set(instance.identifier, {
      instance,
      usedBy: [],
      manual: true,
    });

    return instance;
  }

  /**
   * Releases an instance. One user is removed from the instance. If no user
   * is left, the service "instance_dispose_<identifier>" is called, the
   * instance is removed from the known instances and the local accessor is
   * disposed.
   *
   * NOTE(review): `usedBy.pop()` removes the last entry and not explicitly
   * the entry of this dispatcher. For instances stored with
   * `registerInstance` no "instance_dispose_..." function is registered in
   * the visible code -- confirm how those instances are expected to be deleted.
   *
   * @param {(I | string)} instance The instance or its identifier
   * @param {boolean} [preventSendingUpdate=false] If set, the available instances are not announced after the removal.
   * @return {*}  {Promise} `true`, if the instance is known, otherwise `false`.
   * @memberof nopeDispatcher
   */
  public async deleteInstance(
    instance: I | string,
    preventSendingUpdate = false
  ): Promise {
    // Block to find the instance.
    // Based on the property (string or instance)
    // the corresponding instance object has to be select.
    let _instance: { instance: INopeModule; usedBy: Array };

    if (typeof instance === "string") {
      _instance = this._instances.get(instance);
    } else {
      for (const data of this._instances.values()) {
        if (instance == data.instance) {
          _instance = data;
          break;
        }
      }
    }

    // Unknown instance => nothing to do.
    if (!_instance) {
      return false;
    }

    // The instance has been found => release it.
    _instance.usedBy.pop();

    if (_instance.usedBy.length === 0) {
      const identifier = _instance.instance.identifier;
      const params: IInstanceRemovalMsg = {
        dispatcherId: this.id,
        identifier,
      };

      // Call the corresponding Dispose Function for the "real" instance
      // All other elements are just accessors.
      await this.performCall("instance_dispose_" + identifier, [params]);

      // Delete the Identifier
      this._instances.delete(_instance.instance.identifier);

      // Check if an update should be emitted or not.
      if (!preventSendingUpdate) {
        // Update the Instances provided by this module.
        this._sendAvailableInstances();
      }

      // Dispose the Handler;
      await _instance.instance.dispose();
    }

    return true;
  }

  /**
   * Internal Method to handle some requests.
*
   * Executes the requested function and sends the result (or the error) to
   * `data.resultSink`. Steps:
   *  1. If the request names a selector (`data.target.functionId`), the
   *     selector is called with the id of this dispatcher. The request is
   *     ignored without a response, if the selector denies the execution.
   *  2. The arguments are rebuilt. Callbacks are replaced by functions
   *     calling the remote via `performCall`.
   *  3. While the function is running, cancel events of `canceledTask` for
   *     this task are forwarded to the running sub-tasks. The subscription
   *     is released as soon as the function has finished.
   *
   * Requests for unknown functions are ignored.
   *
   * @protected
   * @param {IRequestTaskMsg} data The provided data of the request
   * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
   * @return {Promise}
   * @memberof nopeDispatcher
   */
  protected async _handleExternalRequest(
    data: IRequestTaskMsg,
    _function?: (...args) => Promise
  ): Promise {
    try {
      // Try to get the function if not provided:
      if (typeof _function !== "function") {
        _function = this._definedFunctions.get(data.functionId)?.func;
      }

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          'Dispatcher "' +
            this.id +
            '" received request: "' +
            data.functionId +
            '" -> task: "' +
            data.taskId +
            '"'
        );
      }

      const _this = this;

      if (typeof _function === "function") {
        // Now we check, if we have to perform test, whether
        // we are allowed to execute the task:
        if (data?.target?.functionId) {
          const allowedToExecute = await _this.performCall(
            data.target.functionId,
            [_this.id],
            {
              // Make shure, the selector doesnt requires a selector:
              preventSelector: true,
              // Provide the rest of the data.
              ...data.target,
            }
          );

          // We now test if our system is allowed,
          if (!allowedToExecute) {
            return;
          }
        }

        // Callbacks, which are called if the task is canceled.
        const cbs: Array<(reason) => void> = [];

        // Releases the subscription exactly once.
        let subscribed = true;
        const releaseObserver = () => {
          if (subscribed) {
            subscribed = false;
            observer.unsubscribe();
          }
        };

        const observer = _this.canceledTask.subscribe((cancelEvent) => {
          if (cancelEvent.taskId == data.taskId) {
            // Call Every Callback.
            cbs.forEach((cb) => cb(cancelEvent.reason));

            // Although we are allowed to Cancel the Subscription
            releaseObserver();
          }
        });

        let _result: any;

        try {
          // Only if the Function is present extract the arguments etc.
          const args = [];

          // First extract the basic arguments
          for (const item of data.params) {
            args[item.idx] = item.data;
          }

          // Add the Callbacks. Therefore create a function which will
          // trigger the remote.
          for (const options of data.callbacks) {
            args[options.idx] = async (..._args) => {
              // And Create the Task and its Promise.
              const servicePromise = _this.performCall(
                options.functionId,
                _args,
                options
              );

              const cancelCallback = (reason) => {
                // The Main Task has been canceled =>
                // We are allowed to canel the Subtask as well.
                servicePromise.cancel(reason);
              };

              cbs.push(cancelCallback);

              // Await the Result. If an Task is canceled => The Error is Thrown.
              const result = await servicePromise;

              // Remove the Index
              cbs.splice(cbs.indexOf(cancelCallback), 1);

              return result;
            };
          }

          // Perform the Task it self.
          const _resultPromise = _function(...args);

          if (
            typeof (_resultPromise as INopePromise)?.cancel === "function"
          ) {
            // Push the Callback to the Result.
            cbs.push((reason) =>
              (_resultPromise as INopePromise).cancel(reason)
            );
          }

          // Wait for the Result to finish.
          _result = await _resultPromise;
        } finally {
          // The task is done (or failed). Without releasing the subscription
          // here, every finished request would leave an observer behind.
          releaseObserver();
        }

        // Define the Result message
        const result: IResponseTaskMsg = {
          result: typeof _result !== "undefined" ? _result : null,
          taskId: data.taskId,
          type: "response",
        };

        // Use the communicator to publish the result.
        await this.communicator.emitRpcResponse(data.resultSink, result);
      }
    } catch (error) {
      if (this._logger) {
        // If there is a Logger:
        this._logger.error(
          'Dispatcher "' +
            this.id +
            '" failed with request: "' +
            data.taskId +
            '"'
        );
        this._logger.error(error);
      }

      // An Error occourd => Forward the Error.
      const result: IResponseTaskMsg = {
        error: {
          error,
          msg: error.toString(),
        },
        taskId: data.taskId,
        type: "response",
      };

      // Send the Error via the communicator to the remote.
      await this.communicator.emitRpcResponse(data.resultSink, result);
    }
  }

  /**
   * Internal Function to handle responses. In Generale,
   * the dispatcher checks if there is an open task with
   * the provided id. If so => finish the promise.
   *
   * @protected
   * @param {IResponseTaskMsg} data The Data provided to handle the Response.
   * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
* @memberof nopeDispatcher
   */
  protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
    try {
      // Extract the Task
      const task = this._runningInternalRequestedTasks.get(data.taskId);

      // Delete the Task:
      this._runningInternalRequestedTasks.delete(data.taskId);

      if (!task) {
        // No open task for the response.
        return false;
      }

      // Clearout the Timer
      if (task.timeout) {
        clearTimeout(task.timeout);
      }

      // Based on the Result of the Remote => proceed.
      // Either reject with the error or forward the result.
      if (data.error) {
        if (this._logger) {
          this._logger.error("Failed with task " + data.taskId);
          this._logger.error("Reason: " + data.error.msg);
          this._logger.error(data.error);
        }

        task.reject(data.error);
      } else {
        task.resolve(data.result);
      }

      return true;
    } catch (e) {
      // The logger is optional => do not fail inside of the error handler.
      this._logger?.error("Error during handling an external response");
      this._logger?.error(e);
    }

    return false;
  }

  // Handle of the interval calling `_checkDispachterHealth` (see `_init`).
  protected _checkInterval: any = null;
  // Handle of the interval calling `_emitStatus` (see `_init`).
  protected _sendInterval: any = null;
  // Interval handle; only cleared in `dispose` within the visible code.
  protected _cpuInterval: any = null;

  /**
   * Internal Function, used to initialize the Dispatcher.
   * It waits for the communicator to be connected, starts the
   * health-check and alive intervals, registers the generic wrapper
   * generator, subscribes to the "Messages" of the communicator,
   * emits a bonjour and finally marks the dispatcher as ready.
   *
   * @protected
   * @return {Promise<void>}
   * @memberof nopeDispatcher
   */
  protected async _init(): Promise<void> {
    const _this = this;

    // Wait until the Element is connected.
    await this.communicator.connected.waitFor((value) => value, {
      testCurrent: true,
    });

    // Setup Test Intervals:
    if (this._timeouts.checkInterval > 0) {
      // Define a Checker, which will test the status
      // of the external Dispatchers.
      this._checkInterval = setInterval(
        () => _this._checkDispachterHealth(),
        this._timeouts.checkInterval
      );
    }

    if (this._timeouts.sendAliveInterval > 0) {
      // Define a Timer, which will emit Status updates with
      // the desired delay.
      this._sendInterval = setInterval(
        () => _this._emitStatus(),
        this._timeouts.sendAliveInterval
      );
    }

    this.registerInternalWrapperGenerator(
      "*",
      async (dispather, description) => {
        const mod = new NopeGenericWrapper(
          dispather,
          _this._generateObservable
        );
        await mod.fromDescription(description, "overwrite");
        return mod;
      }
    );

    // Iterate over the Defined Functions and create Subscriptions
    for (const [id, item] of this._definedFunctions.entries()) {
      // Subscribe the Function
      this._subscribeToService(id, item.func);
    }

    // Subscribe to the availableServices of Remotes.
    // If there is a new Service => update the External Services
    await this.communicator.onNewServicesAvailable((data) => {
      try {
        if (data.dispatcher !== _this.id) {
          _this._mappingOfRemoteDispatchersAndServices.set(
            data.dispatcher,
            data
          );
          _this._updateExternalServices();

          if (this._logger?.enabledFor((Logger as any).DEBUG)) {
            // If there is a Logger:
            this._logger.debug(
              this.id,
              "received new services from",
              data.dispatcher
            );
          }
        }
      } catch (e) {
        this._logger?.error("Error during handling an onNewServicesAvailable");
        this._logger?.error(e);
      }
    });

    // Subscribe to new available Topics.
    await this.communicator.onNewObservablesAvailable((data) => {
      try {
        if (data.dispatcher !== _this.id) {
          // Update the new Subscribed/Published Observables
          _this._mappingOfRemoteDispatchersAndPropsOrEvents.set(
            data.dispatcher,
            data
          );

          // Update the new Observables of the dispatcher
          _this._updateExternalObservables(data.dispatcher);

          if (this._logger?.enabledFor((Logger as any).DEBUG)) {
            // If there is a Logger:
            this._logger.debug(
              this.id,
              "received new Observables from",
              data.dispatcher
            );
          }
        }

        // Update all internally and externally subscribed / published events.
        this.subscribedProperties.setContent(
          Array.from(
            new Set([..._this._externalSubscribed, ...this._internalSubscribed])
          )
        );
        this.publishedProperties.setContent(
          Array.from(
            new Set([..._this._externalPublished, ...this._internalPublished])
          )
        );
      } catch (e) {
        this._logger?.error(
          "Error during handling an onNewObservablesAvailable"
        );
        this._logger?.error(e);
      }
    });

    await this.communicator.onNewInstanceGeneratorsAvailable((data) => {
      try {
        _this._mappingOfRemoteDispatchersAndGenerators.set(
          data.dispatcher,
          data
        );
        _this._updateExternalGenerators();

        if (this._logger?.enabledFor((Logger as any).DEBUG)) {
          // If there is a Logger:
          this._logger.debug(
            this.id,
            "received new generators from",
            data.dispatcher
          );
        }
      } catch (e) {
        this._logger?.error(
          "Error during handling an onNewInstanceGeneratorsAvailable"
        );
        this._logger?.error(e);
      }
    });

    await this.communicator.onStatusUpdate((info) => {
      _this._externalDispatchers.set(info.id, info);
      _this.externalDispatchers.setContent(
        Array.from(_this._externalDispatchers.values())
      );
    });

    await this.communicator.onBonjour((info) => {
      _this._externalDispatchers.set(info.id, info);
      _this.externalDispatchers.setContent(
        Array.from(_this._externalDispatchers.values())
      );

      if (_this.id !== info.id) {
        // A different dispatcher went online => tell it what we provide.
        _this._sendAvailableServices();
        _this._sendAvailableObservables();
        _this._sendAvailableGenerators();
        _this._sendAvailableInstances();
        _this._emitStatus();

        if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
          // If there is a Logger:
          _this._logger.debug(
            'Remote Dispatcher "' + info.id + '" went online'
          );
        }
      }
    });

    await this.communicator.onAurevoir((dispatcher: string) =>
      _this._removeDispatcher(dispatcher)
    );

    // Listen to newly created instances.
    await this.communicator.onNewInstancesAvailable((message) => {
      // Store the instances:
      _this._mappingOfRemoteDispatchersAndInstances.set(
        message.dispatcher,
        message.instances
      );

      // Update the Mapping:
      _this._updateExternalInstances();

      if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        _this._logger.debug(
          'Remote Dispatcher "' +
            message.dispatcher +
            '" updated its available instances'
        );
      }
    });

    await this.communicator.onTaskCancelation((event) => {
      // Only forward cancelations of other dispatchers.
      if (event.dispatcher !== _this.id) {
        _this.canceledTask.setContent(event);
      }
    });

    // Now we listen to requests to unregister a function.
    this.communicator.onUnregisterRpc((msg) => {
      if (_this._definedFunctions.has(msg.identifier)) {
        const item = _this._definedFunctions.get(msg.identifier);
        _this.unregisterFunction(msg.identifier, {
          preventSendingToRegistery: item.options.preventSendingToRegistery,
        });
      }
    });

    this.communicator.connected.subscribe((connected) => {
      // Emit a bonjour whenever the connection is (re-)established.
      if (connected) {
        // The logger is optional => guard the call.
        _this._logger?.debug("Sending Bonjour");
        if (RUNNINGINNODE) {
          _this.emitBonjour();
        } else {
          // Outside of node the bonjour is delayed by 2000 ms.
          setTimeout(() => {
            _this.emitBonjour();
          }, 2000);
        }
      }
    });

    if (this._logger) {
      this._logger.info(this.id, "initialized");
    }

    // We sleep 500 ms
    await sleep(500);

    await this.emitBonjour();

    await sleep(500);

    this.ready.setContent(true);
  }

  /**
   * Helper Function to manually emit a Bonjour!
   * Emits the alive message and afterwards sends the available
   * services, observables, generators and instances.
   *
   * @return {*} {Promise<void>}
   * @memberof nopeDispatcher
   */
  public async emitBonjour(): Promise<void> {
    this.communicator.emitBonjour(this._genAliveMessage());

    this._sendAvailableServices();
    this._sendAvailableObservables();
    this._sendAvailableGenerators();
    this._sendAvailableInstances();
  }

  /**
   * Compares the timestamp of the last status of every known external
   * dispatcher with the configured timeouts (`slow`, `warn`, `dead`,
   * `remove`) and adapts the status accordingly. Dispatchers exceeding
   * the `remove` timeout are removed. The list of external dispatchers
   * is only published if something changed.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _checkDispachterHealth(): void {
    const currentTime = Date.now();
    let changes = false;

    for (const status of this._externalDispatchers.values()) {
      // determine the Difference
      const diff = currentTime - status.timestamp;

      // Based on the Difference Determine the Status
      if (diff > this._timeouts.remove) {
        // remove the Dispatcher. But be quiet.
        // Perhaps more dispatchers will be removed
        this._removeDispatcher(status.id, true);
        changes = true;
      } else if (
        diff > this._timeouts.dead &&
        status.status !== ENopeDispatcherStatus.DEAD
      ) {
        status.status = ENopeDispatcherStatus.DEAD;
        changes = true;
      } else if (
        diff > this._timeouts.warn &&
        diff <= this._timeouts.dead &&
        status.status !== ENopeDispatcherStatus.WARNING
      ) {
        status.status = ENopeDispatcherStatus.WARNING;
        changes = true;
      } else if (
        diff > this._timeouts.slow &&
        diff <= this._timeouts.warn &&
        status.status !== ENopeDispatcherStatus.SLOW
      ) {
        status.status = ENopeDispatcherStatus.SLOW;
        changes = true;
      } else if (
        diff <= this._timeouts.slow &&
        status.status !== ENopeDispatcherStatus.HEALTHY
      ) {
        status.status = ENopeDispatcherStatus.HEALTHY;
        changes = true;
      }
    }

    if (changes) {
      // Update the External Dispatchers
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );
    }
  }

  /**
   * Removes everything that is known about an external dispatcher:
   * its generators, services, properties / events and instances.
   * Instances which are no longer used by any dispatcher and which have
   * not been registered manually are disposed.
   *
   * @protected
   * @param {string} dispatcher Id of the dispatcher to remove.
   * @param {boolean} [quite=false] If true, the list of external dispatchers is not published.
   * @memberof nopeDispatcher
   */
  protected _removeDispatcher(dispatcher: string, quite = false): void {
    // Delete the Generators of the Instances.
    this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
    this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);

    const dispatcherInfo = this._externalDispatchers.get(dispatcher);
    const deleted = this._externalDispatchers.delete(dispatcher);

    // Iterate over the available instances and remove the providers:
    for (const instance of this._instances.values()) {
      // Remove all occurrences of the dispatcher:
      let idx = instance.usedBy.indexOf(dispatcher);
      while (idx !== -1) {
        instance.usedBy.splice(idx, 1);
        idx = instance.usedBy.indexOf(dispatcher);
      }

      // Check if the Element isn't required any more => delete it.
      if (instance.usedBy.length === 0 && !instance.manual) {
        const identifier = instance.instance.identifier;

        this._logger?.info(
          "Disposing instance, because it isnt used any more.",
          identifier
        );

        // Forget the instance. Otherwise it would be disposed again with
        // every further dispatcher that is removed.
        this._instances.delete(identifier);

        // Success and failure are handled by separate handlers, so the
        // success message is not logged after a failed dispose.
        instance.instance.dispose().then(
          () => {
            this._logger?.info("sucessfully disposed instance ", identifier);
          },
          (e) => {
            this._logger?.error("Failed to remove the Instance.", e);
          }
        );
      }
    }

    this._updateExternalServices();
    this._updateExternalGenerators();
    this._updateExternalObservables();
    this._updateExternalInstances();

    if (!quite) {
      this.externalDispatchers.setContent(
        Array.from(this._externalDispatchers.values())
      );
    }

    if (deleted && this._logger?.enabledFor((Logger as any).WARN)) {
      // If there is a Logger:
      this._logger.warn(
        "a dispatcher on",
        dispatcherInfo?.host.name || "unkown",
        "went offline. ID of the Dispatcher: ",
        dispatcher
      );
    }
  }

  // Value reported as cpu usage in the alive message; -1 by default.
  protected _cpuLoad = -1;

  /**
   * Generates the current Status Message of the Dispatcher.
   *
   * @author M.Karkowski
   * @protected
   * @return {*} {IDispatcherInfo} The current status of our dispatcher.
* @memberof nopeDispatcher
   */
  protected _genAliveMessage(): IDispatcherInfo {
    if (RUNNINGINNODE) {
      // eslint-disable-next-line
      const os = require("os");
      const cpus = os.cpus();

      // os.cpus() may return an empty list => do not access cpus[0] blindly.
      const hasCpuInfo = cpus.length > 0;
      const rawModel = hasCpuInfo ? `${cpus[0].model}` : "unkown";

      // Strip the frequency suffix (e.g. " @ 2.60GHz"), but only if the
      // model contains one. Without an "@" the name is used as it is;
      // slicing with the index -1 would cut off the last characters.
      const atIdx = rawModel.indexOf("@");
      const model = atIdx > 0 ? rawModel.slice(0, atIdx - 1) : rawModel;

      return {
        id: this.id,
        env: "javascript",
        version: "0.9.7",
        host: {
          cores: cpus.length,
          cpu: {
            model,
            speed: hasCpuInfo ? avgOfArray(cpus, "speed") : -1,
            usage: this._cpuLoad,
          },
          os: os.platform(),
          ram: {
            // Return the used Memory
            usedPerc: 1 - os.freemem() / os.totalmem(),
            // The Values are given in Byte but we want MByte
            free: Math.round(os.freemem() / 1048576),
            total: Math.round(os.totalmem() / 1048576),
          },
          name: os.hostname(),
        },
        pid: process.pid,
        timestamp: Date.now(),
        status: ENopeDispatcherStatus.HEALTHY,
      };
    }

    // Not running in node: host details are not available, -1 / "unkown"
    // mark the unknown values.
    // NOTE(review): the version differs from the node branch ("0.9.7") —
    // confirm whether this is intended.
    return {
      env: "javascript",
      version: "0.9.5",
      host: {
        cores: -1,
        cpu: {
          model: "unkown",
          speed: -1,
          usage: -1,
        },
        name: navigator.appCodeName + " " + navigator.appName,
        os: navigator.platform,
        ram: {
          free: -1,
          usedPerc: -1,
          total: -1,
        },
      },
      id: this.id,
      pid: this.id,
      timestamp: Date.now(),
      status: ENopeDispatcherStatus.HEALTHY,
    };
  }

  /**
   * Function to cancel an individual Task. The task is removed, its
   * timeout is cleared, its promise is rejected with the reason and the
   * cancelation is emitted via the communicator.
   *
   * @param {string} taskId The Id of the Task. Which should be canceled.
   * @param {Error} reason The Reason, why the Task should be canceled (In general should be something meaningful)
   * @param {boolean} [quite=false] Flag which is forwarded in the emitted cancelation message.
   * @return {boolean} Flag, that indicates, whether cancelation was successful or not.
   * @memberof nopeDispatcher
   */
  public cancelTask(taskId: string, reason: Error, quite = false): boolean {
    if (this._runningInternalRequestedTasks.has(taskId)) {
      const task = this._runningInternalRequestedTasks.get(taskId);

      // Delete the task
      this._runningInternalRequestedTasks.delete(taskId);

      // The task is finished => its timeout timer is not required any more.
      if (task.timeout) {
        clearTimeout(task.timeout);
      }

      // Propagate the Cancellation (internally):
      task.reject(reason);

      // Propagate the Cancellation externally.
      // Therefore use the desired Mode.
      this.communicator.emitTaskCancelation({
        dispatcher: this.id,
        reason,
        taskId,
        quite,
      });

      // Indicate a successful cancelation.
      return true;
    }

    // Task hasn't been found => nothing has been canceled.
    return false;
  }

  /**
   * Helper Function, used to close all tasks of a specific service.
   * The matching tasks are removed first and rejected afterwards. No
   * cancelation is emitted via the communicator.
   *
   * @param {string} serviceName The Name of the Service.
   * @param {Error} reason The provided Reason, why cancelation is required.
   * @memberof nopeDispatcher
   */
  public cancelRunningTasksOfService(serviceName: string, reason: Error) {
    // Provide a List containing all Tasks, that have to be canceled
    const _tasksToCancel: {
      reject: (error: Error) => void;
      id: string;
      timeout?: any;
    }[] = [];

    // Filter all Tasks that should be canceled.
    for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
      // Therefore compare the required Service of the Task
      if (task.serviceName === serviceName) {
        // if the service matches, put it to our list.
        _tasksToCancel.push({
          id,
          reject: task.reject,
          timeout: task.timeout,
        });
      }
    }

    if (_tasksToCancel.length > 0) {
      // First remove all Tasks and stop their timeout timers.
      // Then cancel them to avoid side effects
      for (const item of _tasksToCancel) {
        this._runningInternalRequestedTasks.delete(item.id);

        if (item.timeout) {
          clearTimeout(item.timeout);
        }
      }

      // Now Reject all Tasks.
      for (const item of _tasksToCancel) {
        item.reject(reason);
      }
    }
  }

  /**
   * Will dispose the Dispatcher. Must be called on exit for a clean exit. Otherwise it is defined as dirty exits.
   * Cancels all running tasks, stops the intervals and emits the
   * aurevoir message.
   *
   * @return {Promise<void>}
   * @memberof nopeDispatcher
   */
  public async dispose(): Promise<void> {
    // Iterate over a copy of the keys, because canceling removes the tasks.
    for (const task of Array.from(this._runningInternalRequestedTasks.keys())) {
      this.cancelTask(task, new Error("Client going offline"), false);
    }

    if (this._sendInterval) {
      clearInterval(this._sendInterval);
    }

    if (this._checkInterval) {
      clearInterval(this._checkInterval);
    }

    if (this._cpuInterval) {
      clearInterval(this._cpuInterval);
    }

    // Emits the aurevoir Message. Awaited, so the message is handed over
    // before the returned promise resolves.
    await this.communicator.emitAurevoir(this.id);
  }

  /**
   * Internal function to update the amount of the services or
   * generators, that are hosted.
*
   * The corresponding counter map is cleared and rebuilt: every name
   * announced by a remote dispatcher and every locally defined function
   * increases the counter of its name by one.
   *
   * @author M.Karkowski
   * @protected
   * @param {("services" | "generators")} type
   * @memberof nopeDispatcher
   */
  protected _updateAmountOf(type: "services" | "generators"): void {
    const props =
      type === "services"
        ? {
            map: "_amountOfServicesProvidedByName",
            iter: "_mappingOfRemoteDispatchersAndServices",
            name: "services",
          }
        : {
            map: "_amountOfGeneratorsProvidedByName",
            iter: "_mappingOfRemoteDispatchersAndGenerators",
            name: "generators",
          };

    const counter = this[props.map];

    counter.clear();

    for (const dispatcherInfo of this[props.iter].values()) {
      dispatcherInfo[props.name].forEach((name) => {
        // Add the name and tell the System, that
        // we have an additional provider for that.
        counter.set(name, (counter.get(name) || 0) + 1);
      });
    }

    // NOTE(review): the locally defined functions are counted for
    // "generators" as well — confirm whether that is intended.
    for (const service of this._definedFunctions.keys()) {
      counter.set(service, (counter.get(service) || 0) + 1);
    }
  }

  /**
   * Function to update the used Services. Rebuilds the set of externally
   * provided services, publishes it, cancels the running tasks of services
   * which are no longer available and updates the service counters.
   *
   * @protected
   * @return {{ addedServices: string[]; removedServices: string[] }} The services added / removed by the update.
   * @memberof nopeDispatcher
   */
  protected _updateExternalServices(): {
    // Contains the "new" services
    addedServices: string[];
    // contains the "unavailable services"
    removedServices: string[];
  } {
    // Store the available Services before the Update.
    const _servicesBeforeUpdate = new Set(this._externalProvidedServices);

    // Clear the Services and now add the available
    // Services to this Set
    this._externalProvidedServices.clear();
    for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndServices.values()) {
      for (const service of dispatcherInfo.services) {
        // Add the Service to the Set => Then we know which services
        // are available.
        this._externalProvidedServices.add(service);
      }
    }

    // Determine the Difference of the Items, before and afterwards
    const difference = determineDifference(
      _servicesBeforeUpdate,
      this._externalProvidedServices
    );

    // Publish the externally available services.
    this.externalProvidedServices.setContent(
      Array.from(this._externalProvidedServices)
    );

    // If there are unavailable services => cancel their tasks.
    if (_servicesBeforeUpdate.size > 0) {
      for (const unavailable of difference.removed) {
        // Cancel the Tasks
        this.cancelRunningTasksOfService(
          unavailable,
          new Error("Service unavailable!")
        );
      }
    }

    this._updateAmountOf("services");

    return {
      // Contains the "new" services
      addedServices: Array.from(difference.added),
      // contains the "unavailable services"
      removedServices: Array.from(difference.removed),
    };
  }

  /**
   * Function to update the used Generators. Rebuilds the set of
   * externally provided generators and updates the generator counters.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _updateExternalGenerators(): void {
    // Clear the known Generators
    this._externalProvidedGenerators.clear();

    for (const generators of this._mappingOfRemoteDispatchersAndGenerators.values()) {
      for (const gen of generators.generators) {
        // Add the Generator to the Element
        this._externalProvidedGenerators.add(gen);
      }
    }

    this._updateAmountOf("generators");
  }

  /**
   * Internal Function to update the Listing of external Topics.
   * This Function creates a list containing all subscriptions
   * and publishers which are external. For newly subscribed topics the
   * lastly published event (if stored) is emitted again.
   *
   * @protected
   * @param {string} [externalDispatcher=""] Id of a dispatcher, whose subscriptions are always treated as new.
   * @memberof nopeDispatcher
   */
  protected _updateExternalObservables(externalDispatcher = ""): void {
    const _this = this;

    // Collect the published / subscribed topics of all remotes.
    const _published = new Set<string>();
    const _subscribed = new Set<string>();

    for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndPropsOrEvents.values()) {
      dispatcherInfo.published.forEach((propOrEvent) =>
        _published.add(propOrEvent)
      );
      dispatcherInfo.subscribed.forEach((propOrEvent) =>
        _subscribed.add(propOrEvent)
      );
    }

    // Send the Values of the Requested Properties
    // Therefore iterate over the new elements and
    // provide the last value

    // 1. Filter the new Subscribed Elements:
    const _newlySubscribed = new Set(
      [..._subscribed].filter((item) => !_this._externalSubscribed.has(item))
    );

    if (externalDispatcher !== "") {
      // Make sure, that the subscribed elements of the
      // new element are added correctly. The dispatcher might be unknown
      // => guard the access.
      this._mappingOfRemoteDispatchersAndPropsOrEvents
        .get(externalDispatcher)
        ?.subscribed.forEach((item) => _newlySubscribed.add(item));
    }

    // 2. Iterate over the Elements and
    //    add the lastly available value.
    // TODO: CHECK IF REQUIRED.
    for (const _topic of _newlySubscribed) {
      if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
        _this._logger.debug(
          `Determined the topic: "${_topic}" and checking if th`
        );
      }
      if (this._lastPublishedEvent.has(_topic)) {
        // Send the Element
        this.communicator.emitEvent(
          _topic,
          this._lastPublishedEvent.get(_topic)
        );
      }
    }

    // Store the new values:
    this._externalSubscribed = _subscribed;
    this._externalPublished = _published;

    // Update the Elements.
    this.externallySubscribedProperties.setContent(
      Array.from(this._externalSubscribed)
    );
    this.externallyPublishedProperties.setContent(
      Array.from(this._externalPublished)
    );
  }

  /**
   * Internal Function to update the Listing of external provided instances.
   * The first description of an identifier wins; a later description with
   * a different type only leads to a warning.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _updateExternalInstances(): void {
    const _this = this;

    // Clear the available instances.
    this._externalInstances.clear();
    this._externalInstancesNames.clear();

    for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndInstances.values()) {
      dispatcherInfo.forEach((instance) => {
        // Check if the Instance exists:
        if (!_this._externalInstances.has(instance.identifier)) {
          _this._externalInstancesNames.add(instance.identifier);
          _this._externalInstances.set(instance.identifier, instance);
        } else if (
          _this._externalInstances.get(instance.identifier)?.type !==
          instance.type
        ) {
          // A mismatch occurred.
          // That should lead to a fatal error ?
          _this._logger?.warn(
            'An Instance with the name "' +
              instance.identifier +
              '" has already been declared',
            _this._externalInstances.get(instance.identifier)?.type,
            "!=",
            instance.type
          );
        }
      });
    }

    // Store the Instance Mapping and Publish it.
    this.availableInstances.setContent(
      Array.from(this._externalInstances.values())
    );
  }

  /**
   * Function to test if a specific Service exists.
   *
   * @param {string} id The Id of the Service
   * @return {boolean} The result of the Test. True if either local or remotely a service is known.
   * @memberof nopeDispatcher
   */
  public serviceExists(id: string): boolean {
    return (
      this._definedFunctions.has(id) || this._externalProvidedServices.has(id)
    );
  }

  /**
   * Function to test if a subscription exists,
   * for the given topic.
   *
   * @param {string} topic The topic to test.
   * @param {boolean} [externalOnly=true] If true, only external subscriptions are considered.
   * @return {boolean} The result of the test. True if a matching subscription exists
   * @memberof nopeDispatcher
   */
  public subscriptionForPropertyExists(
    topic: string,
    externalOnly = true
  ): boolean {
    if (externalOnly) {
      return this._externalSubscribed.has(topic);
    }

    return (
      this._internalSubscribed.has(topic) ||
      this._externalSubscribed.has(topic)
    );
  }

  /**
   * Function to test if a publisher exists,
   * for the given topic.
   *
   * @param {string} topic The topic to test.
   * @param {boolean} [externalOnly=true] If true, only external publishers are considered.
   * @return {boolean} The result of the test. True if a matching publisher exists
   * @memberof nopeDispatcher
   */
  public publisherForPropertyExists(
    topic: string,
    externalOnly = true
  ): boolean {
    if (externalOnly) {
      return this._externalPublished.has(topic);
    }

    // The result has to be returned, otherwise the method
    // yields `undefined` for `externalOnly = false`.
    return (
      this._internalPublished.has(topic) || this._externalPublished.has(topic)
    );
  }

  /**
   * Function to test if an instance with the given identifier exists.
   *
   * @param {string} identifier The identifier of the instance.
   * @param {boolean} [externalOnly=true] If true, only externally provided instances are considered.
   * @return {boolean} The result of the test. True if a matching instance exists
   * @memberof nopeDispatcher
   */
  public instanceExists(identifier: string, externalOnly = true): boolean {
    if (externalOnly) {
      return this._externalInstances.has(identifier);
    }

    // The result has to be returned, otherwise the method
    // yields `undefined` for `externalOnly = false`.
    return (
      this._externalInstances.has(identifier) ||
      this._instances.has(identifier)
    );
  }

  /**
   * Function to test if a generator exists for
   * the given type identifier
   *
   * @param {string} typeIdentifier Identifier of the type.
   * @return {boolean} The result of the test. True if an external generator exists.
* @memberof nopeDispatcher
   */
  public generatorExists(typeIdentifier: string): boolean {
    return this._externalProvidedGenerators.has(typeIdentifier);
  }

  /**
   * Function to adapt a Request name. The type is added as prefix
   * ("request/" or "response/"), if the id does not start with it already.
   * Only used internally
   *
   * @protected
   * @param {string} id the original ID
   * @param {("request" | "response")} type the prefix to use.
   * @return {string} the adapted ID.
   * @memberof nopeDispatcher
   */
  protected _getServiceName(id: string, type: "request" | "response"): string {
    return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
  }

  /**
   * Internal Helper Function to subscribe to a Function element.
   * The callback is registered at the communicator only once per id.
   *
   * @protected
   * @param {string} id the Id of the function
   * @param {(...args) => Promise<any>} _cb The Callback of the Function
   * @return {void}
   * @memberof nopeDispatcher
   */
  protected _subscribeToService(
    id: string,
    _cb: (...args) => Promise<any>
  ): void {
    const _req = this._getServiceName(id, "request");
    if (!this._communicatorCallbacks.has(_req)) {
      const _this = this;

      // Define a Function, which only handles task requests.
      const cb = (data: IRequestTaskMsg) => {
        if (data.type === "requestOfTask") {
          _this._handleExternalRequest(data, _cb);
        }
      };

      // Add the Callback.
      this._communicatorCallbacks.set(_req, {
        registeredId: _req,
        type: "request",
        cb,
      });

      // Register Functions.
      this.communicator.onRpcRequest(_req, cb);

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          'Dispatcher "' + this.id + '" listening on: "' + _req + '"'
        );
      }
    }
  }

  /**
   * Function, used to subscribe to results.
   * The callback is registered at the communicator only once per id.
   *
   * @protected
   * @param {string} id
   * @param {boolean} deleteAfterCalling If true, the subscription is removed after a response was assigned to a task.
   * @memberof nopeDispatcher
   */
  protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
    const _res = this._getServiceName(id, "response");
    if (!this._communicatorCallbacks.has(_res)) {
      const _this = this;

      // Define a Function.
      const cb = (data: IResponseTaskMsg) => {
        if (data.type === "response") {
          if (_this._handleExternalResponse(data) && deleteAfterCalling) {
            _this._removeRpcSubscription(_res);
          }
        }
      };

      // Add the Callback.
      this._communicatorCallbacks.set(_res, {
        registeredId: _res,
        type: "response",
        cb,
      });

      // Register Functions.
      this.communicator.onRpcResponse(_res, cb);
    }
  }

  /**
   * Creates an Event listener (if required)
   *
   * @protected
   * @param {string} event The Event to Listen.
   * @return {nopeObservable} A Listener on the Communication Channel.
   * @memberof nopeDispatcher
   */
  protected _subscribeToEvent(event: string): {
    newSubscription: boolean;
    observable: INopeObservable;
  } {
    const item = this._externalTopicLinkedWithObservable.get(event) || {
      observable: this._generateObservable(),
      cb: () => {
        // Default callback
      },
      // we must know, whether this element is new or not.
      // NOTE(review): this branch is only reached if no entry exists for
      // the event, so `has(event)` looks like it is always false here —
      // confirm whether `!has(event)` was intended.
      newSubscription: this._externalTopicLinkedWithObservable.has(event),
    };

    if (!item.observable.hasSubscriptions) {
      const _this = this;
      // Forward the events of the communicator to the observable.
      const cb = (data: IExternalEventMsg) => {
        item.observable.setContent(data, { sender: _this.id });
      };

      this.communicator.onEvent(event, cb);

      item.cb = cb;
    }

    // Set the Items.
    this._externalTopicLinkedWithObservable.set(event, item);

    // The callback is not part of the returned element.
    const { cb, ...ret } = item;

    return ret as {
      newSubscription: boolean;
      observable: INopeObservable;
    };
  }

  /**
   * Function to register a Function in the Dispatcher
   *
   * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
   * @param {{
   *   // Flag to enable unregistering the function after calling.
   *   deleteAfterCalling?: boolean,
   *   // Instead of generating a uuid an id could be provided
   *   id?: string;
   *   // Flag to enable / disable sending to registery
   *   preventSendingToRegistery?: boolean;
   * }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
   * @return {*} {(...args) => Promise<any>} The registered Function
   * @memberof nopeDispatcher
   */
  public registerFunction(
    func: (...args) => Promise<any>,
    options: {
      /** Flag to enable unregistering the function after calling. */
      deleteAfterCalling?: boolean;
      /** Instead of generating a uuid an id could be provided */
      id?: string;
      /** Flag to enable / disable sending to registery */
      preventSendingToRegistery?: boolean;
    } = {}
  ): (...args) => Promise<any> {
    const _this = this;
    // Define / Use the ID of the Function.
    const _id = options.id || generateId();

    let _func = func;

    if (!this.__warned && !isAsyncFunction(func)) {
      // The logger is optional => guard the call.
      this._logger?.warn(
        "!!! You have provided synchronous functions. They may break NoPE. Use them with care !!!"
      );
      this.__warned = true;
    }

    if (options.deleteAfterCalling) {
      _func = async (...args) => {
        // Unregister the Method
        _this.unregisterFunction(_id, {
          preventSendingToRegistery: options.preventSendingToRegistery,
        });
        // Return the Result of the Original Function.
        return await func(...args);
      };
    }

    // Define a ID for the Function
    _func["id"] = _id;

    // Define the callback.
    _func["unregister"] = () => _this.unregisterFunction(_id);

    // Register the Function
    this._definedFunctions.set(_func["id"], {
      options: {
        preventSendingToRegistery: options.preventSendingToRegistery || false,
      },
      func: _func,
    });

    // Register the Callback:
    this._subscribeToService(_id, _func);

    if (!options.preventSendingToRegistery) {
      this._updateAmountOf("services");

      // Publish the Available Services.
      this._sendAvailableServices();

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          'Dispatcher "' + this.id + '" registered: "' + _id + '"'
        );
      }
    }

    // Return the Function.
    return _func;
  }

  /**
   * Function to unregister a Function from the Dispatcher
   * @param {(((...args) => void) | string)} func The Function (carrying an "id") or the id to unregister
   * @param {{ preventSendingToRegistery?: boolean }} [options={}] If `preventSendingToRegistery` is set, the counters are not updated and the available services are not sent.
   * @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
   * @memberof nopeDispatcher
   */
  public unregisterFunction(
    func: ((...args) => void) | string,
    options: {
      /** Flag to enable / disable sending to registery */
      preventSendingToRegistery?: boolean;
    } = {}
  ): boolean {
    const _id =
      typeof func === "string" ? func : (func["id"] as string) || "0";

    this._removeRpcSubscription(_id);

    // Remove the function before the counters are updated and the
    // services are sent. `_updateAmountOf` iterates over the defined
    // functions and would still count the removed one otherwise.
    const removed = this._definedFunctions.delete(_id);

    if (!options.preventSendingToRegistery) {
      this._updateAmountOf("services");

      // Publish the Available Services.
      this._sendAvailableServices();

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          'Dispatcher "' + this.id + '" unregistered: "' + _id + '"'
        );
      }
    }

    return removed;
  }

  public registerObservable(
    observable: INopeObservable,
    options: IPropertyOptions
  ): INopeObservable {
    // Reference to itself
    const _this = this;

    // Extract the Topic, pipe and scope.
    const _subTopic =
      typeof options.topic === "string"
        ? options.topic
        : options.topic.subscribe || null;
    const _pubTopic =
      typeof options.topic === "string"
        ? options.topic
        : options.topic.publish || null;

    this._registeredObservables.set(observable, options);

    // A Flag, indicating, whether the topic is new or not.
    let newElement = false;

    // Test if the Item should be subscribe or not.
    if (
      options.mode == "subscribe" ||
      (Array.isArray(options.mode) && options.mode.includes("subscribe"))
    ) {
      // Now we want to subscribe to external events.
      newElement =
        newElement || !this._externalTopicLinkedWithObservable.has(_subTopic);

      // Now we store the external source.
      const _externalSource = this._subscribeToEvent(_subTopic);

      const observer = _externalSource.observable.subscribe({
        next(data: IExternalEventMsg) {
          // We will forward the content, if its from a different sender or its forced
          if (
            (data.forced && data.sender != _this.id) ||
            data.sender != _this.id
          ) {
            if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
              _this._logger.debug(
                'Forwarding data on "' +
                  _subTopic +
                  '" to registered observables'
              );
            }

            observable.setContent(data.data, {
              sender: _this.id,
              timestamp: data.timestamp,
              forced: data.forced || false,
            });
          }
        },
        complete() {
          observable.observable.complete();
        },
        error(err) {
          observable.observable.error(err);
        },
      });

      // Overwrite the Original Dispose Function.
const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeObservable(_subTopic); // Call the original Dispose function; dispose.apply(observable); }; // Only if required => update the Value: // This is done, by using the lastly published / // received value if (!options.preventSendingUpdateOnNewSubscription) { // Get the available data. // try to use the lastly published data. const data = this._lastPublishedEvent.get(_subTopic); // This Step is not clear. it should be prevented. Otherwise inconsistent // bahavoir. Commented out for testing purposes. // const data = // _sourceData !== null && _sourceData !== undefined // ? _sourceData // : _externalSource.getContent(); if (typeof data !== "undefined" && data !== null) { if ( !observable.setContent(data.data, { sender: _this.id, timestamp: data.timestamp, }) ) { observable.forcePublish(); } } } } if ( options.mode == "publish" || (Array.isArray(options.mode) && options.mode.includes("publish")) ) { // Now we want to subscribe to external events. newElement = newElement || !this._externalTopicLinkedWithObservable.has(_pubTopic); // Now we store the external source. const _externalSource = this._subscribeToEvent(_pubTopic); // In Here, we define a callback, which we will use // to forward the events of this particular source // to the network. const cb: IEventCallback = (data, options) => { // Only Publish data, if there exists a Subscription. if (_this.id !== options.sender) { const msg: IExternalEventMsg = { data: data, topic: _pubTopic, sender: _this.id, type: "event", timestamp: options.timestamp, }; if (options.forced) { msg.forced = true; } // Test if the Message is subscribed externally or Emitting // Updates is enabled: if so, use the socket to send the data // to other dispatchers. 
if ( _this.subscriptionForPropertyExists(_pubTopic) || _this._forceEmittingUpdates || // If the Update is forced options.forced ) { // Use the Communicator to emit the Event. _this.communicator.emitEvent(_pubTopic, msg); } // Store the lastly published message, this will be published if // a new subscription is provided if (_this._eventsToSendCurrentValueOnSubscription.has(_pubTopic)) { if (_this._logger.enabledFor((Logger as any).DEBUG)) { _this._logger.debug(`Storing data for "${_pubTopic}"`); } _this._lastPublishedEvent.set(_pubTopic, msg); if (_this._logger.enabledFor((Logger as any).DEBUG)) { _this._logger.debug(`Forwarding data for "${_pubTopic}"`); } _externalSource.observable.setContent({ data: data, topic: _pubTopic, // Watchout => We are using the provided sender here. // It is used sender: options.sender, type: "event", timestamp: options.timestamp, }); } else if (_externalSource.observable.observerLength > 0) { // The Observable it is used multiple times internally. // For this purpose, send the data using the "external source" // -channel. because we are using the original id of the sender, // we prevent endless loops. Observables are always checking // if an update has been send by them self or not. if (_this._logger.enabledFor((Logger as any).DEBUG)) { _this._logger.debug( `Forwarding data for "${_pubTopic}" because there is someone listening` ); } _externalSource.observable.setContent({ data: data, topic: _pubTopic, // Watchout => We are using the provided sender here. // It is used sender: options.sender, type: "event", timestamp: options.timestamp, }); } } }; // Register the Internally Subscribed Observable to mark it as forwarded. const _set01 = this._internalObservablesForwardDataToNetwork.get(_pubTopic) || new Set(); _set01.add(observable); this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01); // If desired, we mark the Observable as forwarded on new subscription. 
// We mark it by storing it in the set "_eventsToSendCurrentValueOnSubscription" // If we detect a new subscription in this section, we will update the value. // Additionally, we will store the current value in the "external-source" which // is linked to the network. if (!options.preventSendingUpdateOnNewSubscription) { const _set02 = this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) || new Set(); _set02.add(observable); this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02); // Test if there exists content, if so => store it // to the external source and to the "_lastPublishedEvent" // Should not be required, because during calling subscribe this action will be called: // const data = observable.getContent(); // if (data !== null && data !== undefined) { // this._lastPublishedEvent.set(_pubTopic, { // data: data, // topic: _pubTopic, // sender: _this.id, // type: "event", // timestamp: Date.now() // }); // // Use the External Source to store the Last Value // _externalSource.setContent({ // data: data, // topic: _pubTopic, // sender: observable.id, // type: "event", // timestamp: Date.now() // }); // } } const observer = observable.subscribe(cb); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeObservable(_subTopic); // Unregister the Internally Subscribed Element. const _set01 = _this._internalObservablesForwardDataToNetwork.get(_pubTopic) || new Set(); _set01.delete(observable); if (_set01.size > 0) { _this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01); } else { _this._internalObservablesForwardDataToNetwork.delete(_pubTopic); // Optionally send an update. if (!options.preventSendingToRegistery) { // Publish the Available Services. _this._sendAvailableObservables(); } } // Remove the Element. 
const _set02 = this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) || new Set(); _set02.delete(observable); if (_set02.size > 0) { this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02); } else { // Topic isnt provided any more. // delete the flag _this._eventsToSendCurrentValueOnSubscription.delete(_pubTopic); _this._lastPublishedEvent.delete(_pubTopic); } // Call the original Dispose function; dispose.apply(observable); }; } if (newElement) { this._updateListsOfInternallySubscribedAndPublishedValues(); } if (!options.preventSendingToRegistery && newElement) { // Publish the Available Services. this._sendAvailableObservables(); } // Return the Function. return observable; } public unregisterObservable( observable: INopeObservable | string, options: { // Flag to enable / disable sending to registery preventSendingToRegistery?: boolean; } = {} ): boolean { const _id = typeof observable === "string" ? observable : (observable.id as string) || "0"; let _observable: INopeObservable = null; for (const obs of this._registeredObservables.keys()) { if (obs.id == _id) { _observable = obs; } } if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableObservables(); if (this._logger?.enabledFor((Logger as any).DEBUG)) { // If there is a Logger: this._logger.debug( 'Dispatcher "' + this.id + '" unregistered: "' + _id + '"' ); } } return this._registeredObservables.delete(_observable); } protected _externalTopicLinkedWithObservable: Map< string, { observable: INopeObservable; cb: (...arg) => void; } >; /** * Internal Observables, which will forward data to the network. * The "key" represents the topic */ protected _internalObservablesForwardDataToNetwork: Map< string, Set> >; /** * Function to unsubscribe from an event of the channel. 
*
   * @protected
   * @param {string} path The topic, whose link should be released.
   * @memberof nopeDispatcher
   */
  protected _unsubscribeObservable(path: string) {
    const linked = this._externalTopicLinkedWithObservable.get(path);

    // Nothing linked to that topic => nothing to do.
    if (!linked) {
      return;
    }

    // Detach the callback from the communicator.
    this.communicator.offEvent(path, linked.cb);

    // Dispose the linked observable.
    linked.observable.dispose();

    // Forget the link.
    this._externalTopicLinkedWithObservable.delete(path);
  }

  /**
   * Convenience function to subscribe to an event with a plain callback.
   * Internally a new observable is generated and registered in
   * "subscribe" mode for the given event.
   *
   * @param event The name (topic) of the event.
   * @param callback The callback, which is subscribed to the event.
   * @param options Additional options used to specify the subscription.
   * @returns The observer. Its `unsubscribe` function disposes the internally generated observable.
   */
  public async subscribeToEvent(
    event: string,
    callback: IEventCallback,
    options: {
      pipe?: {
        pipe?: IPipe;
        scope?: { [index: string]: any };
      };
      preventSendingToRegistery?: boolean;
      subscriptionOptions?: INopeSubscriptionOptions;
    } = {}
  ): Promise {
    // The observable acting as entry point of the event.
    const source = this._generateObservable();

    // Link it with the dispatcher.
    this.registerObservable(source, {
      mode: "subscribe",
      topic: event,
      preventSendingToRegistery: options.preventSendingToRegistery,
      schema: {},
    });

    // Attach the callback of the user.
    const subscription = source.subscribe(
      callback,
      options.subscriptionOptions
    );

    // Unsubscribing removes the whole observable.
    subscription.unsubscribe = () => {
      source.dispose();
    };

    return subscription;
  }

  /**
   * Manually emits an event using the communicator.
   *
   * The event is only emitted if it is forced, or if a subscription for the
   * event exists and the provided sender is not this dispatcher.
   *
   * @param _eventName The name (topic) of the event.
   * @param data The data to emit.
   * @param sender The id of the original sender.
   * @param timestamp The timestamp of the event.
   * @param forced Flag to emit the event regardless of subscriptions.
   * @param args Further arguments (not used by this function).
   */
  public async emit(
    _eventName: string,
    data: T,
    sender?: string,
    timestamp?: number,
    forced = false,
    ...args
  ): Promise {
    const shouldEmit =
      forced ||
      (this.subscriptionForPropertyExists(_eventName) && this.id !== sender);

    if (!shouldEmit) {
      return;
    }

    // Use the Communicator to emit the Event.
    await this.communicator.emitEvent(_eventName, {
      forced,
      data,
      topic: _eventName,
      sender: this.id,
      type: "event",
      timestamp,
    });
  }

  /**
   * Detaches the rpc-callback, which has been stored for the given id,
   * from the communicator and forgets it.
   *
   * @param _id The id, under which the callback has been stored.
   */
  protected _removeRpcSubscription(_id: string): void {
    if (!this._communicatorCallbacks.has(_id)) {
      // No callback known for that id.
      return;
    }

    const entry = this._communicatorCallbacks.get(_id);

    if (entry.type === "request") {
      // Unregister the RPC-Request-Listener
      this.communicator.offRpcRequest(entry.registeredId, entry.cb);
    } else if (entry.type === "response") {
      // Unregister the RPC-Response-Listener
      this.communicator.offRpcResponse(entry.registeredId, entry.cb);
    }

    // Remove the Callback
    this._communicatorCallbacks.delete(_id);
  }

  /**
   * Broadcasts the ids of all currently defined functions (services)
   * of this dispatcher.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableServices(): void {
    const message: IAvailableServicesMsg = {
      dispatcher: this.id,
      services: [...this._definedFunctions.keys()],
    };

    const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);
    if (debugEnabled) {
      this._logger.debug("sending available services");
    }

    this.communicator.emitNewServicesAvailable(message);
  }

  /**
   * Broadcasts the topics, which are published and subscribed
   * by this dispatcher.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableObservables(): void {
    const message: IAvailableTopicsMsg = {
      dispatcher: this.id,
      published: [...this._internalObservablesForwardDataToNetwork.keys()],
      subscribed: [...this._externalTopicLinkedWithObservable.keys()],
    };

    const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);
    if (debugEnabled) {
      this._logger.debug(
        "sending available properties. (subscribing and publishing events)"
      );
    }

    this.communicator.emitNewObservablesAvailable(message);
  }

  /**
   * Rebuilds the sets of internally subscribed and published topics
   * based on the registered observables.
   *
   * @param considerHidden If `true`, observables registered with `preventSendingToRegistery` are included as well.
   */
  protected _updateListsOfInternallySubscribedAndPublishedValues(
    considerHidden = true
  ): void {
    // Start with empty sets.
    this._internalSubscribed.clear();
    this._internalPublished.clear();

    for (const registered of this._registeredObservables.values()) {
      // Skip hidden elements, if they should not be considered.
      if (registered.preventSendingToRegistery && !considerHidden) {
        continue;
      }

      const topic = registered.topic;
      let subscribed = null;
      let published = null;

      if (typeof topic === "string") {
        // A plain string is used for both directions.
        subscribed = topic;
        published = topic;
      } else {
        subscribed = topic.subscribe || null;
        published = topic.publish || null;
      }

      if (subscribed !== null) {
        this._internalSubscribed.add(subscribed);
      }
      if (published !== null) {
        this._internalPublished.add(published);
      }
    }
  }

  /**
   * Emits the current alive-message of this dispatcher as status update.
   */
  protected _emitStatus(): void {
    const status = this._genAliveMessage();
    this.communicator.emitStatusUpdate(status);
  }

  /**
   * Broadcasts the names of the generators stored in `_externalGenerators`.
   */
  protected _sendAvailableGenerators(): void {
    const message: IAvailableInstanceGeneratorsMsg = {
      dispatcher: this.id,
      generators: [...this._externalGenerators.keys()],
    };

    const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);
    if (debugEnabled) {
      this._logger.debug("sending available instance generators");
    }

    this.communicator.emitNewInstanceGeneratorsAvailable(message);
  }

  /**
   * Broadcasts the descriptions of the instances listed
   * in `_internalInstances`.
   *
   * @protected
   * @memberof nopeDispatcher
   */
  protected _sendAvailableInstances(): void {
    // Collect the description of every internal instance.
    const instances = [];
    for (const identifier of Array.from(this._internalInstances)) {
      instances.push(this._instances.get(identifier).instance.toDescription());
    }

    this.communicator.emitNewInstancesAvailable({
      dispatcher: this.id,
      instances,
    });
  }

  /**
   * Function which is used to perform a call on the remote.
*
   * A task-request is created and emitted using the communicator. The
   * returned promise is resolved / rejected via the entry stored in
   * `_runningInternalRequestedTasks`.
   *
   * Parameters being functions are registered as (hidden) functions of this
   * dispatcher and are transmitted as callback descriptions, unless
   * `paramsHasNoCallback` is set.
   *
   * A selector is attached to the request, if `forceUsingSelectors` is set
   * in the dispatcher options or if more than one provider of the service
   * is known (and `preventSelector` is not set).
   *
   * @author M.Karkowski
   * @template T
   * @param {string} serviceName The Name / ID of the Function
   * @param {any[]} params The parameters of the call.
   * @param [options={}] Options for the Call. You can assign a different
   * selector, a `timeout` in [ms] and use `quite` to suppress error logs.
   * @return {*} {INopePromise} The result of the call. The promise provides the `taskId` and a `cancel` function.
   * @memberof nopeDispatcher
   */
  public performCall(
    serviceName: string,
    params: any[],
    options: Partial & {
      selector?: ValidDefaultSelectors | ValidSelectorFunction;
      quite?: boolean;
      preventSelector?: boolean;
    } = {}
  ): INopePromise {
    // Get a Call Id
    const _taskId = generateId();
    const _registeredIdx: Array = [];
    const _this = this;

    const _options = Object.assign(
      {
        deletableCallbacks: [],
        paramsHasNoCallback: false,
        dynamicCallback: false,
        resultSink: this._getServiceName(serviceName, "response"),
        selector: this.options.defaultSelector,
      },
      options
    ) as ICallOptions;

    this._subscribeToResult(serviceName, _options.dynamicCallback);

    // Helper to release everything, which has been created for this call.
    const clear = () => {
      // Delete all Callbacks.
      _registeredIdx.map((id) => _this.unregisterFunction(id));

      // Remove the task:
      if (_this._runningInternalRequestedTasks.has(_taskId)) {
        const task = _this._runningInternalRequestedTasks.get(_taskId);

        // Remove the Timeout.
        if (task.timeout) {
          clearTimeout(task.timeout);
        }

        // Remove the Task itself
        _this._runningInternalRequestedTasks.delete(_taskId);
      }

      if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
        _this._logger.debug("Clearing Callbacks from " + _taskId);
      }
    };

    if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
      _this._logger.debug(
        'Dispatcher "' +
          this.id +
          '" requesting externally Function "' +
          serviceName +
          '" with task: "' +
          _taskId +
          '"'
      );
    }

    // Define a Callback-Function, which will expect the Task.
    const ret = new NopePromise(async (resolve, reject) => {
      try {
        const requestedTask: any = {
          resolve,
          reject,
          clear,
          serviceName,
          timeout: null,
        };

        // Register the Handlers,
        _this._runningInternalRequestedTasks.set(_taskId, requestedTask);

        // Define a Task-Request
        const taskRequest: IRequestTaskMsg = {
          functionId: serviceName,
          params: [],
          callbacks: [],
          taskId: _taskId,
          type: "requestOfTask",
          resultSink: _options.resultSink,
        };

        // Test if there is no Callback integrated
        if (!options.paramsHasNoCallback) {
          // If so, the parameters has to be detailled:
          // Iterate over all Parameters and determine Callbacks. Based on
          // the Parameter-Type assign it either to packet.params (for
          // parsable Parameters) or packet.callbacks (for callback
          // Parameters)
          for (const [idx, contentOfParameter] of params.entries()) {
            // Test if the parameter is a Function
            if (typeof contentOfParameter !== "function") {
              taskRequest.params.push({
                idx,
                data: contentOfParameter,
              });
            } else {
              // The Parameter is a Callback => store a
              // Description of the Callback and register
              // the callback inside of the Dispatcher
              const deleteAfterCalling =
                _options.deletableCallbacks.includes(idx);

              const _func = _this.registerFunction(contentOfParameter, {
                deleteAfterCalling,
                preventSendingToRegistery: true,
              });

              _registeredIdx.push(_func["id"]);

              // Register the Callback
              taskRequest.callbacks.push({
                functionId: _func["id"],
                idx,
                deleteAfterCalling,
                dynamicCallback: true,
                deletableCallbacks: [],
                resultSink: _this._getServiceName(_func["id"], "response"),
              });
            }
          }
        } else {
          for (const [idx, contentOfParameter] of params.entries()) {
            taskRequest.params.push({
              idx,
              data: contentOfParameter,
            });
          }
        }

        if (!_options.dynamicCallback && !_this.serviceExists(serviceName)) {
          // Create an Error:
          const error = new Error(
            'No Service Provider known for "' + serviceName + '"'
          );

          if (!options.quite && _this._logger) {
            _this._logger.error(
              'No Service Provider known for "' + serviceName + '"'
            );
            _this._logger.error(error);
          }

          throw error;
        }

        // A selector is required, if it is forced by the options or if the
        // service is provided more than once. (The former expression
        // "(force || amount) > 1" compared the boolean flag with 1, thus
        // "forceUsingSelectors" never had an effect.)
        const amountOfProviders =
          _this._amountOfServicesProvidedByName.get(serviceName) ?? 0;
        const selectorRequired =
          _this.options.forceUsingSelectors || amountOfProviders > 1;

        if (!options.preventSelector && selectorRequired) {
          // Cancel the task, if no dispatcher has been selected.
          const onSelectionFailed = (_) => {
            _this.cancelTask(
              _taskId,
              new Error(
                "No dispatcher has been selected for executing the task!"
              ),
              false
            );
          };

          if (typeof options?.selector === "function") {
            const selector = await this._generateSelector(
              serviceName,
              "service",
              options.selector
            );

            // Assign the Selector Promise
            selector.selectionSucessPromise.catch(onSelectionFailed);

            // Assign the Selector:
            taskRequest.target = selector.selectorFuncOptions;
          } else {
            const selector = await this._generateAndRegisterSelectorFunction(
              serviceName,
              "service",
              typeof options?.selector === "string"
                ? options.selector
                : this.options.defaultSelector
            );

            // Assign the Selector Promise
            selector.selectionSucessPromise.catch(onSelectionFailed);

            // Assign the Selector:
            taskRequest.target = selector.selectorFuncOptions;
          }
        }

        // Send the Message to the specific element:
        await _this.communicator.emitRpcRequest(
          _this._getServiceName(taskRequest.functionId, "request"),
          taskRequest
        );

        if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
          _this._logger.debug(
            'Dispatcher "' +
              this.id +
              '" putting task "' +
              _taskId +
              '" on: "' +
              _this._getServiceName(taskRequest.functionId, "request") +
              '"'
          );
        }

        // If there is a timeout => cancel the task after the given time.
        if (options.timeout > 0) {
          requestedTask.timeout = setTimeout(() => {
            _this.cancelTask(
              _taskId,
              new Error(
                "TIMEOUT. The Service allowed execution time of " +
                  options.timeout.toString() +
                  "[ms] has been excided"
              ),
              options.quite || false
            );
          }, options.timeout);
        }
      } catch (e) {
        // Clear all Elements of the Function:
        clear();

        // Throw the error.
        reject(e);
      }
    });

    ret.taskId = _taskId;
    ret.cancel = (reason) => {
      _this.cancelTask(_taskId, reason);
    };

    return ret;
  }

  /**
   * Function to clear all pending tasks. The map of running tasks is
   * emptied (or created, if it does not exist yet).
   *
   * @memberof nopeDispatcher
   */
  public clearTasks(): void {
    if (this._runningInternalRequestedTasks)
      this._runningInternalRequestedTasks.clear();
    else this._runningInternalRequestedTasks = new Map();
  }

  /**
   * Function to unregister all Functions of the Dispatcher. Afterwards
   * the stored communicator callbacks are reset.
   *
   * @memberof nopeDispatcher
   */
  public unregisterAll(): void {
    if (this._definedFunctions) {
      for (const id of this._definedFunctions.keys()) {
        this.unregisterFunction(id);
      }
      this._definedFunctions.clear();
    } else {
      this._definedFunctions = new Map();
    }

    // Reset the Callbacks.
    this._communicatorCallbacks = new Map();
  }

  /**
   * Function to reset the Dispatcher. All internal maps and sets are
   * replaced by empty ones, existing instances are deleted, pending tasks
   * are cleared and all functions are unregistered.
   *
   * @memberof nopeDispatcher
   */
  public reset(): void {
    this._remotlyCalledFunctions = new Set();

    this._mappingOfRemoteDispatchersAndServices = new Map();
    this._mappingOfRemoteDispatchersAndPropsOrEvents = new Map();
    this._mappingOfRemoteDispatchersAndGenerators = new Map();
    this._mappingOfRemoteDispatchersAndInstances = new Map();

    this._internalGenerators = new Map();
    this._externalGenerators = new Map();

    // If Instances Exists => Delete them.
    if (this._instances) {
      const _this = this;

      // Dispose all Instances. The removal is not awaited; failures are
      // only logged.
      for (const name of this._instances.keys()) {
        // Remove the Instance.
        this.deleteInstance(name, true).catch((e) => {
          if (_this._logger) {
            _this._logger.error('Failed Removing Instance "' + name + '"');
            _this._logger.error(e);
          }
        });
      }
    }

    this._instances = new Map();
    this._externalInstances = new Map();
    this._internalInstances = new Set();
    this._initializingInstance = new Map();
    this._externalInstancesNames = new Set();
    this._externalDispatchers = new Map();
    this._lastPublishedEvent = new Map();
    this._eventsToSendCurrentValueOnSubscription = new Map();

    if (this.communicator.connected.getContent()) {
      // Update the Instances
      this._sendAvailableInstances();
    }

    this._externalProvidedServices = new Set();
    this._amountOfServicesProvidedByName = new Map();

    this._externalPublished = new Set();
    this._externalSubscribed = new Set();
    this._internalPublished = new Set();
    this._internalSubscribed = new Set();

    this._externalProvidedGenerators = new Set();
    this._amountOfGeneratorsProvidedByName = new Map();

    this._registeredObservables = new Map();
    this._internalObservablesForwardDataToNetwork = new Map();
    this._externalTopicLinkedWithObservable = new Map();

    this.clearTasks();
    this.unregisterAll();
  }
}