diff --git a/lib/dispatcher/nopeDispatcher.ts b/lib/dispatcher/nopeDispatcher.ts index cbadc2e..e0ed80e 100644 --- a/lib/dispatcher/nopeDispatcher.ts +++ b/lib/dispatcher/nopeDispatcher.ts @@ -37,8 +37,14 @@ export interface ICommunicationInterface { send(topic: string, data): void; } -export type requestTask = { - type: 'request', +export type errorTestMsg = { + type: 'requestOfService' + taskId: number, + functionId: string, +} + +export type requestTaskMsg = { + type: 'requestOfTask', taskId: number, functionId: string, params: { @@ -52,13 +58,15 @@ export type requestTask = { } & callOptions)[] } + + export type callOptions = { deletableCallbacks: Array; - noCallbackItegrated?: boolean; - noErrorTest?: boolean; + paramsHasNoCallback?: boolean; + preventErrorTest?: boolean; } -export type responseOfTask = { +export type responseTaskMsg = { type: 'response', taskId: number, result?: any, @@ -94,6 +102,7 @@ export class nopeDispatcher { * @memberof nopeDispatcher */ protected _definedFunctions: Map Promise>; + protected _remotlyCalledFunctions: Set; protected _communicatorCallbacks: Map any>; protected _communicator: ICommunicationInterface @@ -129,12 +138,12 @@ export class nopeDispatcher { * Internal Method to handle some requests. * * @protected - * @param {requestTask} data The provided data of the request + * @param {requestTaskMsg} data The provided data of the request * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided * @return {Promise} * @memberof nopeDispatcher */ - protected async _handleExternalRequest(data: requestTask, _function?: (...args) => Promise): Promise { + protected async _handleExternalRequest(data: requestTaskMsg, _function?: (...args) => Promise): Promise { try { // Try to get the function if not provided: @@ -153,15 +162,15 @@ export class nopeDispatcher { // Add the Callbacks. Therefore create a function which will // trigger the remote. - data.callbacks.map(item => args[item.idx] = async (..._args) => { - return await _this.performCall(item.functionId, _args); + data.callbacks.map(options => args[options.idx] = async (..._args) => { + return await _this.performCall(options.functionId, _args, options); }); // Perform the Task it self. const _result = await _function(...args); // Define the Result message - const result: responseOfTask = { + const result: responseTaskMsg = { result: typeof (_result) !== 'undefined' ? _result : null, taskId: data.taskId, type: 'response' @@ -174,7 +183,7 @@ export class nopeDispatcher { } catch (error) { // An Error occourd => Forward the Error. - const result: responseOfTask = { + const result: responseTaskMsg = { error, taskId: data.taskId, type: 'response' @@ -191,11 +200,11 @@ export class nopeDispatcher { * the provided id. If so => finish the promise. * * @protected - * @param {responseOfTask} data The Data provided to handle the Response. + * @param {responseTaskMsg} data The Data provided to handle the Response. * @return {void} Nothing * @memberof nopeDispatcher */ - protected _handleExternalResponse(data: responseOfTask): void { + protected _handleExternalResponse(data: responseTaskMsg): void { try { // Extract the Task const task = this._runningTasks.get(data.taskId); @@ -238,8 +247,8 @@ export class nopeDispatcher { break; case 'generic': // Add a generic Subscription for callbacks: - this._communicator.on('request', (data: requestTask) => { - if (data.type === 'request') { + this._communicator.on('request', (data: requestTaskMsg) => { + if (data.type === 'requestOfTask') { _this._handleExternalRequest(data); } }); @@ -247,7 +256,7 @@ export class nopeDispatcher { } // Subscribe to Responses - this._communicator.on('response', (data: responseOfTask) => { + this._communicator.on('response', (data: responseTaskMsg) => { if (data.type === 'response') { _this._handleExternalResponse(data); } @@ -267,8 +276,8 @@ export class nopeDispatcher { const _this = this; // Define a Function. - const func = (data: requestTask) => { - if (data.type === 'request') { + const func = (data: requestTaskMsg) => { + if (data.type === 'requestOfTask') { _this._handleExternalRequest(data, cb); } }; @@ -304,20 +313,16 @@ export class nopeDispatcher { } = {}): (...args) => Promise { const _this = this; - let _id = options.id || generateId(); - - // if (this.options.subscriptionMode === 'generic') { - // _id = this._getRequestName(_id); - // } - - _id = this._getRequestName(_id); + const _id = this._getRequestName(options.id || generateId()); let _func = func; if (options.deleteAfterCalling) { _func = async (...args) => { // Unregister the Method - _this.unregistFunction(_id); + _this.unregistFunction(_id, { + preventSendingToRegistery: options.preventSendingToRegistery + }); // Return the Result of the Original Function. return await func(...args); } @@ -335,9 +340,10 @@ export class nopeDispatcher { // Register the Callback: this._subscribeToFunction(_id, _func); - if (!options.preventSendingToRegistery) + if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableServices(); + } // Return the Function. return _func; @@ -349,7 +355,10 @@ export class nopeDispatcher { * @return {*} {boolean} Flag, whether the element was removed (only if found) or not. * @memberof nopeDispatcher */ - public unregistFunction(func: ((...args) => void) | string | number): boolean { + public unregistFunction(func: ((...args) => void) | string, options: { + /** Flag to enable / disable sending to registery */ + preventSendingToRegistery?: boolean; + } = {}): boolean { const _id = typeof func === 'string' ? func : func['id'] as string || '0'; // Try to unregister the Callback from the communcator: @@ -360,8 +369,10 @@ export class nopeDispatcher { this._communicatorCallbacks.delete(_id); } - // Publish the Available Services. - this._sendAvailableServices(); + if (!options.preventSendingToRegistery) { + // Publish the Available Services. + this._sendAvailableServices(); + } return this._definedFunctions.delete(_id); } @@ -393,23 +404,21 @@ export class nopeDispatcher { * @param {({ * deletableCallbacks: Array; * })} [options={ - * deletableCallbacks: [] - * }] You could provide the index of removeable callbacks. + * deletableCallbacks: [], + * paramsHasNoCallback: false, + * preventErrorTest: false + * }] You could additiona Optionf for the callback. * @return {*} {Promise} The result of the Operation * @memberof nopeDispatcher */ - public performCall(functionName: string, params: any[], options: { - deletableCallbacks: Array; - noCallbackItegrated?: boolean; - noErrorTest?: boolean; - } = { - deletableCallbacks: [], - noCallbackItegrated: false, - noErrorTest: true - }): Promise { + public performCall(functionName: string, params: any[], options: callOptions = { + deletableCallbacks: [], + paramsHasNoCallback: false, + preventErrorTest: false + }): Promise { // Get a Call Id const _taskId = generateId(); - const _registeredIdx: Array = []; + const _registeredIdx: Array = []; const _this = this; // Define a Callback-Function, which will expect the Task. @@ -422,51 +431,42 @@ export class nopeDispatcher { reject }); - // Test if there is no Callback integrated => Speedup the Function. - if (options.noCallbackItegrated) { - // Perform the call. Therefore create the data package. - const packet: requestTask = { - functionId: this._getRequestName(functionName), - params, - callbacks: [], - taskId: _taskId, - type: 'request' - } + // Define a Task-Request + const taskRequest: requestTaskMsg = { + functionId: this._getRequestName(functionName), + params, + callbacks: [], + taskId: _taskId, + type: 'requestOfTask' + } - // Send the Message, to the general Request Path: - _this._communicator.send('request', packet); + // Test if there is no Callback integrated + if (!options.paramsHasNoCallback) { + // If so, the parameters has to be detailled: - // Send the Message to the specific element: - if (_this.options.subscriptionMode === 'individual') { - _this._communicator.send(packet.functionId, packet); - } + // 1. Reset the Params list: + taskRequest.params = []; - // leave the Method. - return; - } else { + // 2. Iterate over all Parameters and + // Determin Callbacks. Based on the Parameter- + // Type assign it either to packet.params ( + // for parsable Parameters) and packet.callbacks + // (for callback Parameters) + for (const [idx, contentOfParameter] of params.entries()) { - const _parameters: { - idx: number, - data: any, - }[] = []; - - const _callbacks: ({ - functionId: string, - idx: number, - deleteAfterCalling: boolean, - } & callOptions)[] = []; - - // Detail the Parameters. - for (const [idx, data] of params.entries()) { - if (typeof data !== "function") { - _parameters.push({ + // Test if the parameter is a Function + if (typeof contentOfParameter !== "function") { + taskRequest.params.push({ idx, - data + data: contentOfParameter }); } else { + // The Parameter is a Callback => store a + // Description of the Callback and register + // the callback inside of the Dispatcher const deleteAfterCalling = options.deletableCallbacks.includes(idx); - const _func = _this.registerFunction(data, { + const _func = _this.registerFunction(contentOfParameter, { deleteAfterCalling, preventSendingToRegistery: true }); @@ -474,35 +474,37 @@ export class nopeDispatcher { _registeredIdx.push(_func['id']); // Register the Callback - _callbacks.push({ + taskRequest.callbacks.push({ functionId: _func['id'], idx, deleteAfterCalling, - noErrorTest: true, + preventErrorTest: true, deletableCallbacks: [] }); } } + } - // Perform the call. Therefore create the data package. - const packet: requestTask = { - functionId: _this._getRequestName(functionName), - params: _parameters, - callbacks: _callbacks, + if (!_this._remotlyCalledFunctions.has(taskRequest.functionId) && !options.preventErrorTest) { + + // Define e Test of an Message. + const errorTestRequest: errorTestMsg = { + functionId: taskRequest.functionId, taskId: _taskId, - type: 'request' - } + type: 'requestOfService' + }; - // Send the Message to the specific element: - if (_this.options.subscriptionMode === 'individual') { - if (!options.noErrorTest) { - // Send the Message, to the general Request Path: - _this._communicator.send('request', packet); - } - _this._communicator.send(packet.functionId, packet); - } else { - _this._communicator.send('request', packet); - } + // Send the Message, to the general Request Path: + _this._communicator.send('requestedService', errorTestRequest); + + _this._remotlyCalledFunctions.add(taskRequest.functionId) + } + + // Send the Message to the specific element: + if (_this.options.subscriptionMode === 'individual') { + _this._communicator.send(taskRequest.functionId, taskRequest); + } else { + _this._communicator.send('request', taskRequest); } } catch (e) { // Delete all Callbacks. @@ -558,6 +560,7 @@ export class nopeDispatcher { * @memberof nopeDispatcher */ public reset(): void { + this._remotlyCalledFunctions = new Set(); this.clearTasks(); this.unregisterAll(); } diff --git a/lib/dispatcher/serviceRegistry.ts b/lib/dispatcher/serviceRegistry.ts index d23ec08..3c16eec 100644 --- a/lib/dispatcher/serviceRegistry.ts +++ b/lib/dispatcher/serviceRegistry.ts @@ -1,6 +1,6 @@ import { Logger } from "winston"; import { getLogger } from "../logger/getLogger"; -import { availableServices, ICommunicationInterface, requestTask, responseOfTask } from './nopeDispatcher'; +import { availableServices, errorTestMsg, ICommunicationInterface, responseTaskMsg } from './nopeDispatcher'; export class serviceRegistry { @@ -27,13 +27,11 @@ export class serviceRegistry { public init() { const _this = this; - this._communicator.on('request', (task: requestTask) => { + this._communicator.on('requestedService', (task: errorTestMsg) => { - _this._logger.debug('Getting Request for ' + task.functionId) - - if (!_this._services.has(task.functionId)) { + if (task.type === 'requestOfService' && !_this._services.has(task.functionId)) { // An Error occourd => Forward the Error. - const result: responseOfTask = { + const result: responseTaskMsg = { error: new Error('No Service Provider known for "' + task.functionId + '"'), taskId: task.taskId, type: 'response'