import { generateId } from '../helpers/idMethods';
import { Logger } from "winston";

/**
 * A Layer to communicate.
 *
 * The dispatcher below only talks to the outside world through this
 * interface: it emits / receives RPC requests, RPC results and the list
 * of available services.
 *
 * @export
 * @interface ICommunicationInterface
 */
export interface ICommunicationInterface {
    /**
     * Selects how the dispatcher listens for requests: `'individual'`
     * registers one listener per function (`request/<id>`), `'generic'`
     * registers a single listener on `'request'`. Defaults to `'generic'`
     * inside the dispatcher when omitted.
     */
    readonly subscriptionMode?: 'individual' | 'generic',

    /**
     * Selects where results are published: `'individual'` uses one sink
     * per function (`response/<id>`), `'generic'` uses the shared sink
     * `'response'`. Defaults to `'generic'` inside the dispatcher when omitted.
     */
    readonly resultSharing?: 'individual' | 'generic',

    /**
     * Function to emit a RPC Request.
     *
     * @param {string} name Name of the request
     * @param {requestTaskMsg} request The Request.
     * @memberof ICommunicationInterface
     */
    emitRpcRequest(name: string, request: requestTaskMsg): void;

    /**
     * Function to emit the result of a RPC Request. The dispatcher
     * calls it with the `resultSink` of the answered request as name.
     *
     * @param {string} name The name (sink) on which the result is published.
     * @param {responseTaskMsg} result The result (or error) message.
     * @memberof ICommunicationInterface
     */
    emitRpcResult(name: string, result: responseTaskMsg): void;

    /**
     * Function used to subscribe to RPC Results. Each method / function
     * can provide a unique eventname for the results.
     *
     * @param {string} name The Id of the Method.
     * @param {(result: responseTaskMsg) => void} cb The callback which should be called
     * @memberof ICommunicationInterface
     */
    onRpcResult(name: string, cb: (result: responseTaskMsg) => void): void;

    /**
     * Function used to unsubscribe from RPC Results.
     *
     * @param {string} name The Id of the Method.
     * @param {(result: responseTaskMsg) => void} cb The callback which was registered
     * @memberof ICommunicationInterface
     */
    offRpcResponse(name: string, cb: (result: responseTaskMsg) => void): void;

    /**
     * Function to register a listener for a RPC Request.
     *
     * @param {string} name the name to listen for the request.
     * @param {(data: requestTaskMsg) => void} cb The callback which should be called, if a request for the method is detected
     * @memberof ICommunicationInterface
     */
    onRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void;

    /**
     * Unregister a listener for a RPC Request.
     *
     * @param {string} name the name the listener was registered with.
     * @param {(data: requestTaskMsg) => void} cb The callback which was registered
     * @memberof ICommunicationInterface
     */
    offRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void;

    /**
     * Function to emit new Services. They will then be shared with other systems.
     *
     * @param {availableServices} services
     * @memberof ICommunicationInterface
     */
    emitNewServicesAvailable(services: availableServices): void;

    /**
     * Function to register a new Callback, which will be called if new Services are available.
     *
     * @param {(services: availableServices) => void} cb The Callback to call.
     * @memberof ICommunicationInterface
     */
    onNewServicesAvailable(cb: (services: availableServices) => void): void;
}

export type requestOfService = {
    type: 'requestOfService',
    taskId: string,
    functionId: string,
}

export type requestTaskMsg = {
    type: 'requestOfTask',

    /**
     * UUID of a Task
     *
     * @type {string}
     */
    taskId: string,

    /**
     * ID of the Function, on which it is available.
     *
     * @type {string}
     */
    functionId: string,

    /**
     * The Parameters. Each entry carries the position (`idx`) of the
     * argument and its value (`data`).
     *
     * @type {{ idx: number, data: any }[]}
     */
    params: {
        /**
         * Index of the Parameter
         *
         * @type {number}
         */
        idx: number,
        data: any
    }[],

    /**
     * Callbacks, that are available in a Dispatcher.
     *
     * @type {(({ functionId: string, idx: number, deleteAfterCalling: boolean } & callOptions)[])}
     */
    callbacks: ({
        functionId: string,
        idx: number,
        deleteAfterCalling: boolean,
    } & callOptions)[],

    /**
     * Element, allowing to describe where the result should be hosted.
     *
     * @type {string}
     */
    resultSink: string
}

