diff --git a/lib/dispatcher/nopeDispatcher.ts b/lib/dispatcher/nopeDispatcher.ts index 9d9a775..7aa3fee 100644 --- a/lib/dispatcher/nopeDispatcher.ts +++ b/lib/dispatcher/nopeDispatcher.ts @@ -10,6 +10,7 @@ import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { promisify } from "util"; import { avgOfArray, minOfArray } from "../helpers/arrayMethods"; +import { isAsyncFunction } from "../helpers/async"; import { generateId } from "../helpers/idMethods"; import { RUNNINGINNODE } from "../helpers/runtimeMethods"; import { determineDifference } from "../helpers/setMethods"; @@ -35,7 +36,10 @@ import { IDispatcherInfo, IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, - INopeDispatcher, INopeDispatcherOptions, ValidDefaultSelectors, ValidSelectorFunction + INopeDispatcher, + INopeDispatcherOptions, + ValidDefaultSelectors, + ValidSelectorFunction } from "../types/nope/nopeDispatcher.interface"; import { INopeModule, @@ -53,8 +57,6 @@ import { INopePromise } from "../types/nope/nopePromise.interface"; const sleep = promisify(setTimeout); - - /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded @@ -75,12 +77,15 @@ export class nopeDispatcher implements INopeDispatcher { * @protected * @memberof nopeDispatcher */ - protected _definedFunctions: Map Promise - }>; + protected _definedFunctions: Map< + string, + { + options: { + preventSendingToRegistery: boolean; + }; + func: (...args) => Promise; + } + >; protected _remotlyCalledFunctions: Set; protected _communicatorCallbacks: Map< string, @@ -101,7 +106,7 @@ export class nopeDispatcher implements INopeDispatcher { * A Map holding the current Status of external dispatchers. * Key = Dispatcher-ID * Value = Last Known status of the dispatcher - * + * * @protected * @type {Map} * @memberof nopeDispatcher @@ -112,7 +117,7 @@ export class nopeDispatcher implements INopeDispatcher { * A Mapping of the Services a dispatcher is hosting. * Key = Dispatcher-ID * Value = Available Services - * + * * @protected * @type {Map< * string, @@ -137,11 +142,11 @@ export class nopeDispatcher implements INopeDispatcher { /** * Element holding the information of how-many times, - * the service has been provided. - * + * the service has been provided. + * * Key = Service Name * Value = Times of being provided - * + * * @example example: * service01: 2 * service02: 1 @@ -154,9 +159,9 @@ export class nopeDispatcher implements INopeDispatcher { protected _amountOfServicesProvidedByName: Map; /** - * Element holding the Mapping of the Dispatcher and its instance + * Element holding the Mapping of the Dispatcher and its instance * generators - * + * * Key = Dispatcher-ID * Value = Available Instance Generator of that Dispatcher * @@ -183,11 +188,11 @@ export class nopeDispatcher implements INopeDispatcher { /** * Element holding the information of how-many times, - * the instance-generator has been provided. - * + * the instance-generator has been provided. + * * Key = Generator-Name * Value = Times of being provided - * + * * @example example: * service01: 2 * service02: 1 @@ -256,7 +261,6 @@ export class nopeDispatcher implements INopeDispatcher { } >; - protected _forceEmittingUpdates: boolean; protected _runningExternalRequestedTasks: Set; protected _timeouts: { @@ -307,7 +311,8 @@ export class nopeDispatcher implements INopeDispatcher { const getLoad = () => { const cpus = os.cpus(); - let totalTime = 0, idleTime = 0; + let totalTime = 0, + idleTime = 0; // Determine the current load: for (const cpu of cpus) { @@ -329,17 +334,22 @@ export class nopeDispatcher implements INopeDispatcher { // Get the current CPU Times. const currentTimes = getLoad(); // Determine the difference between the old Times an the current Times. - _this._cpuLoad = 1 - (currentTimes.idleTime - oldTimes.idleTime) / (currentTimes.totalTime - oldTimes.totalTime); + _this._cpuLoad = + 1 - + (currentTimes.idleTime - oldTimes.idleTime) / + (currentTimes.totalTime - oldTimes.totalTime); // Store the current CPU-Times oldTimes = currentTimes; }, 100); } - /** * Define the flag, which will be used to force sending updates. */ - this._forceEmittingUpdates = typeof (options.forceEmittingUpdates) === "boolean" ? options.forceEmittingUpdates : false; + this._forceEmittingUpdates = + typeof options.forceEmittingUpdates === "boolean" + ? options.forceEmittingUpdates + : false; /** * Define A Proxy for accessing methods easier. @@ -388,7 +398,10 @@ export class nopeDispatcher implements INopeDispatcher { this.externalDispatchers.setContent([]); if (this._logger) { - this._logger.info("Dispatcher online. -> Reseting and Initializing: ", this.id); + this._logger.info( + "Dispatcher online. -> Reseting and Initializing: ", + this.id + ); } this.reset(); @@ -416,8 +429,8 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Adding instance generator for \"" + - identifier + - "\" to external Generators. Other Elements can now create instances of this type." + identifier + + "\" to external Generators. Other Elements can now create instances of this type." ); } @@ -514,13 +527,15 @@ export class nopeDispatcher implements INopeDispatcher { this._sendAvailableGenerators(); } - async unprovideInstanceGeneratorForExternalDispatchers(identifier: string): Promise { + async unprovideInstanceGeneratorForExternalDispatchers( + identifier: string + ): Promise { if (this._externalGenerators.has(identifier)) { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Removing instance generator for \"" + - identifier + - "\" from external Generators. Other Elements cant create instances of this type anymore." + identifier + + "\" from external Generators. Other Elements cant create instances of this type anymore." ); } @@ -541,8 +556,8 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Adding instance generator for \"" + - identifier + - "\" as internal Generator. This Generator wont be used externally." + identifier + + "\" as internal Generator. This Generator wont be used externally." ); } @@ -553,8 +568,8 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Rmoving instance generator for \"" + - identifier + - "\" from internal Generator. The sytem cant create elements of this type any more." + identifier + + "\" from internal Generator. The sytem cant create elements of this type any more." ); } @@ -586,7 +601,9 @@ export class nopeDispatcher implements INopeDispatcher { * @return {*} {Promise} * @memberof nopeDispatcher */ - public async unregisterCallback(func: string | ((...args) => any)): Promise { + public async unregisterCallback( + func: string | ((...args) => any) + ): Promise { if (typeof func === "string") { await this.communicator.emitUnregisterRpc({ dispatcherId: this.id, @@ -604,34 +621,45 @@ export class nopeDispatcher implements INopeDispatcher { /** * Helper Function, that will create a Selector Function for Generators or Service Calls. - * Therefore the "testCallback" will be used to determine the whether a dispatcher is + * Therefore the "testCallback" will be used to determine the whether a dispatcher is * allowed to execute the service or not. * * @param {string} name * @param {("service" | "generator")} type * @param {(dispatcherId: string) => Promise} testCallback - * @return {*} + * @return {*} * @memberof nopeDispatcher */ - protected async _generateSelector(name: string, type: "service" | "generator", testCallback: ValidSelectorFunction, options: { timeout?: number } = {}) { + protected async _generateSelector( + name: string, + type: "service" | "generator", + testCallback: ValidSelectorFunction, + options: { timeout?: number } = {} + ) { const _this = this; - const _options = Object.assign({ - timeout: 1000, - }, options); + const _options = Object.assign( + { + timeout: 1000 + }, + options + ); - - // Create a Variable, which will count the amout, the + // Create a Variable, which will count the amout, the // selection function has been called let called = 0; // Create a callback and Promise, that will be rejected // if a timer has been elapsed. let timerFinishedCallback: (...args) => void; - const timerFinishedPromise = new Promise((resolve, reject) => timerFinishedCallback = reject); + const timerFinishedPromise = new Promise( + (resolve, reject) => (timerFinishedCallback = reject) + ); // Create a callback and a Promise, that will be finished, // if all elements has called the selection function. let checksFinishedCallback: (...args) => void; - const checksFinishedPromise = new Promise(resolve => checksFinishedCallback = resolve); + const checksFinishedPromise = new Promise( + (resolve) => (checksFinishedCallback = resolve) + ); // An finally we need to kown, if the selection was fine. // Therefore we create a variable. let executorSelected = false; @@ -640,9 +668,9 @@ export class nopeDispatcher implements INopeDispatcher { const responses = new Set(); - // A Default callback, which should be called inside of a + // A Default callback, which should be called inside of a // selection function. This function will ensure, that we - // trigger the corresponding timers to unregister the + // trigger the corresponding timers to unregister the // selection function. const defaultCallbackHandler = () => { const first = firstCall; @@ -650,17 +678,24 @@ export class nopeDispatcher implements INopeDispatcher { if (firstCall) { firstCall = false; // Now we start a timer to make shure to function will be unregistered after a specific amount of time. - setTimeout(timerFinishedCallback, _options.timeout, new Error("Timeout: Selection timedout")); + setTimeout( + timerFinishedCallback, + _options.timeout, + new Error("Timeout: Selection timedout") + ); } // Determine how many calles are allowed before we have to remove the callback - const amountOfCallsBeforeUnregister = type == "generator" ? _this._amountOfGeneratorsProvidedByName.get(name) || 1 : _this._amountOfServicesProvidedByName.get(name) || 1; + const amountOfCallsBeforeUnregister = + type == "generator" + ? _this._amountOfGeneratorsProvidedByName.get(name) || 1 + : _this._amountOfServicesProvidedByName.get(name) || 1; // Now we make shure, we count this call called++; // If we have call the more items than required, - // we asume that we are taking care of the last + // we asume that we are taking care of the last // element. if (amountOfCallsBeforeUnregister <= called) { checksFinishedCallback(); @@ -673,11 +708,10 @@ export class nopeDispatcher implements INopeDispatcher { return { last: false, first - };; + }; }; let selectorFunc = async (externalDispatcherId: string) => { - // Prevent taking care of the same dispatcher twice. if (responses.has(externalDispatcherId)) { return false; @@ -688,10 +722,14 @@ export class nopeDispatcher implements INopeDispatcher { // Make shure we start the timers to unregister the function etc. // We receive an info, whether we are checking the last call: - const options = Object.assign(defaultCallbackHandler(), { timeout: _options.timeout - 50 }); + const options = Object.assign(defaultCallbackHandler(), { + timeout: _options.timeout - 50 + }); - - if (!executorSelected && await testCallback(externalDispatcherId, options)) { + if ( + !executorSelected && + (await testCallback(externalDispatcherId, options)) + ) { // The value of "executorSelected" may changed during calling the // testCallback. Therefore we check this again. if (executorSelected) { @@ -706,11 +744,10 @@ export class nopeDispatcher implements INopeDispatcher { }; selectorFunc = await this.registerFunction(selectorFunc, { - preventSendingToRegistery: true, + preventSendingToRegistery: true }); - - const selectorFuncOptions: (ICallOptions & { functionId: string }) = { + const selectorFuncOptions: ICallOptions & { functionId: string } = { resultSink: this._getServiceName(selectorFunc["id"], "response"), timeout: _options.timeout, paramsHasNoCallback: true, @@ -720,25 +757,27 @@ export class nopeDispatcher implements INopeDispatcher { let rejectSelectionSucess: () => void; - // Our final Promise, that will be used to show, that + // Our final Promise, that will be used to show, that // The Selection has failed. const selectionSucessPromise = new Promise((resolve, reject) => { rejectSelectionSucess = reject; }); - // Now we create a Callback which will be used to - Promise.race([timerFinishedPromise, checksFinishedPromise]).catch(err => { - // Now based on the result of the selection we may - // reject the "selectionSucess" - if (!executorSelected) { - rejectSelectionSucess(); - } - }).finally(async () => { - // Either all Elements has been - _this.unregisterFunction(selectorFunc, { preventSendingToRegistery: true }); - - - }); + // Now we create a Callback which will be used to + Promise.race([timerFinishedPromise, checksFinishedPromise]) + .catch((err) => { + // Now based on the result of the selection we may + // reject the "selectionSucess" + if (!executorSelected) { + rejectSelectionSucess(); + } + }) + .finally(async () => { + // Either all Elements has been + _this.unregisterFunction(selectorFunc, { + preventSendingToRegistery: true + }); + }); return { selectorFuncOptions, @@ -753,12 +792,19 @@ export class nopeDispatcher implements INopeDispatcher { * @param {("first" | "a")} selector * @memberof nopeDispatcher */ - protected _generateAndRegisterSelectorFunction(name: string, type: "service" | "generator", selector: ValidDefaultSelectors) { - return this._generateSelector(name, type, this._generateSelectorFunction(selector)); + protected _generateAndRegisterSelectorFunction( + name: string, + type: "service" | "generator", + selector: ValidDefaultSelectors + ) { + return this._generateSelector( + name, + type, + this._generateSelectorFunction(selector) + ); } protected _generateSelectorFunction(selector: ValidDefaultSelectors) { - const _this = this; let helperData: any; @@ -791,7 +837,9 @@ export class nopeDispatcher implements INopeDispatcher { // Now we find the Min CPU usage: const bestOption = minOfArray(helperData.arr, "cpuUsage").index; - for (const [index, { resolve }] of (helperData.arr as any[]).entries()) { + for (const [index, { resolve }] of ( + helperData.arr as any[] + ).entries()) { resolve(index === bestOption); } @@ -806,19 +854,21 @@ export class nopeDispatcher implements INopeDispatcher { }; return async (externalDispatcherId, options) => { - if (options.initalTest) { return true; } - let cpuUsage = _this._externalDispatchers.get(externalDispatcherId).host.cpu.usage; + let cpuUsage = + _this._externalDispatchers.get(externalDispatcherId).host.cpu.usage; cpuUsage = cpuUsage === -1 ? Infinity : cpuUsage; - // For every Call, we - const promise = new Promise(resolve => helperData.arr.push({ resolve, cpuUsage })); + // For every Call, we + const promise = new Promise((resolve) => + helperData.arr.push({ resolve, cpuUsage }) + ); // If we are calling the Element at the firsttime, - // we ensure, that we have callback, which will + // we ensure, that we have callback, which will // be called after the specified amount of time to // assing the selection. if (options.first) { @@ -841,7 +891,9 @@ export class nopeDispatcher implements INopeDispatcher { // Now we find the Min CPU usage: const bestOption = minOfArray(helperData.arr, "freeRam").index; - for (const [index, { resolve }] of (helperData.arr as any[]).entries()) { + for (const [index, { resolve }] of ( + helperData.arr as any[] + ).entries()) { resolve(index === bestOption); } @@ -856,19 +908,21 @@ export class nopeDispatcher implements INopeDispatcher { }; return async (externalDispatcherId, options) => { - if (options.initalTest) { return true; } - let freeRam = _this._externalDispatchers.get(externalDispatcherId).host.ram.free; + let freeRam = + _this._externalDispatchers.get(externalDispatcherId).host.ram.free; freeRam = freeRam === -1 ? Infinity : freeRam; - // For every Call, we - const promise = new Promise(resolve => helperData.arr.push({ resolve, freeRam })); + // For every Call, we + const promise = new Promise((resolve) => + helperData.arr.push({ resolve, freeRam }) + ); // If we are calling the Element at the firsttime, - // we ensure, that we have callback, which will + // we ensure, that we have callback, which will // be called after the specified amount of time to // assing the selection. if (options.first) { @@ -885,7 +939,7 @@ export class nopeDispatcher implements INopeDispatcher { } /** - * Function, that will extract the information of the instance and the + * Function, that will extract the information of the instance and the * provider. * * @author M.Karkowski @@ -893,23 +947,31 @@ export class nopeDispatcher implements INopeDispatcher { * @return {*} {(INopeModuleDescription & { dispatcher: IDispatcherInfo })} * @memberof nopeDispatcher */ - public getInstanceInfo(identifier: string): { description: INopeModuleDescription, dispatcher: IDispatcherInfo } | undefined { - + public getInstanceInfo( + identifier: string + ): + | { description: INopeModuleDescription; dispatcher: IDispatcherInfo } + | undefined { // First check if the instance exists. if (!this.instanceExists(identifier, false)) { return undefined; } // Define the return type - const ret: { description: INopeModuleDescription, dispatcher: IDispatcherInfo } = {} as any; + const ret: { + description: INopeModuleDescription; + dispatcher: IDispatcherInfo; + } = {} as any; // First we check if we are taking care of an internal instance, if so // we will use this instance to enrich the description, otherwise, we // will look in the external instances. if (this._instances.has(identifier)) { - ret.description = this._instances.get(identifier).instance.toDescription(); + ret.description = this._instances + .get(identifier) + .instance.toDescription(); } else { - // Now extract teh + // Now extract teh for (const modules of this._mappingOfRemoteDispatchersAndInstances.values()) { for (const mod of modules) { if (mod.identifier == identifier) { @@ -934,7 +996,9 @@ export class nopeDispatcher implements INopeDispatcher { * @return {*} {(IDispatcherInfo | undefined)} * @memberof nopeDispatcher */ - public getDispatcherForInstance(identifier: string): IDispatcherInfo | undefined { + public getDispatcherForInstance( + identifier: string + ): IDispatcherInfo | undefined { // Check if the instance exists in general if (!this.instanceExists(identifier, false)) { return undefined; @@ -957,16 +1021,13 @@ export class nopeDispatcher implements INopeDispatcher { } } - - return undefined; } - public async generateInstance( description: Partial, options: { - selector?: ValidDefaultSelectors | ValidSelectorFunction + selector?: ValidDefaultSelectors | ValidSelectorFunction; } = {} ): Promise { // Define the Default Description @@ -994,10 +1055,10 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Requesting an Instance of type: \"" + - _defDescription.type + - "\" with the identifier: \"" + - _defDescription.identifier + - "\"" + _defDescription.type + + "\" with the identifier: \"" + + _defDescription.identifier + + "\"" ); } @@ -1005,25 +1066,45 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "Already created instance with the identifiert: \"" + - _defDescription.identifier + - "\" => returning this instance" + _defDescription.identifier + + "\" => returning this instance" ); } const _instanceDetails = this.getInstanceInfo(_description.identifier); - if (_instanceDetails !== undefined && _instanceDetails?.description.type !== _description.type) { - throw Error("There exists an Instance named: '" + _description.identifier + "' but it uses a different type. Requested type: '" + _description.type + "', given type: '" + _instanceDetails?.description.type + "'"); + if ( + _instanceDetails !== undefined && + _instanceDetails?.description.type !== _description.type + ) { + throw Error( + "There exists an Instance named: '" + + _description.identifier + + "' but it uses a different type. Requested type: '" + + _description.type + + "', given type: '" + + _instanceDetails?.description.type + + "'" + ); } const dispatcherToUse = _instanceDetails?.dispatcher.id; if (typeof options.selector === "function") { if (!(await options.selector(dispatcherToUse, { initalTest: true }))) { - throw Error("Instance is available, but selector prevents using the host."); + throw Error( + "Instance is available, but selector prevents using the host." + ); } } else if (typeof options.selector === "string") { // Test the function - if (!(await this._generateSelectorFunction(options.selector)(dispatcherToUse, { initalTest: true }))) { - throw Error("Instance is available, but selector prevents using the host."); + if ( + !(await this._generateSelectorFunction(options.selector)( + dispatcherToUse, + { initalTest: true } + )) + ) { + throw Error( + "Instance is available, but selector prevents using the host." + ); } } @@ -1055,8 +1136,8 @@ export class nopeDispatcher implements INopeDispatcher { if (this._logger?.enabledFor(Logger.DEBUG)) { this._logger.debug( "No instance with the identifiert: \"" + - _defDescription.identifier + - "\" found, but an internal generator is available. Using the internal one for creating the instance and requesting the \"real\" instance externally" + _defDescription.identifier + + "\" found, but an internal generator is available. Using the internal one for creating the instance and requesting the \"real\" instance externally" ); } @@ -1065,8 +1146,19 @@ export class nopeDispatcher implements INopeDispatcher { // try to extract its dispatcher-id and will use that as selector // to allow the function be called. const _instanceDetails = this.getInstanceInfo(_description.identifier); - if (_instanceDetails !== undefined && _instanceDetails?.description.type !== _description.type) { - throw Error("There exists an Instance named: '" + _description.identifier + "' but it uses a different type. Requested type: '" + _description.type + "', given type: '" + _instanceDetails?.description.type + "'"); + if ( + _instanceDetails !== undefined && + _instanceDetails?.description.type !== _description.type + ) { + throw Error( + "There exists an Instance named: '" + + _description.identifier + + "' but it uses a different type. Requested type: '" + + _description.type + + "', given type: '" + + _instanceDetails?.description.type + + "'" + ); } const dispatcherToUse = _instanceDetails?.dispatcher.id; @@ -1079,17 +1171,26 @@ export class nopeDispatcher implements INopeDispatcher { return dispatcherToUse === id; }; } else { - throw Error("Instance is available, but selector prevents using the host."); + throw Error( + "Instance is available, but selector prevents using the host." + ); } } else if (typeof options.selector === "string") { // Test the function - if (await this._generateSelectorFunction(options.selector)(dispatcherToUse, { initalTest: true })) { + if ( + await this._generateSelectorFunction(options.selector)( + dispatcherToUse, + { initalTest: true } + ) + ) { // Now we generate a specific method to check against the dispatcher options.selector = async (id: string) => { return dispatcherToUse === id; }; } else { - throw Error("Instance is available, but selector prevents using the host."); + throw Error( + "Instance is available, but selector prevents using the host." + ); } } } @@ -1223,19 +1324,18 @@ export class nopeDispatcher implements INopeDispatcher { // If there is a Logger: this._logger.debug( "Dispatcher \"" + - this.id + - "\" received request: \"" + - data.functionId + - "\" -> task: \"" + - data.taskId + - "\"" + this.id + + "\" received request: \"" + + data.functionId + + "\" -> task: \"" + + data.taskId + + "\"" ); } const _this = this; if (typeof _function === "function") { - // Now we check, if we have to perform test, whether // we are allowed to execute the task: if (data?.selector?.functionId) { @@ -1274,36 +1374,36 @@ export class nopeDispatcher implements INopeDispatcher { // trigger the remote. data.callbacks.map( (options) => - (args[options.idx] = async (..._args) => { - // And Create the Task and its Promise. - const servicePromise = _this.performCall( - options.functionId, - _args, - options - ); + (args[options.idx] = async (..._args) => { + // And Create the Task and its Promise. + const servicePromise = _this.performCall( + options.functionId, + _args, + options + ); - const cancelCallback = (reason) => { - // The Main Task has been canceled => - // We are allowed to canel the Subtask as well. - servicePromise.cancel(reason); - }; - cbs.push(cancelCallback); + const cancelCallback = (reason) => { + // The Main Task has been canceled => + // We are allowed to canel the Subtask as well. + servicePromise.cancel(reason); + }; + cbs.push(cancelCallback); - // Await the Result. If an Task is canceled => The Error is Thrown. - const result = await servicePromise; + // Await the Result. If an Task is canceled => The Error is Thrown. + const result = await servicePromise; - // Remove the Index - cbs.splice(cbs.indexOf(cancelCallback), 1); + // Remove the Index + cbs.splice(cbs.indexOf(cancelCallback), 1); - return result; - }) + return result; + }) ); // Perform the Task it self. const _resultPromise = _function(...args); if ( - typeof (_resultPromise as INopePromise).cancel === "function" + typeof (_resultPromise as INopePromise)?.cancel === "function" ) { // Push the Callback to the Result. cbs.push((reason) => @@ -1329,10 +1429,10 @@ export class nopeDispatcher implements INopeDispatcher { // If there is a Logger: this._logger.error( "Dispatcher \"" + - this.id + - "\" failed with request: \"" + - data.taskId + - "\"" + this.id + + "\" failed with request: \"" + + data.taskId + + "\"" ); this._logger.error(error); } @@ -1398,7 +1498,7 @@ export class nopeDispatcher implements INopeDispatcher { return true; } - } catch (e) { } + } catch (e) {} return false; } @@ -1474,7 +1574,7 @@ export class nopeDispatcher implements INopeDispatcher { ); } } - } catch (e) { } + } catch (e) {} }); // Subscribe to new available Topics. @@ -1496,10 +1596,17 @@ export class nopeDispatcher implements INopeDispatcher { } } // Update the all internal subscribed / published events. - this.subscribedEvents.setContent(Array.from(new Set([..._this._externalSubscribed, ...this._internalSubscribed]))); - this.publishedEvents.setContent(Array.from(new Set([..._this._externalPublished, ...this._internalPublished]))); - - } catch (e) { } + this.subscribedEvents.setContent( + Array.from( + new Set([..._this._externalSubscribed, ...this._internalSubscribed]) + ) + ); + this.publishedEvents.setContent( + Array.from( + new Set([..._this._externalPublished, ...this._internalPublished]) + ) + ); + } catch (e) {} }); await this.communicator.onNewInstanceGeneratorsAvailable((data) => { @@ -1517,7 +1624,7 @@ export class nopeDispatcher implements INopeDispatcher { data.dispatcher ); } - } catch (e) { } + } catch (e) {} }); await this.communicator.onStatusUpdate((info) => { @@ -1568,8 +1675,8 @@ export class nopeDispatcher implements INopeDispatcher { // If there is a Logger: _this._logger.debug( "Remote Dispatcher \"" + - message.dispatcher + - "\" updated its available instances" + message.dispatcher + + "\" updated its available instances" ); } }); @@ -1580,8 +1687,8 @@ export class nopeDispatcher implements INopeDispatcher { } }); - // Now we listen to - this.communicator.onUnregisterRpc(msg => { + // Now we listen to + this.communicator.onUnregisterRpc((msg) => { if (_this._definedFunctions.has(msg.identifier)) { const item = _this._definedFunctions.get(msg.identifier); @@ -1734,7 +1841,12 @@ export class nopeDispatcher implements INopeDispatcher { if (deleted && this._logger?.enabledFor(Logger.WARN)) { // If there is a Logger: - this._logger.warn("a dispatcher on", dispatcherInfo?.host.name || "unkown", "went offline. ID of the Dispatcher: ", dispatcher); + this._logger.warn( + "a dispatcher on", + dispatcherInfo?.host.name || "unkown", + "went offline. ID of the Dispatcher: ", + dispatcher + ); } } @@ -1760,14 +1872,17 @@ export class nopeDispatcher implements INopeDispatcher { host: { cores: cpus.length, cpu: { - model: `${cpus[0].model}`.slice(0, (cpus[0].model as string).indexOf("@") - 1), + model: `${cpus[0].model}`.slice( + 0, + (cpus[0].model as string).indexOf("@") - 1 + ), speed: avgOfArray(cpus, "speed"), usage: this._cpuLoad }, os: os.platform(), ram: { // Return the used Memory - usedPerc: 1 - (os.freemem() / os.totalmem()), + usedPerc: 1 - os.freemem() / os.totalmem(), // The Values are given in Byte but we want MByte free: Math.round(os.freemem() / 1048576), total: Math.round(os.totalmem() / 1048576) @@ -1903,7 +2018,7 @@ export class nopeDispatcher implements INopeDispatcher { } /** - * Internal function to update the amount of the services or + * Internal function to update the amount of the services or * generators, that are hosted. * * @author M.Karkowski @@ -1912,16 +2027,18 @@ export class nopeDispatcher implements INopeDispatcher { * @memberof nopeDispatcher */ protected _updateAmountOf(type: "services" | "generators"): void { - - const props = type === "services" ? { - map: "_amountOfServicesProvidedByName", - iter: "_mappingOfRemoteDispatchersAndServices", - name: "services" - } : { - map: "_amountOfGeneratorsProvidedByName", - iter: "_mappingOfRemoteDispatchersAndGenerators", - name: "generators" - }; + const props = + type === "services" + ? { + map: "_amountOfServicesProvidedByName", + iter: "_mappingOfRemoteDispatchersAndServices", + name: "services" + } + : { + map: "_amountOfGeneratorsProvidedByName", + iter: "_mappingOfRemoteDispatchersAndGenerators", + name: "generators" + }; const _this = this; @@ -1929,8 +2046,8 @@ export class nopeDispatcher implements INopeDispatcher { for (const dispatcherInfo of this[props.iter].values()) { dispatcherInfo[props.name].map((service) => { - // Add the service and tell the System, that - // we have an additional service-provider for that. + // Add the service and tell the System, that + // we have an additional service-provider for that. _this[props.map].set(service, (_this[props.map].get(service) || 0) + 1); }); } @@ -1957,7 +2074,7 @@ export class nopeDispatcher implements INopeDispatcher { // Store the Availabe Services before the Update. const _servicesBeforeUpdate = new Set(this._externalProvidedServices); - // Clear the Services and now add the available + // Clear the Services and now add the available // Services to this Array this._externalProvidedServices.clear(); for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndServices.values()) { @@ -1969,14 +2086,15 @@ export class nopeDispatcher implements INopeDispatcher { } // Determine the Difference of the Items, before and afterwards - const difference = determineDifference(_servicesBeforeUpdate, _this._externalProvidedServices); + const difference = determineDifference( + _servicesBeforeUpdate, + _this._externalProvidedServices + ); // We combine the provided // and external available. this.externalProvidedServices.setContent( - Array.from( - this._externalProvidedServices - ) + Array.from(this._externalProvidedServices) ); // If there are unavailable tasks => cancel their tasks. @@ -2080,8 +2198,12 @@ export class nopeDispatcher implements INopeDispatcher { this._externalPublished = _published; // Update the Elements. - this.externallySubscribedEvents.setContent(Array.from(this._externalSubscribed)); - this.externallyPublishedEvents.setContent(Array.from(this._externalPublished)); + this.externallySubscribedEvents.setContent( + Array.from(this._externalSubscribed) + ); + this.externallyPublishedEvents.setContent( + Array.from(this._externalPublished) + ); } /** @@ -2112,8 +2234,8 @@ export class nopeDispatcher implements INopeDispatcher { // That should lead to a fatal error ? _this._logger.warn( "An Instance with the name \"" + - instance.identifier + - "\" has already been declared", + instance.identifier + + "\" has already been declared", _this._externalInstances.get(instance.identifier)?.type, "!=", instance.type @@ -2153,7 +2275,8 @@ export class nopeDispatcher implements INopeDispatcher { if (externalOnly) { return this._externalSubscribed.has(topic); } else { - this._internalSubscribed.has(topic) || this._externalSubscribed.has(topic); + this._internalSubscribed.has(topic) || + this._externalSubscribed.has(topic); } } @@ -2169,7 +2292,8 @@ export class nopeDispatcher implements INopeDispatcher { if (externalOnly) { return this._externalInstances.has(identifier); } else { - this._externalInstances.has(identifier) || this._instances.has(identifier); + this._externalInstances.has(identifier) || + this._instances.has(identifier); } } @@ -2286,7 +2410,7 @@ export class nopeDispatcher implements INopeDispatcher { protected _subscribeToEvent(event: string) { const item = this._externalTopicLinkedWithObservable.get(event) || { observable: this._generateObservable(), - cb: () => { } + cb: () => {} }; if (!item.observable.hasSubscriptions) { @@ -2335,6 +2459,10 @@ export class nopeDispatcher implements INopeDispatcher { let _func = func; + if (!isAsyncFunction(func)) { + throw Error("Only Async Functions are allowed"); + } + if (options.deleteAfterCalling) { _func = async (...args) => { // Unregister the Method @@ -2360,7 +2488,6 @@ export class nopeDispatcher implements INopeDispatcher { func: _func }); - // Register the Callback: this._subscribeToService(_id, _func); @@ -2448,7 +2575,8 @@ export class nopeDispatcher implements INopeDispatcher { (Array.isArray(options.mode) && options.mode.includes("subscribe")) ) { // Now we want to subscribe to external events. - newElement = newElement || !this._externalTopicLinkedWithObservable.has(_subTopic); + newElement = + newElement || !this._externalTopicLinkedWithObservable.has(_subTopic); // Now we store the external source. const _externalSource = this._subscribeToEvent(_subTopic); @@ -2533,9 +2661,9 @@ export class nopeDispatcher implements INopeDispatcher { options.mode == "publish" || (Array.isArray(options.mode) && options.mode.includes("publish")) ) { - // Now we want to subscribe to external events. - newElement = newElement || !this._externalTopicLinkedWithObservable.has(_pubTopic); + newElement = + newElement || !this._externalTopicLinkedWithObservable.has(_pubTopic); // Now we store the external source. const _externalSource = this._subscribeToEvent(_pubTopic); @@ -2556,7 +2684,10 @@ export class nopeDispatcher implements INopeDispatcher { // Test if the Message is subscribed externally or Emitting // Updates is enabled: if so, use the socket to send the data // to other dispatchers. - if (_this.subscriptionExists(_pubTopic) || _this._forceEmittingUpdates) { + if ( + _this.subscriptionExists(_pubTopic) || + _this._forceEmittingUpdates + ) { // Use the Communicator to emit the Event. _this.communicator.emitEvent(_pubTopic, msg); } @@ -2596,7 +2727,8 @@ export class nopeDispatcher implements INopeDispatcher { // Register the Internally Subscribed Observable to mark it as forwarded. const _set01 = - this._internalObservablesForwardDataToNetwork.get(_pubTopic) || new Set(); + this._internalObservablesForwardDataToNetwork.get(_pubTopic) || + new Set(); _set01.add(observable); this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01); @@ -2652,10 +2784,14 @@ export class nopeDispatcher implements INopeDispatcher { // Unregister the Internally Subscribed Element. const _set01 = - _this._internalObservablesForwardDataToNetwork.get(_pubTopic) || new Set(); + _this._internalObservablesForwardDataToNetwork.get(_pubTopic) || + new Set(); _set01.delete(observable); if (_set01.size > 0) { - _this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01); + _this._internalObservablesForwardDataToNetwork.set( + _pubTopic, + _set01 + ); } else { _this._internalObservablesForwardDataToNetwork.delete(_pubTopic); @@ -2698,10 +2834,14 @@ export class nopeDispatcher implements INopeDispatcher { // Unregister the Internally Subscribed Element. const _set01 = - _this._internalObservablesForwardDataToNetwork.get(_pubTopic) || new Set(); + _this._internalObservablesForwardDataToNetwork.get(_pubTopic) || + new Set(); _set01.delete(observable); if (_set01.size > 0) { - _this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01); + _this._internalObservablesForwardDataToNetwork.set( + _pubTopic, + _set01 + ); } else { _this._internalObservablesForwardDataToNetwork.delete(_pubTopic); @@ -2952,7 +3092,9 @@ export class nopeDispatcher implements INopeDispatcher { // Define the Message const message: IAvailableTopicsMsg = { dispatcher: this.id, - published: Array.from(this._internalObservablesForwardDataToNetwork.keys()), + published: Array.from( + this._internalObservablesForwardDataToNetwork.keys() + ), subscribed: Array.from(this._externalTopicLinkedWithObservable.keys()) }; @@ -2967,11 +3109,13 @@ export class nopeDispatcher implements INopeDispatcher { } /** - * Internal Function, which will enable to + * Internal Function, which will enable to * update the lists of internal subscribed and published values. - * @param considerHidden + * @param considerHidden */ - protected _updateListsOfInternallySubscribedAndPublishedValues(considerHidden = true): void { + protected _updateListsOfInternallySubscribedAndPublishedValues( + considerHidden = true + ): void { // Clear the Overview this._internalSubscribed.clear(); this._internalPublished.clear(); @@ -3054,10 +3198,10 @@ export class nopeDispatcher implements INopeDispatcher { public performCall( serviceName: string, params: any[], - options: (Partial & { - selector?: ValidDefaultSelectors | ValidSelectorFunction, - quite?: boolean - }) = {} + options: Partial & { + selector?: ValidDefaultSelectors | ValidSelectorFunction; + quite?: boolean; + } = {} ): INopePromise { // Get a Call Id const _taskId = generateId(); @@ -3101,12 +3245,12 @@ export class nopeDispatcher implements INopeDispatcher { if (_this._logger?.enabledFor(Logger.DEBUG)) { _this._logger.debug( "Dispatcher \"" + - this.id + - "\" requesting externally Function \"" + - serviceName + - "\" with task: \"" + - _taskId + - "\"" + this.id + + "\" requesting externally Function \"" + + serviceName + + "\" with task: \"" + + _taskId + + "\"" ); } @@ -3155,9 +3299,8 @@ export class nopeDispatcher implements INopeDispatcher { // Description of the Callback and register // the callback inside of the Dispatcher - const deleteAfterCalling = _options.deletableCallbacks.includes( - idx - ); + const deleteAfterCalling = + _options.deletableCallbacks.includes(idx); const _func = _this.registerFunction(contentOfParameter, { deleteAfterCalling, preventSendingToRegistery: true @@ -3203,9 +3346,13 @@ export class nopeDispatcher implements INopeDispatcher { if (_this._amountOfServicesProvidedByName.get(serviceName) > 1) { if (typeof options?.selector === "function") { - const selector = await this._generateSelector(serviceName, "service", options.selector); + const selector = await this._generateSelector( + serviceName, + "service", + options.selector + ); // Assign the Selector Promise - selector.selectionSucessPromise.catch(_ => { + selector.selectionSucessPromise.catch((_) => { _this.cancelTask( _taskId, new Error( @@ -3217,9 +3364,13 @@ export class nopeDispatcher implements INopeDispatcher { // Assign the Selector: taskRequest.selector = selector.selectorFuncOptions; } else { - const selector = await this._generateAndRegisterSelectorFunction(serviceName, "service", typeof options?.selector === "string" ? options.selector : "first"); + const selector = await this._generateAndRegisterSelectorFunction( + serviceName, + "service", + typeof options?.selector === "string" ? options.selector : "first" + ); // Assign the Selector Promise - selector.selectionSucessPromise.catch(_ => { + selector.selectionSucessPromise.catch((_) => { _this.cancelTask( _taskId, new Error( @@ -3242,12 +3393,12 @@ export class nopeDispatcher implements INopeDispatcher { if (_this._logger?.enabledFor(Logger.DEBUG)) { _this._logger.debug( "Dispatcher \"" + - this.id + - "\" putting task \"" + - _taskId + - "\" on: \"" + - _this._getServiceName(taskRequest.functionId, "request") + - "\"" + this.id + + "\" putting task \"" + + _taskId + + "\" on: \"" + + _this._getServiceName(taskRequest.functionId, "request") + + "\"" ); } @@ -3258,8 +3409,8 @@ export class nopeDispatcher implements INopeDispatcher { _taskId, new Error( "TIMEOUT. The Service allowed execution time of " + - options.timeout.toString() + - "[ms] has been excided" + options.timeout.toString() + + "[ms] has been excided" ), options.quite || false ); diff --git a/lib/index.browser.ts b/lib/index.browser.ts index a3fcb2d..7a43172 100644 --- a/lib/index.browser.ts +++ b/lib/index.browser.ts @@ -2,9 +2,30 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2021-08-04 15:46:01 - * @modify date 2021-08-04 15:46:01 + * @modify date 2021-08-27 20:23:38 * @desc [description] */ -export * as communcation from "./communication/index.browser"; +import "reflect-metadata"; +import * as communcation from "./communication/index.browser"; +import * as dispatcher from "./dispatcher/index"; +import * as helpers from "./helpers/index.browser"; +import * as loader from "./loader/index.browser"; +import * as logger from "./logger/index.browser"; +import * as modules from "./module/index"; +import * as observables from "./observables/index"; +import * as promises from "./promise/index"; +import * as types from "./types/index"; +export * from "./logger/index.browser"; +export { + communcation, + dispatcher, + helpers, + loader, + logger, + types, + modules, + observables, + promises +}; diff --git a/lib/index.nodejs.ts b/lib/index.nodejs.ts index 7339638..889a228 100644 --- a/lib/index.nodejs.ts +++ b/lib/index.nodejs.ts @@ -2,10 +2,34 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2021-08-04 15:46:01 - * @modify date 2021-08-04 15:47:56 + * @modify date 2021-08-27 20:23:34 * @desc [description] */ -export { readInArgs as readInRunArgs, runNopeBackend } from "./cli/runNopeBackend"; -export * as communcation from "./communication/index.nodejs"; +import "reflect-metadata"; +import * as communcation from "./communication/index.nodejs"; +import * as dispatcher from "./dispatcher/index"; +import * as helpers from "./helpers/index.nodejs"; +import * as loader from "./loader/index.nodejs"; +import * as logger from "./logger/index.nodejs"; +import * as modules from "./module/index"; +import * as observables from "./observables/index"; +import * as promises from "./promise/index"; +import * as types from "./types/index"; +export { + readInArgs as readInRunNopeBackendArgs, + runNopeBackend +} from "./cli/runNopeBackend"; +export * from "./logger/index.nodejs"; +export { + communcation, + dispatcher, + helpers, + loader, + logger, + types, + modules, + observables, + promises +}; diff --git a/lib/observables/nopeObservable.ts b/lib/observables/nopeObservable.ts index 4fc512d..93ac93e 100644 --- a/lib/observables/nopeObservable.ts +++ b/lib/observables/nopeObservable.ts @@ -30,7 +30,8 @@ import { * - enables performing a subscription with synced call or a immediate call. */ export class NopeObservable - implements INopeObservable { + implements INopeObservable +{ public observable: BehaviorSubject = new BehaviorSubject(undefined); public readonly id: string = generateId(); @@ -87,7 +88,7 @@ export class NopeObservable this._value = adapted.data; } else { // Adapt the Value if required. - this._value = (value as any) as T; + this._value = value as any as T; } const valueToPublish = this.getContent(); @@ -205,7 +206,7 @@ export class NopeObservable */ public getContent(): G | null { if (this.getter !== null) return this.getter(this._value); - return (this._value as any) as G; + return this._value as any as G; } /** @@ -311,7 +312,7 @@ export class NopeObservable pipe?: IPipe; } = {} ): Subscription { - let observable: Observable = (this as any) as Observable; + let observable: Observable = this as any as Observable; if (options.pipe) { observable = options.pipe(options.scope, this.observable); @@ -357,7 +358,8 @@ export class NopeObservable * @param options Additional Options for the Wait Function. */ public waitFor( - testCallback: IwaitForCallback, + testCallback: IwaitForCallback = (value) => + (value as any as boolean) == true, options: INopeWaitForOpitions = { testCurrent: true } ): Promise { const _this = this; diff --git a/lib/symbols/identifiers.ts b/lib/symbols/identifiers.ts index 3004459..ab403bd 100644 --- a/lib/symbols/identifiers.ts +++ b/lib/symbols/identifiers.ts @@ -1,6 +1,14 @@ -export const DISPATCHER_INSTANCE = Symbol('nope.dispatcher.instance'); -export const DISPATCHER_OPTIONS = Symbol('nope.dispatcher.options'); -export const OBSERVABLE_FACTORY = Symbol('nope.observable.factory'); -export const OBSERVABLE_INSTANCE = Symbol('nope.observable.instance'); -export const COMMUNICATION_LAYER = Symbol('nope.communication.layer'); -export const LOADER = Symbol('nope.package.loader'); \ No newline at end of file +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @create date 2021-08-27 20:31:42 + * @modify date 2021-08-27 20:31:42 + * @desc [description] + */ + +export const DISPATCHER_INSTANCE = Symbol("nope.dispatcher.instance"); +export const DISPATCHER_OPTIONS = Symbol("nope.dispatcher.options"); +export const OBSERVABLE_FACTORY = Symbol("nope.observable.factory"); +export const OBSERVABLE_INSTANCE = Symbol("nope.observable.instance"); +export const COMMUNICATION_LAYER = Symbol("nope.communication.layer"); +export const LOADER = Symbol("nope.package.loader"); diff --git a/lib/types/nope/nopeObservable.interface.ts b/lib/types/nope/nopeObservable.interface.ts index 1024d77..cae7b6d 100644 --- a/lib/types/nope/nopeObservable.interface.ts +++ b/lib/types/nope/nopeObservable.interface.ts @@ -205,7 +205,7 @@ export interface INopeObservable { * @param options Additional Options for the Wait Function. */ waitFor( - testCallback: IwaitForCallback, + testCallback?: IwaitForCallback, options?: INopeWaitForOpitions ): Promise;