/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-10-12 18:52:00 * @modify date 2020-10-16 14:54:49 * @desc [description] */ import { inject, injectable } from 'inversify'; import { Logger } from "winston"; import { generateId } from '../helpers/idMethods'; import { DISPATCHER_OPTIONS, OBSERVABLE_FACTORY } from '../symbols/identifiers'; import { IAvailableInstanceGenerators, IAvailableServicesMsg, IAvailableTopicsMsg, ICallOptions, ICommunicationInterface, IExternalEventMsg, IInstanceCreation, IInstanceDescription, IInstanceRemoval, INopeRpcDispatcherOptions, IRequestTaskMsg, IResponseTaskMsg } from '../types/communication.interface'; import { IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, INopeDispatcher } from '../types/nopeDispatcher.interface'; import { INopeModule } from '../types/nopeModule.interface'; import { INopeObservable, IPipe } from '../types/nopeObservable.interface'; /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded * to the remote. * * @export * @class nopeDispatcher */ @injectable() export class nopeDispatcher implements INopeDispatcher { public readonly id: string; protected _logger: Logger; /** * Internal Element to store the registered Functions * * @protected * @memberof nopeDispatcher */ protected _definedFunctions: Map Promise>; protected _remotlyCalledFunctions: Set; protected _communicatorCallbacks: Map any }>; protected _communicator: ICommunicationInterface; protected _mappingOfRemoteDispatchersAndFunctions: Map; protected _externalProvidedServices: Set; protected _mappingOfRemoteDispatchersAndGenerators: Map; protected _externalProvidedGenerators: Set; public methodInterfaceWithOptions: { [index: string]: (optins: ICallOptions, ...args) => Promise } public methodInterface: { [index: string]: (...args) => Promise } protected _mappingOfRemoteDispatchersAndTopics: Map; protected _externalSubscribed: Set; protected _externalPublished: Set; /** * Internal Element to store the running tasks. * * @protected * @memberof nopeDispatcher */ protected _runningTasks: Map void; reject: (error: any) => void; clear: () => void; }>; readonly _subscriptionMode: 'individual' | 'generic'; readonly _resultSharing: 'individual' | 'generic'; /** * Creates an instance of nopeDispatcher. * @param {nopeRpcDispatcherOptions} options The Options, used by the Dispatcher. * @param {() => INopeObservable} _generateObservable A Helper, to generate Observables. * @memberof nopeDispatcher */ constructor( @inject(DISPATCHER_OPTIONS) public options: INopeRpcDispatcherOptions, @inject(OBSERVABLE_FACTORY) protected _generateObservable: () => INopeObservable, ) { this._communicator = options.communicator; if (options.logger) { this._logger = options.logger; } this._subscriptionMode = this._communicator.subscriptionMode || 'generic'; this._resultSharing = this._communicator.resultSharing || 'generic'; this.id = generateId(); /** * Define A Proxy for accessing methods easier. */ const _this = this; const _handlerWithOptions = { get(target, name) { return async (options: ICallOptions, ...args) => { return _this.performCall(name, args, options); } } } const _handlerWithoutOptions = { get(target, name) { return async (...args) => { return _this.performCall(name, args); } } } this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions); this.methodInterface = new Proxy({}, _handlerWithoutOptions); this.reset(); this._init(); } provideInstanceGeneratorForExternalDispatchers(identifier: string, cb: IGenerateRemoteInstanceForOtherDispatcherCallback) { const _this = this; const _cb = this.registerFunction(async (data: IInstanceCreation) => { // Check if an instance exists or not. // if not => create an instance an store it. if (!_this._instances.has(data.identifier)) { // Create an Instance const _instance = await cb(_this, data.identifier); // Initialize the instance with the parameters. await _instance.init(...data.params); // A Function is registered, taking care of removing // an instances, if it isnt needed any more. _this.registerFunction(async (_data: IInstanceRemoval) => { if (_this._instances.get(data.identifier)?.usedBy) { const idx = _this._instances.get(data.identifier).usedBy.indexOf(_data.dispatcherID); if (idx > -1) { _this._instances.get(data.identifier).usedBy.splice(idx,1); } if ( _this._instances.get(data.identifier).usedBy.length == 0) { // Remove the Instance. await _instance.dispose(); // Delete the Entry. _this._instances.delete(data.identifier); // Remove the Function itself _this.unregisterFunction('instance_dispose_' + data.identifier) } } }, { deleteAfterCalling: false, id: 'instance_dispose_' + data.identifier }); // Store the Instance. _this._instances.set(data.identifier, { instance: _instance, usedBy: [ data.dispatcherID ] }); } else { // If an Element exists => Add the Element. _this._instances.get(data.identifier).usedBy.push(data.dispatcherID); } // Define the Response. const response: IInstanceDescription = { description: _this._instances.get(data.identifier).instance.toDescription(), type: data.type, } // Send the Response return response }, { id: 'generateInstance_' + identifier, }); this._externalGenerators.set(_cb['id'], _cb); } unprovideInstanceGeneratorForExternalDispatchers(identifier: string) { if (this._externalGenerators.has(identifier)) { this.unregisterFunction(this._externalGenerators.get(identifier)); this._externalGenerators.delete(identifier); } } registerInternalInstanceGenerator(identifier: string, cb: IGenerateRemoteInstanceCallback) { this._internalGenerators.set(identifier, cb); } unregisterInternalInstanceGenerator(identifier: string) { this._internalGenerators.delete(identifier); } protected _internalGenerators: Map>; protected _externalGenerators: Map>; protected _instances: Map, }>; public async generateInstance(description: Partial): Promise { const _defDescription: IInstanceCreation = { dispatcherID: this.id, identifier: 'error', params: [], type: 'unkown' } const _description = Object.assign(_defDescription, description, { dispatcherID: this.id }) as IInstanceCreation; if (_defDescription.type === 'unkown' || _description.identifier === 'error') { throw Error('Please Provide at least a "type" and "identifier" in the paremeters'); } if (this._instances.has(_description.identifier)) { // Add the Dispatcher to the Element: this._instances.get(_description.identifier).usedBy.push(_description.dispatcherID); // Return the Instance. return this._instances.get(_description.identifier).instance as I; } try { if (this._internalGenerators.has(_description.type)) { const result = await this.performCall('generateInstance_' + _description.type, [_description], { paramsHasNoCallback: true, }); // Create the Instance const instance = await this._internalGenerators.get(_description.type)(this, result.description) as I; // Store the Instances. this._instances.set(_description.identifier, { instance, usedBy: [_description.dispatcherID] }); return instance; } } catch (e) { if (this._logger) { this._logger.error('During creating an Instance, the following error Occurd'); this._logger.error(e) } throw e; } throw new Error("No Dynamic Interface provided"); } public async deleteInstance(instance: I | string): Promise { // Block to find the instance. // Based on the property (string or instance) // the corresponding instance object has to be select. let _instance: { instance: INopeModule; usedBy: Array; }; if (typeof instance === 'string'){ _instance = this._instances.get(instance); } else { for (const data of this._instances.values()){ if (instance == data.instance){ _instance = data; break; } } } // if the instance has been found => delete the instance. if (_instance) { _instance.usedBy.pop(); if (_instance.usedBy.length === 0){ const params: IInstanceRemoval = { dispatcherID: this.id, identifier: _instance.instance.identifier } // Call the corresponding Dispose Function for the "real" instance // All other elements are just accessors. await this.performCall('instance_dispose_' + _instance.instance.identifier, [params]); // Delete the Identifier this._instances.delete(_instance.instance.identifier); // Dispose the Handler; await _instance.instance.dispose(); } return true; } return false; } /** * Internal Method to handle some requests. * * @protected * @param {IRequestTaskMsg} data The provided data of the request * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided * @return {Promise} * @memberof nopeDispatcher */ protected async _handleExternalRequest(data: IRequestTaskMsg, _function?: (...args) => Promise): Promise { try { // Try to get the function if not provided: if (typeof _function !== 'function') { _function = this._definedFunctions.get(data.functionId); } if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug('Dispatcher "'+ this.id + '" received request: "' + data.functionId + '" -> task: "' + data.taskId + '"'); } const _this = this; if (typeof _function === 'function') { // Only if the Function is present extract the arguments etc. const args = []; // First extract the basic arguments data.params.map(item => args[item.idx] = item.data); // Add the Callbacks. Therefore create a function which will // trigger the remote. data.callbacks.map(options => args[options.idx] = async (..._args) => { return await _this.performCall(options.functionId, _args, options); }); // Perform the Task it self. const _result = await _function(...args); // Define the Result message const result: IResponseTaskMsg = { result: typeof (_result) !== 'undefined' ? _result : null, taskId: data.taskId, type: 'response' } // Use the communicator to publish the event. await this._communicator.emitRpcResult(data.resultSink, result); } } catch (error) { if (this._logger?.isErrorEnabled()) { // If there is a Logger: this._logger.error('Dispatcher "'+ this.id + '" failed with request: "' + data.taskId + '"'); this._logger.error(error); } // An Error occourd => Forward the Error. const result: IResponseTaskMsg = { error, taskId: data.taskId, type: 'response' } // Send the Error via the communicator to the remote. await this._communicator.emitRpcResult(data.resultSink, result); } } /** * Internal Function to handle responses. In Generale, * the dispatcher checks if there is an open task with * the provided id. If so => finish the promise. * * @protected * @param {IResponseTaskMsg} data The Data provided to handle the Response. * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not. * @memberof nopeDispatcher */ protected _handleExternalResponse(data: IResponseTaskMsg): boolean { try { // Extract the Task const task = this._runningTasks.get(data.taskId); // Delete the Task: this._runningTasks.delete(data.taskId); // Based on the Result of the Remote => if (task && data.error) { task.reject(data.error); return true; } if (task) { task.resolve(data.result); return true; } } catch (e) { } return false; } /** * Internal Function, used to initialize the Dispatcher. * It subscribes to the "Messages" of the communicator. * * @protected * @memberof nopeDispatcher */ protected _init(): void { const _this = this; // Based on the Mode of the Subscription => // either create indivdual topics for the methods // or use the generice function. switch (this._subscriptionMode) { case 'individual': // Iterate over the Defined Functions. for (const [id, cb] of this._definedFunctions.entries()) { // Subscribe the Function this._subscribeToFunction(id, cb); } break; case 'generic': // Add a generic Subscription for callbacks: this._communicator.onRpcRequest('request', (data: IRequestTaskMsg) => { if (data.type === 'requestOfTask') { _this._handleExternalRequest(data); } }); // Subscribe to Responses this._communicator.onRpcResult('response', (data: IResponseTaskMsg) => { if (data.type === 'response') { _this._handleExternalResponse(data); } }); break; } // Subscribe to the availableServices of Remotes. // If there is a new Service => udpate the External Services this._communicator.onNewServicesAvailable(data => { try { if (data.dispatcher !== _this.id) { _this._mappingOfRemoteDispatchersAndFunctions.set(data.dispatcher, data); _this._updateExternalServices(); } } catch (e) { } }); // Subscribe to new available Topics. this._communicator.onNewTopicsAvailable(data => { try { if (data.dispatcher !== _this.id) { _this._mappingOfRemoteDispatchersAndTopics.set(data.dispatcher, data); _this._updateExternalTopics(); } } catch (e) { } }); this._communicator.onNewInstanceGeneratorsAvailable(data => { try { if (data.dispatcher !== _this.id) { _this._mappingOfRemoteDispatchersAndGenerators.set(data.dispatcher, data); _this._updateExternalGenerators(); } } catch (e) { } }) this._communicator.onBonjour((dispatcher: string) => { if (_this.id !== dispatcher) { _this._sendAvailableServices(); _this._sendAvailableProperties(); _this._sendAvailableGenerators(); if (_this._logger?.isDebugEnabled()) { // If there is a Logger: _this._logger.debug('Remote Dispatcher "'+ dispatcher + '" went online'); } } }); this._communicator.emitBonjour(this.id); } /** * Function to update the used Services. * * @protected * @memberof serviceRegistry */ protected _updateExternalServices() { const _this = this; // Clear the Services this._externalProvidedServices.clear(); for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndFunctions.values()) { dispatcherInfo.services.map(service => _this._externalProvidedServices.add(service)); } } /** * Function to update the used Services. * * @protected * @memberof serviceRegistry */ protected _updateExternalGenerators() { const _this = this; // Clear the Services this._externalProvidedGenerators.clear(); for (const generators of this._mappingOfRemoteDispatchersAndGenerators.values()) { generators.generators.map(gen => _this._externalProvidedGenerators.add(gen)); } } /** * Internal Function to update the Listing of external Topcis. * This Function creates a list containing all subscriptions * and publishers which are external. * * @protected * @memberof nopeDispatcher */ protected _updateExternalTopics() { const _this = this; // Clear the Services this._externalPublished.clear(); this._externalSubscribed.clear(); for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndTopics.values()) { dispatcherInfo.published.map(topic => _this._externalPublished.add(topic)); dispatcherInfo.subscribed.map(topic => _this._externalSubscribed.add(topic)); } } /** * Function to test if a specific Service exists. * * @param {string} id The Id of the Serivce * @return {boolean} The result of the Test. True if either local or remotly a service is known. * @memberof nopeDispatcher */ public serviceExists(id: string) { return this._definedFunctions.has(id) || this._externalProvidedServices.has(id); } /** * Function to test if an subscription exists, * for the given topic. * * @param {string} topic * @return {*} * @memberof nopeDispatcher */ public subscriptionExists(topic: string) { return this._externalSubscribed.has(topic); } /** * Function to adapt a Request name. * Only used internally * * @protected * @param {string} id the original ID * @return {string} the adapted ID. * @memberof nopeDispatcher */ protected _getServiceName(id: string, type: 'request' | 'response'): string { return id.startsWith(`${type}/`) ? id : `${type}/${id}`; } /** * Internal Helper Function to subscribe to a Function element. * * @protected * @param {string} id the Id of the function * @param {(...args) => Promise} _cb The Callback of the Function * @return {void} the adapted ID. * @memberof nopeDispatcher */ protected _subscribeToFunction(id: string, _cb: (...args) => Promise): void { const _req = this._getServiceName(id, 'request'); if ( this._subscriptionMode === 'individual' && !this._communicatorCallbacks.has(id) ) { const _this = this; // Define a Function. const cb = (data: IRequestTaskMsg) => { if (data.type === 'requestOfTask') { _this._handleExternalRequest(data, _cb); } }; // Add the Callback. this._communicatorCallbacks.set(id, { registeredId: _req, type: 'request', cb }); // Register Functions. this._communicator.onRpcRequest(_req, cb); if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug('Dispatcher "'+ this.id + '" listening on: "' + _req + '"'); } } } /** * Function, used to subscribe to results. * * @protected * @param {string} id * @param {boolean} deleteAfterCalling * @memberof nopeDispatcher */ protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void { const _res = this._getServiceName(id, 'response'); if ( this._subscriptionMode === 'individual' && !this._communicatorCallbacks.has(id) ) { const _this = this; // Define a Function. const cb = (data: IResponseTaskMsg) => { if (data.type === 'response') { if (_this._handleExternalResponse(data) && deleteAfterCalling) { _this._removeRpcSubscription(id); } } }; // Add the Callback. this._communicatorCallbacks.set(id, { registeredId: _res, type: 'response', cb }); // Register Functions. this._communicator.onRpcResult(_res, cb); } } /** * Function to register a Function in the Dispatcher * * @param {(...args) => Promise} func The function which should be called if a request is mapped to the Function. * @param {{ * // Flag to enable unregistering the function after calling. * deleteAfterCalling?: boolean, * // Instead of generating a uuid an id could be provided * id?: string; * }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it. * @return {*} {(...args) => Promise} The registered Function * @memberof nopeDispatcher */ public registerFunction(func: (...args) => Promise, options: { /** Flag to enable unregistering the function after calling. */ deleteAfterCalling?: boolean, /** Instead of generating a uuid an id could be provided */ id?: string; /** Flag to enable / disable sending to registery */ preventSendingToRegistery?: boolean; } = {}): (...args) => Promise { const _this = this; // Define / Use the ID of the Function. const _id = options.id || generateId(); let _func = func; if (options.deleteAfterCalling) { _func = async (...args) => { // Unregister the Method _this.unregisterFunction(_id, { preventSendingToRegistery: options.preventSendingToRegistery }); // Return the Result of the Original Function. return await func(...args); } } // Define a ID for the Function _func['id'] = _id; // Define the callback. _func['unregister'] = () => _this.unregisterFunction(_id); // Reister the Function this._definedFunctions.set(_func['id'], _func); // Register the Callback: this._subscribeToFunction(_id, _func); if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableServices(); if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug('Dispatcher "'+ this.id + '" registered: "' + _id + '"'); } } // Return the Function. return _func; } /** * Function to unregister a Function from the Dispatcher * @param {(((...args) => void) | string | number)} func The Function to unregister * @return {*} {boolean} Flag, whether the element was removed (only if found) or not. * @memberof nopeDispatcher */ public unregisterFunction(func: ((...args) => void) | string, options: { /** Flag to enable / disable sending to registery */ preventSendingToRegistery?: boolean; } = {}): boolean { const _id = typeof func === 'string' ? func : func['id'] as string || '0'; this._removeRpcSubscription(_id); if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableServices(); if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug('Dispatcher "'+ this.id + '" unregistered: "' + _id + '"'); } } return this._definedFunctions.delete(_id); } public unregisterObservable(observable: INopeObservable | string, options: { /** Flag to enable / disable sending to registery */ preventSendingToRegistery?: boolean; } = {}): boolean { const _id = typeof observable === 'string' ? observable : observable.id as string || '0'; if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableProperties(); if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug('Dispatcher "'+ this.id + '" unregistered: "' + _id + '"'); } } return this._definedFunctions.delete(_id); } protected _externallySubscribeObservables: Map, cb: (...arg) => void, }>; protected _internallySubscribeObservables: Map>>; /** * Creates an Event listener (if required) * * @protected * @param {string} event The Event to Listen. * @return {nopeObservable} An Listener on the Communication Channel. * @memberof nopeDispatcher */ protected _subscribeToEvent(event: string) { const item = this._externallySubscribeObservables.get(event) || { observable: this._generateObservable(), cb: () => { }, }; if (!item.observable.hasSubscriptions) { const _this = this; const cb = (data: IExternalEventMsg) => { item.observable.setContent(data, _this.id); }; this._communicator.onEvent(event, cb); item.cb = cb; } // Set the Items. this._externallySubscribeObservables.set(event, item); return item.observable; } /** * Function to unsubscribe from an event of the channel. * * @protected * @param {string} event * @memberof nopeDispatcher */ protected _unsubscribeEvent(event: string) { const item = this._externallySubscribeObservables.get(event); if (item) { this._communicator.offEvent(event, item.cb); // Dispose the Observable const obs = this._externallySubscribeObservables.get(event).observable; obs.dispose(); // Remove the Observable this._externallySubscribeObservables.delete(event); } } public registerObservable(observable: INopeObservable, options: { mode: 'subscribe' | 'publish' | Array<'subscribe' | 'publish'>, topic: string | { subscribe?: string, publish?: string, }, pipe?: { pipe?: IPipe, scope?: { [index: string]: any } }, preventSendingToRegistery?: boolean }): INopeObservable { // Reference to itself const _this = this; // Extract the Topics, pipe and scope. const _subTopic = typeof options.topic === 'string' ? options.topic : options.topic.subscribe || null; const _pubTopic = typeof options.topic === 'string' ? options.topic : options.topic.publish || null; const _pipe = typeof options.pipe === 'function' ? options.pipe || null : null; const _scope = typeof options.pipe === 'object' ? options.pipe.scope || null : null // A Flag, indicating, whether the topic is new or not. let newElement = false; // Test if the Item should be subscribe or not. if (options.mode == 'subscribe' || (Array.isArray(options.mode) && options.mode.includes('subscribe'))) { newElement = newElement || !this._externallySubscribeObservables.has(_subTopic); const _externalSource = this._subscribeToEvent(_subTopic); if (_pipe) { const observer = _externalSource.enhancedSubscription((data: IExternalEventMsg) => { // Test if the Content, which has been forwared in here inst the own dispathcer. if (data.sender != _this.id) { observable.setContent(data.data, _this.id, data.timestamp); } }, { scope: _scope, pipe: _pipe }); const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Call the original Dispose function; dispose.apply(observable); } } else { const observer = _externalSource.subscribe({ next(data: IExternalEventMsg) { if (_this.id !== data.sender) { observable.setContent(data.data, _this.id, data.timestamp); } }, complete() { observable.observable.complete(); }, error(err) { observable.observable.error(err); } }); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Call the original Dispose function; dispose.apply(observable); } } } if (options.mode == 'publish' || (Array.isArray(options.mode) && options.mode.includes('publish'))) { const cb = (data, sender?, timestamp?, ...args) => { // Only Publish data, if there exists a Subscription. if (_this.subscriptionExists(_pubTopic) && _this.id !== sender) { // Use the Communicator to emit the Event. _this._communicator.emitEvent(_pubTopic, { data: data, topic: _pubTopic, sender: _this.id, type: 'event', timestamp }); } } // Update the Flag. newElement = newElement || !this._internallySubscribeObservables.has(_pubTopic); // Register the Internally Subscribed Element. const _set = this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.add(observable); this._internallySubscribeObservables.set(_pubTopic, _set); if (_pipe) { const observer = observable.enhancedSubscription(cb, { scope: _scope, pipe: _pipe }); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Unregister the Internally Subscribed Element. const _set = _this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.delete(observable); if (_set.size > 0) { _this._internallySubscribeObservables.set(_pubTopic, _set); } else { _this._internallySubscribeObservables.delete(_pubTopic); // Optionally send an update. if (!options.preventSendingToRegistery) { // Publish the Available Services. _this._sendAvailableProperties(); } } // Call the original Dispose function; dispose.apply(observable); } } else { const observer = observable.subscribe(cb); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Unregister the Internally Subscribed Element. const _set = _this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.delete(observable); if (_set.size > 0) { _this._internallySubscribeObservables.set(_pubTopic, _set); } else { _this._internallySubscribeObservables.delete(_pubTopic); // Optionally send an update. if (!options.preventSendingToRegistery) { // Publish the Available Services. _this._sendAvailableProperties(); } } // Call the original Dispose function; dispose.apply(observable); } } } if (!options.preventSendingToRegistery && newElement) { // Publish the Available Services. this._sendAvailableProperties(); } // Return the Function. return observable; } protected _removeRpcSubscription(_id: string) { // Try to unregister the Callback from the communcator: if (this._communicatorCallbacks.has(_id)) { const _callbacks = this._communicatorCallbacks.get(_id); switch (_callbacks.type) { case 'request': // Unregister the RPC-Request-Listener this._communicator.offRpcRequest(_callbacks.registeredId, _callbacks.cb); break; case 'response': // Unregister the RPC-Response-Listener this._communicator.offRpcResponse(_callbacks.registeredId, _callbacks.cb); break; } // Remove the Callback this._communicatorCallbacks.delete(_id); } } /** * Function used to update the Available Services. * * @protected * @memberof nopeDispatcher */ protected _sendAvailableServices() { // Define the Message const message: IAvailableServicesMsg = { dispatcher: this.id, services: Array.from(this._definedFunctions.keys()) } // Send the Message. this._communicator.emitNewServicesAvailable(message); } /** * Function to emit the available topics. * * @protected * @memberof nopeDispatcher */ protected _sendAvailableProperties() { // Define the Message const message: IAvailableTopicsMsg = { dispatcher: this.id, published: Array.from(this._internallySubscribeObservables.keys()), subscribed: Array.from(this._externallySubscribeObservables.keys()) } // Send the Message. this._communicator.emitNewTopicsAvailable(message); } protected _sendAvailableGenerators() { // Define the Message const message: IAvailableInstanceGenerators = { dispatcher: this.id, generators: Array.from(this._internallySubscribeObservables.keys()), } // Send the Message. this._communicator.emitNewInstanceGeneratorsAvailable(message); } /** * Function which is used to perform a call on the remote. * * @template T * @param {string} functionName The Name / ID of the Function * @param {any[]} params The provided Parameters. * @param {({ * deletableCallbacks: Array; * })} [options={ * deletableCallbacks: [], * paramsHasNoCallback: false, * preventErrorTest: false * }] You could additiona Optionf for the callback. * @return {*} {Promise} The result of the Operation * @memberof nopeDispatcher */ public performCall(functionName: string, params: any[], options: Partial = {}): Promise { // Get a Call Id const _taskId = generateId(); const _registeredIdx: Array = []; const _this = this; const _options = Object.assign({ deletableCallbacks: [], paramsHasNoCallback: false, dynamicCallback: false, resultSink: (this._resultSharing === 'generic' ? 'response' : this._getServiceName(functionName, 'response')), }, options) as ICallOptions; this._subscribeToResult(functionName, _options.dynamicCallback); const clear = () => { // Delete all Callbacks. _registeredIdx.map(id => _this.unregisterFunction(id)); // Remove the task: if (_this._runningTasks.has(_taskId)) _this._runningTasks.delete(_taskId); if (_this._logger && _this._logger.isDebugEnabled()) { _this._logger.debug('Clearing Callbacks from ' + _taskId); } } if (_this._logger && _this._logger.isDebugEnabled()) { _this._logger.debug('Dispatcher "'+ this.id + '" requesting externally Function "' + functionName + '" with task: "' + _taskId + '"'); } // Define a Callback-Function, which will expect the Task. return new Promise(async (resolve, reject) => { try { // Register the Handlers, _this._runningTasks.set(_taskId, { resolve, reject, clear }); // Define a Task-Request const taskRequest: IRequestTaskMsg = { functionId: functionName, params: [], callbacks: [], taskId: _taskId, type: 'requestOfTask', resultSink: _options.resultSink } // Test if there is no Callback integrated if (!options.paramsHasNoCallback) { // If so, the parameters has to be detailled: // Iterate over all Parameters and // Determin Callbacks. Based on the Parameter- // Type assign it either to packet.params ( // for parsable Parameters) and packet.callbacks // (for callback Parameters) for (const [idx, contentOfParameter] of params.entries()) { // Test if the parameter is a Function if (typeof contentOfParameter !== "function") { taskRequest.params.push({ idx, data: contentOfParameter }); } else { // The Parameter is a Callback => store a // Description of the Callback and register // the callback inside of the Dispatcher const deleteAfterCalling = _options.deletableCallbacks.includes(idx); const _func = _this.registerFunction(contentOfParameter, { deleteAfterCalling, preventSendingToRegistery: true, }); _registeredIdx.push(_func['id']); // Register the Callback taskRequest.callbacks.push({ functionId: _func['id'], idx, deleteAfterCalling, dynamicCallback: true, deletableCallbacks: [], resultSink: _this._resultSharing === 'generic' ? 'response' : _this._getServiceName(_func['id'], 'response') }); } } } else { for (const [idx, contentOfParameter] of params.entries()) { taskRequest.params.push({ idx, data: contentOfParameter }); } } if (!_options.dynamicCallback && !_this.serviceExists(functionName)) { // Create an Error: const error = new Error('No Service Provider known for "' + functionName + '"'); if (_this._logger) { _this._logger.error('No Service Provider known for "' + functionName + '"'); _this._logger.error(error) } throw error; } // Send the Message to the specific element: if (_this._subscriptionMode === 'individual') { await _this._communicator.emitRpcRequest(_this._getServiceName(taskRequest.functionId, 'request'), taskRequest); if (_this._logger && _this._logger.isDebugEnabled()) { _this._logger.debug('Dispatcher "'+ this.id + '" putting task "' +_taskId + '" on: "' + _this._getServiceName(taskRequest.functionId, 'request') + '"'); } } else { await _this._communicator.emitRpcRequest('request', taskRequest); if (_this._logger && _this._logger.isDebugEnabled()) { _this._logger.debug('Dispatcher "'+ this.id + '" putting task "' +_taskId + '" on: "request"'); } } } catch (e) { // Clear all Elements of the Function: clear(); // Throw the error. reject(e); } }); } /** * Function to clear all pending tasks * * @memberof nopeDispatcher */ public clearTasks(): void { if (this._runningTasks) this._runningTasks.clear(); else this._runningTasks = new Map(); } /** * Function to unregister all Functions of the Dispatcher. * * @memberof nopeDispatcher */ public unregisterAll(): void { if (this._definedFunctions) { for (const id of this._definedFunctions.keys()) { this.unregisterFunction(id) } this._definedFunctions.clear(); } else { this._definedFunctions = new Map Promise>(); } // Reset the Callbacks. this._communicatorCallbacks = new Map(); } /** * Function to reset the Dispatcher. * * @memberof nopeDispatcher */ public reset(): void { this._remotlyCalledFunctions = new Set(); this._mappingOfRemoteDispatchersAndFunctions = new Map(); this._mappingOfRemoteDispatchersAndTopics = new Map(); this._internalGenerators = new Map(); this._externalGenerators = new Map(); // If Instances Exists => Delete them. if (this._instances) { const _this = this; // Dispose all Instances. for (const [name, instance] of this._instances.entries()) { // Remove the Instance. this.deleteInstance(name).catch(e => { if (_this._logger) { _this._logger.error('Failed Removing Instance "' + name + '"'); _this._logger.error(e) } }); } } this._instances = new Map(); this._externalProvidedServices = new Set(); this._externalPublished = new Set(); this._externalSubscribed = new Set(); this._internallySubscribeObservables = new Map(); this._externallySubscribeObservables = new Map(); this.clearTasks(); this.unregisterAll(); } }