export type callOptions = {
    /**
     * Array containing indexes of callbacks which could be deleted
     *
     * @type {Array<number>}
     */
    deletableCallbacks?: Array<number>;

    /**
     * Flag, showing that the parameters set has no callback
     *
     * @type {boolean}
     */
    paramsHasNoCallback?: boolean;

    /**
     * Flag to disable error-testing (the check whether a provider
     * for the requested function is known).
     *
     * @type {boolean}
     */
    dynamicCallback?: boolean;

    /**
     * Element, allowing to describe where the result should be hosted.
     *
     * @type {string}
     */
    resultSink: string
}

export type responseTaskMsg = {
    type: 'response',

    /**
     * ID of the Task.
     *
     * @type {string}
     */
    taskId: string,

    /**
     * Property containing the result. Is
     * only present, if no error exists.
     *
     * @type {*}
     */
    result?: any,

    /**
     * Property containing the error, if
     * it occurred.
     *
     * @type {*}
     */
    error?: any
}

export type availableServices = {
    /**
     * The Id of the Dispatcher
     *
     * @type {string}
     */
    dispatcher: string,

    /**
     * The List of registered Services.
     *
     * @type {string[]}
     */
    services: string[]
}

export type nopeDispatcherOptions = {
    communicator: ICommunicationInterface,

    /**
     * Optional logger. If omitted the dispatcher does not log.
     *
     * @type {Logger}
     */
    logger?: Logger
}

/**
 * A Dispatcher to perform a function on a Remote
 * Dispatcher. Therefore a Task is created and forwarded
 * to the remote.
 *
 * @export
 * @class nopeDispatcher
 */
export class nopeDispatcher {

    public readonly id: string;

    protected _logger?: Logger;

    /**
     * Internal Element to store the registered Functions
     *
     * @protected
     * @memberof nopeDispatcher
     */
    protected _definedFunctions: Map<string, (...args: any[]) => Promise<any>>;

    /** Re-created by `reset()`; nothing else in this file reads or writes it. */
    protected _remotlyCalledFunctions: Set<string>;

    /**
     * Listeners registered at the communicator. Request listeners are
     * stored under the function id, result listeners under the name
     * of the result sink (`response/<id>`), so both can coexist.
     */
    protected _communicatorCallbacks: Map<string, {
        registeredId: string,
        type: 'request' | 'response',
        cb: (data: any) => any
    }>;

    protected _communicator: ICommunicationInterface;

    /** Last announced services of every remote dispatcher, keyed by dispatcher id. */
    protected _mappingOfRemoteDispatchersAndFunctions: Map<string, availableServices>;

    /** Union of all services announced by remote dispatchers. */
    protected _externalServices: Set<string>;

    public methodInterfaceWithOptions: { [index: string]: (optins: callOptions, ...args: any[]) => Promise<any> }
    public methodInterface: { [index: string]: (...args: any[]) => Promise<any> }

    /**
     * Internal Element to store the running tasks. `resultSubscription`
     * is the name of the result listener the task relies on; it is used
     * to decide when that listener may be removed.
     *
     * @protected
     * @memberof nopeDispatcher
     */
    protected _runningTasks: Map<string, {
        resolve: (value: any) => void;
        reject: (error: any) => void;
        resultSubscription?: string;
    }>;

    readonly _subscriptionMode: 'individual' | 'generic';
    readonly _resultSharing: 'individual' | 'generic';

    /**
     * Creates an instance of nopeDispatcher.
     * @param {nopeDispatcherOptions} options The options; they contain the Communication Layer which should be used.
     * @memberof nopeDispatcher
     */
    constructor(public options: nopeDispatcherOptions) {
        this._communicator = options.communicator;
        this._logger = options.logger;

        this._subscriptionMode = this._communicator.subscriptionMode || 'generic';
        this._resultSharing = this._communicator.resultSharing || 'generic';

        this.id = generateId();

        // Proxies for accessing methods easier: every property access
        // returns a function forwarding to `performCall` with the
        // property name as function id.
        this.methodInterfaceWithOptions = new Proxy({}, {
            get: (_target: object, name: string | symbol) => {
                return async (options: callOptions, ...args: any[]) => {
                    return this.performCall(String(name), args, options);
                };
            }
        });
        this.methodInterface = new Proxy({}, {
            get: (_target: object, name: string | symbol) => {
                return async (...args: any[]) => {
                    return this.performCall(String(name), args);
                };
            }
        });

        this.reset();
        this.init();
    }

    /**
     * Internal Method to handle some requests. Looks up the function,
     * rebuilds the argument list (plain parameters and callbacks),
     * executes the function and publishes the result or the error on
     * `data.resultSink`. Requests for unknown functions are ignored.
     *
     * @protected
     * @param {requestTaskMsg} data The provided data of the request
     * @param {(...args) => Promise<any>} [_function] The Function can be provided; otherwise it is looked up by `data.functionId`
     * @return {Promise<void>}
     * @memberof nopeDispatcher
     */
    protected async _handleExternalRequest(data: requestTaskMsg, _function?: (...args: any[]) => Promise<any>): Promise<void> {
        try {
            // Try to get the function if not provided:
            if (typeof _function !== 'function') {
                _function = this._definedFunctions.get(data.functionId);
            }

            if (typeof _function !== 'function') {
                // The function is not hosted by this dispatcher.
                return;
            }

            const args: any[] = [];

            // First extract the basic arguments
            for (const item of data.params ?? []) {
                args[item.idx] = item.data;
            }

            // Add the Callbacks. Therefore create a function which will
            // trigger the remote.
            for (const options of data.callbacks ?? []) {
                args[options.idx] = async (..._args: any[]) => {
                    return await this.performCall(options.functionId, _args, options);
                };
            }

            // Perform the Task itself.
            const _result = await _function(...args);

            // `undefined` is replaced by `null`, so the message always
            // carries a `result` property on success.
            const result: responseTaskMsg = {
                result: typeof _result !== 'undefined' ? _result : null,
                taskId: data.taskId,
                type: 'response'
            };

            // Use the communicator to publish the event.
            this._communicator.emitRpcResult(data.resultSink, result);
        } catch (error) {
            // An Error occurred => Forward the Error.
            const result: responseTaskMsg = {
                error,
                taskId: data.taskId,
                type: 'response'
            };

            // Send the Error via the communicator to the remote.
            this._communicator.emitRpcResult(data.resultSink, result);
        }
    }

    /**
     * Internal Function to handle responses. In general,
     * the dispatcher checks if there is an open task with
     * the provided id. If so => finish the promise.
     *
     * @protected
     * @param {responseTaskMsg} data The Data provided to handle the Response.
     * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
     * @memberof nopeDispatcher
     */
    protected _handleExternalResponse(data: responseTaskMsg): boolean {
        try {
            const task = this._runningTasks.get(data.taskId);

            if (!task) {
                return false;
            }

            // The task is finished => forget it before settling the promise.
            this._runningTasks.delete(data.taskId);

            if (data.error) {
                task.reject(data.error);
            } else {
                task.resolve(data.result);
            }

            return true;
        } catch (e) {
            if (this._logger) {
                this._logger.error('Failed to handle a response');
                this._logger.error(e);
            }
        }

        return false;
    }

    /**
     * Internal Function, used to initialize the Dispatcher.
     * It subscribes to the "Messages" of the communicator.
     *
     * @protected
     * @memberof nopeDispatcher
     */
    protected init(): void {
        // Requests: based on the Mode of the Subscription either create
        // individual topics for the methods or use the generic one.
        if (this._subscriptionMode === 'individual') {
            for (const [id, cb] of this._definedFunctions.entries()) {
                this._subscribeToFunction(id, cb);
            }
        } else {
            this._communicator.onRpcRequest('request', (data: requestTaskMsg) => {
                if (data.type === 'requestOfTask') {
                    void this._handleExternalRequest(data);
                }
            });
        }

        // Results: the shared sink 'response' is used whenever results are
        // shared generically. It is also kept for the generic subscription
        // mode, which always listened on it.
        if (this._resultSharing === 'generic' || this._subscriptionMode === 'generic') {
            this._communicator.onRpcResult('response', (data: responseTaskMsg) => {
                if (data.type === 'response') {
                    this._handleExternalResponse(data);
                }
            });
        }

        // Subscribe to the availableServices of Remotes.
        // If there is a new Service => update the External Services
        this._communicator.onNewServicesAvailable((data: availableServices) => {
            try {
                // Ignore the announcements of this dispatcher itself.
                if (data.dispatcher !== this.id) {
                    this._mappingOfRemoteDispatchersAndFunctions.set(data.dispatcher, data);
                    this._updateExternalServices();
                }
            } catch (e) {
                if (this._logger) {
                    this._logger.error('Failed to update the available services');
                    this._logger.error(e);
                }
            }
        });
    }

    /**
     * Function to update the used Services. Rebuilds the set of external
     * services from the announcements of all known remote dispatchers.
     *
     * @protected
     * @memberof nopeDispatcher
     */
    protected _updateExternalServices(): void {
        // Clear the Services
        this._externalServices.clear();

        for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndFunctions.values()) {
            for (const service of dispatcherInfo.services) {
                this._externalServices.add(service);
            }
        }

        if (this._logger?.isDebugEnabled()) {
            // If there is a Logger:
            this._logger.debug(
                'new services found: \n' +
                JSON.stringify(
                    Array.from(this._externalServices),
                    undefined,
                    4
                )
            );
        }
    }

    /**
     * Function to test if a specific Service exists.
     *
     * @param {string} id The Id of the Service
     * @return {boolean} The result of the Test. True if either local or remotely a service is known.
     * @memberof nopeDispatcher
     */
    public serviceExists(id: string): boolean {
        return this._definedFunctions.has(id) || this._externalServices.has(id);
    }

    /**
     * Function to adapt a Request name. Prefixes the id with
     * `request/` or `response/` unless it already starts with it.
     * Only used internally
     *
     * @protected
     * @param {string} id the original ID
     * @param {('request' | 'response')} type the prefix to use
     * @return {string} the adapted ID.
     * @memberof nopeDispatcher
     */
    protected _getName(id: string, type: 'request' | 'response'): string {
        return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
    }

    /**
     * Internal Helper Function to subscribe to a Function element.
     * Only active in the subscription mode 'individual'; a function id
     * is subscribed at most once.
     *
     * @protected
     * @param {string} id the Id of the function
     * @param {(...args) => Promise<any>} _cb The Callback of the Function
     * @return {void}
     * @memberof nopeDispatcher
     */
    protected _subscribeToFunction(id: string, _cb: (...args: any[]) => Promise<any>): void {
        if (this._subscriptionMode !== 'individual' || this._communicatorCallbacks.has(id)) {
            return;
        }

        const _req = this._getName(id, 'request');

        const cb = (data: requestTaskMsg) => {
            if (data.type === 'requestOfTask') {
                // Resolve the function at call time, so a function which has
                // been registered again under the same id is used.
                void this._handleExternalRequest(data, this._definedFunctions.get(id) ?? _cb);
            }
        };

        // Add the Callback.
        this._communicatorCallbacks.set(id, {
            registeredId: _req,
            type: 'request',
            cb
        });

        // Register Functions.
        this._communicator.onRpcRequest(_req, cb);
    }

    /**
     * Internal Helper Function to listen for the results of a function
     * on its individual sink (`response/<id>`). The listener is stored
     * under the name of the sink and is removed as soon as no running
     * task relies on it anymore.
     *
     * @protected
     * @param {string} id the Id of the called function
     * @param {boolean} deleteAfterCalling currently not evaluated
     * @return {void}
     * @memberof nopeDispatcher
     */
    protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
        const _res = this._getName(id, 'response');

        // Individual sinks are used if the results are shared individually.
        // The individual subscription mode always listened on them as well.
        const usesIndividualSink = this._resultSharing === 'individual' || this._subscriptionMode === 'individual';

        if (!usesIndividualSink || this._communicatorCallbacks.has(_res)) {
            return;
        }

        const cb = (data: responseTaskMsg) => {
            if (data.type === 'response' && this._handleExternalResponse(data)) {
                this._releaseResultSubscription(_res);
            }
        };

        // Add the Callback.
        this._communicatorCallbacks.set(_res, {
            registeredId: _res,
            type: 'response',
            cb
        });

        // Register Functions.
        this._communicator.onRpcResult(_res, cb);
    }

    /**
     * Internal Helper Function, which removes a result listener unless
     * a running task still waits for a result on it.
     *
     * @protected
     * @param {string} name the name under which the result listener is stored
     * @return {void}
     * @memberof nopeDispatcher
     */
    protected _releaseResultSubscription(name: string): void {
        const subscription = this._communicatorCallbacks.get(name);

        if (!subscription || subscription.type !== 'response') {
            return;
        }

        for (const task of this._runningTasks.values()) {
            if (task.resultSubscription === name) {
                // Another call is still pending => keep listening.
                return;
            }
        }

        this._removeSubscription(name);
    }

    /**
     * Function to register a Function in the Dispatcher
     *
     * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
     * @param {{
     *         // Flag to enable unregistering the function after calling.
     *         deleteAfterCalling?: boolean,
     *         // Instead of generating a uuid an id could be provided
     *         id?: string;
     *         // Flag to prevent publishing the updated list of services
     *         preventSendingToRegistery?: boolean;
     *     }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
     * @return {(...args) => Promise<any>} The registered Function. It carries the properties `id` and `unregister`.
     * @memberof nopeDispatcher
     */
    public registerFunction(func: (...args: any[]) => Promise<any>, options: {
        /** Flag to enable unregistering the function after calling. */
        deleteAfterCalling?: boolean,
        /** Instead of generating a uuid an id could be provided */
        id?: string;
        /** Flag to enable / disable sending to registery */
        preventSendingToRegistery?: boolean;
    } = {}): (...args: any[]) => Promise<any> {
        // Define / Use the ID of the Function.
        const _id = options.id || generateId();

        let _func = func;

        if (options.deleteAfterCalling) {
            _func = async (...args: any[]) => {
                // Unregister the Method
                this.unregistFunction(_id, {
                    preventSendingToRegistery: options.preventSendingToRegistery
                });
                // Return the Result of the Original Function.
                return await func(...args);
            };
        }

        // Attach the id and a shortcut to unregister the function.
        Object.assign(_func, {
            id: _id,
            unregister: () => this.unregistFunction(_id)
        });

        // Register the Function
        this._definedFunctions.set(_id, _func);

        // Register the Callback:
        this._subscribeToFunction(_id, _func);

        if (!options.preventSendingToRegistery) {
            // Publish the Available Services.
            this._sendAvailableServices();
        }

        // Return the Function.
        return _func;
    }

    /**
     * Function to unregister a Function from the Dispatcher
     *
     * @param {(((...args) => void) | string)} func The Function (as returned by `registerFunction`) or its id
     * @param {{ preventSendingToRegistery?: boolean }} [options={}] Allows to prevent publishing the updated list of services
     * @return {boolean} Flag, whether the element was removed (only if found) or not.
     * @memberof nopeDispatcher
     */
    public unregistFunction(func: ((...args: any[]) => void) | string, options: {
        /** Flag to enable / disable sending to registery */
        preventSendingToRegistery?: boolean;
    } = {}): boolean {
        const _id = typeof func === 'string' ? func : (func as { id?: string }).id || '0';

        this._removeSubscription(_id);

        // Remove the function first, so the published list
        // does not contain it anymore.
        const removed = this._definedFunctions.delete(_id);

        if (!options.preventSendingToRegistery) {
            // Publish the Available Services.
            this._sendAvailableServices();
        }

        return removed;
    }

    /**
     * Internal Helper Function to remove a listener from the communicator.
     * Does nothing, if no listener is stored under the provided id.
     *
     * @protected
     * @param {string} _id the id under which the listener is stored
     * @memberof nopeDispatcher
     */
    protected _removeSubscription(_id: string): void {
        const _callbacks = this._communicatorCallbacks.get(_id);

        if (!_callbacks) {
            return;
        }

        switch (_callbacks.type) {
            case 'request':
                // Unregister the RPC-Request-Listener
                this._communicator.offRpcRequest(_callbacks.registeredId, _callbacks.cb);
                break;
            case 'response':
                // Unregister the RPC-Response-Listener
                this._communicator.offRpcResponse(_callbacks.registeredId, _callbacks.cb);
                break;
        }

        // Remove the Callback
        this._communicatorCallbacks.delete(_id);
    }

    /**
     * Function used to publish the Available Services, which are the
     * ids of all functions registered in this dispatcher.
     *
     * @protected
     * @memberof nopeDispatcher
     */
    protected _sendAvailableServices(): void {
        // Define the Message
        const message: availableServices = {
            dispatcher: this.id,
            services: Array.from(this._definedFunctions.keys())
        };

        // Send the Message.
        this._communicator.emitNewServicesAvailable(message);
    }

    /**
     * Function which is used to perform a call on the remote.
     *
     * Parameters which are functions are registered as callbacks in this
     * dispatcher and described in `callbacks` of the request; all other
     * parameters are transmitted in `params`.
     *
     * @template T
     * @param {string} functionName The Name / ID of the Function
     * @param {any[]} params The provided Parameters.
     * @param {Partial<callOptions>} [options={}] Additional options of the call. Defaults:
     * `deletableCallbacks: []`, `paramsHasNoCallback: false`, `dynamicCallback: false` and a
     * `resultSink` matching the result sharing of the communicator.
     * @return {Promise<T>} The result of the Operation. The promise is rejected, if no provider
     * of the function is known (unless `dynamicCallback` is set), if sending the request fails
     * or if the remote answers with an error.
     * @memberof nopeDispatcher
     */
    public performCall<T>(functionName: string, params: any[], options: Partial<callOptions> = {}): Promise<T> {
        // Get a Call Id
        const _taskId = generateId();

        // Ids of the callbacks, which have been registered for this call.
        const _registeredIds: string[] = [];

        const _defaults: callOptions = {
            deletableCallbacks: [],
            paramsHasNoCallback: false,
            dynamicCallback: false,
            resultSink: this._resultSharing === 'generic' ? 'response' : this._getName(functionName, 'response')
        };
        const _options: callOptions = Object.assign(_defaults, options);

        // Name under which `_subscribeToResult` stores the result listener.
        const _resultSubscription = this._getName(functionName, 'response');

        // Define a Callback-Function, which will expect the Task.
        return new Promise<T>((resolve, reject) => {
            try {
                // Test first, whether a provider is known. Nothing has to
                // be cleaned up, if the call is rejected at this point.
                if (!_options.dynamicCallback && !this.serviceExists(functionName)) {
                    const error = new Error('No Service Provider known for "' + functionName + '"');

                    if (this._logger) {
                        this._logger.error('No Service Provider known for "' + functionName + '"');
                        this._logger.error(error);
                    }

                    throw error;
                }

                // Register the Handlers,
                this._runningTasks.set(_taskId, {
                    resolve,
                    reject,
                    resultSubscription: _resultSubscription
                });

                // Listen for the result (individual sinks only).
                this._subscribeToResult(functionName, !!_options.dynamicCallback);

                // Define a Task-Request
                const taskRequest: requestTaskMsg = {
                    functionId: functionName,
                    params: [],
                    callbacks: [],
                    taskId: _taskId,
                    type: 'requestOfTask',
                    resultSink: _options.resultSink
                };

                const deletableCallbacks = _options.deletableCallbacks ?? [];

                // Iterate over all Parameters and determine Callbacks. Based
                // on the Parameter-Type assign it either to `params` (for
                // parsable Parameters) or `callbacks` (for callback Parameters).
                // The receiver unpacks `{ idx, data }` entries, so parameters
                // are wrapped even if `paramsHasNoCallback` is set.
                for (const [idx, contentOfParameter] of params.entries()) {
                    if (_options.paramsHasNoCallback || typeof contentOfParameter !== 'function') {
                        taskRequest.params.push({
                            idx,
                            data: contentOfParameter
                        });
                        continue;
                    }

                    // The Parameter is a Callback => store a
                    // Description of the Callback and register
                    // the callback inside of the Dispatcher
                    const deleteAfterCalling = deletableCallbacks.includes(idx);
                    const callbackId = generateId();

                    this.registerFunction(contentOfParameter, {
                        id: callbackId,
                        deleteAfterCalling,
                        preventSendingToRegistery: true,
                    });

                    _registeredIds.push(callbackId);

                    // Register the Callback
                    taskRequest.callbacks.push({
                        functionId: callbackId,
                        idx,
                        deleteAfterCalling,
                        dynamicCallback: true,
                        deletableCallbacks: [],
                        resultSink: this._resultSharing === 'generic' ? 'response' : this._getName(callbackId, 'response')
                    });
                }

                // Send the Message to the specific element:
                const requestName = this._subscriptionMode === 'individual'
                    ? this._getName(taskRequest.functionId, 'request')
                    : 'request';

                this._communicator.emitRpcRequest(requestName, taskRequest);
            } catch (e) {
                // Delete all Callbacks. They have been registered without
                // publishing them, so they are removed silently as well.
                for (const id of _registeredIds) {
                    this.unregistFunction(id, { preventSendingToRegistery: true });
                }

                // Remove the task:
                this._runningTasks.delete(_taskId);

                // Remove the result listener, if no other call needs it.
                this._releaseResultSubscription(_resultSubscription);

                // Forward the error.
                reject(e);
            }
        });
    }

    /**
     * Function to clear all pending tasks. The promises of the
     * cleared tasks are neither resolved nor rejected.
     *
     * @memberof nopeDispatcher
     */
    public clearTasks(): void {
        if (this._runningTasks) {
            this._runningTasks.clear();
        } else {
            this._runningTasks = new Map<string, {
                resolve: (value: any) => void;
                reject: (error: any) => void;
                resultSubscription?: string;
            }>();
        }
    }

    /**
     * Function to unregister all Functions of the Dispatcher. All
     * listeners are removed from the communicator and the (then empty)
     * list of services is published once, if functions have been removed.
     *
     * @memberof nopeDispatcher
     */
    public unregisterAll(): void {
        if (this._definedFunctions) {
            const ids = Array.from(this._definedFunctions.keys());

            for (const id of ids) {
                this.unregistFunction(id, { preventSendingToRegistery: true });
            }

            this._definedFunctions.clear();

            if (ids.length > 0) {
                this._sendAvailableServices();
            }
        } else {
            this._definedFunctions = new Map<string, (...args: any[]) => Promise<any>>();
        }

        // Reset the Callbacks. Remaining listeners (e.g. for results) are
        // removed from the communicator instead of just being forgotten.
        if (this._communicatorCallbacks) {
            for (const id of Array.from(this._communicatorCallbacks.keys())) {
                this._removeSubscription(id);
            }
        } else {
            this._communicatorCallbacks = new Map();
        }
    }

    /**
     * Function to reset the Dispatcher. Forgets all known remote
     * services, clears the pending tasks and unregisters all functions.
     *
     * @memberof nopeDispatcher
     */
    public reset(): void {
        this._remotlyCalledFunctions = new Set();
        this._externalServices = new Set();
        this._mappingOfRemoteDispatchersAndFunctions = new Map();
        this.clearTasks();
        this.unregisterAll();
    }
}