import { generateId } from '../helpers/idMethods'; import { Logger } from "winston"; import { nopeObservable, pipe, observableCallback } from '../observables/nopeObservable'; import { observable } from 'rxjs'; /** * A Layer to communicate. * * @export * @interface ICommunicationInterface */ export interface ICommunicationInterface { readonly subscriptionMode?: 'individual' | 'generic', readonly resultSharing?: 'individual' | 'generic', /** * Funciton to emit a RPC Request. * * @param {string} name Name of the request * @param {requestTaskMsg} request The Request. * @memberof ICommunicationInterface */ emitRpcRequest(name: string, request: requestTaskMsg): void; /** * Function used to subscribe to updates. If there exist * some state change in the Communication Interface it should * be provided on the registered functions * * @param {string} name The Id of the Method. * @param {responseTaskMsg} result * @memberof ICommunicationInterface */ emitRpcResult(name: string, result: responseTaskMsg): void; /** * Function used to subscribe to RPC Results. Each method / function * can provide a unique eventname for the results. * * @param {string} name The Id of the Method. * @param {(result: responseTaskMsg) => void} cb The callback which should be called * @memberof ICommunicationInterface */ onRpcResult(name: string, cb: (result: responseTaskMsg) => void): void; /** * Function used to unsubscribe from Rpc-Results * * @param {string} name The Id of the Method. * @param {(result: responseTaskMsg) => void} cb The callback which should be called * @memberof ICommunicationInterface */ offRpcResponse(name: string, cb: (result: responseTaskMsg) => void): void; /** * Function to register RPC-Request. * * @param {string} name the name to listen for the request. * @param {(data: requestTaskMsg) => void} cb The callback which should be called, if a request for the method is detected * @memberof ICommunicationInterface */ onRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void; /** * Unregister a listener for a RPC-Request. * * @param {string} name * @param {(data: requestTaskMsg) => void} cb * @memberof ICommunicationInterface */ offRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void; /** * Function to Emit new Services. They will then be shared to other sytems. * * @param {availableServices} services * @memberof ICommunicationInterface */ emitNewServicesAvailable(services: availableServices): void; /** * Function to register a new Callback, which will be called if new Services are available. * * @param {() => void} cb The Callback to Call. * @memberof ICommunicationInterface */ onNewServicesAvailable(cb: (services: availableServices) => void); /** * Function to Emit new Services. They will then be shared to other sytems. * * @param {availableTopics} topics * @memberof ICommunicationInterface */ emitNewTopicsAvailable(topics: availableTopics): void; /** * Function to register a new Callback, which will be called if new Services are available. * * @param {(topics: availableTopics) => void} cb The Callback to Call. * @memberof ICommunicationInterface */ onNewTopicsAvailable(cb: (topics: availableTopics) => void); /** * Function to subscribe to an event * * @param {string} event The Event name (Usually the Topic.) * @param {(data: externalEvent) => void} cb The Callback which should be used to call if there are new Events. * @memberof ICommunicationInterface */ onEvent(event: string, cb: (data: externalEvent) => void): void; /** * Function used to emit an event on the given event channel * * @param {string} event The Name of the Event * @param {externalEvent} data A datapacket describing the Event * @memberof ICommunicationInterface */ emitEvent(event: string, data: externalEvent): void /** * Function to unregister an event listener * * @param {string} event The Name of the Event * @param {(data: externalEvent) => void} cb The desired Callback * @memberof ICommunicationInterface */ offEvent(event: string, cb: (data: externalEvent) => void): void; /** * Function, to subscribe to "Bonjour"-Messages of the Dispatcher Dispatchers. * * @param {(dispatcher: string) => void} cb The callback which should be called. * @memberof ICommunicationInterface */ onBonjour(cb: (dispatcher: string) => void): void; /** * Function to emit a Bonjour message. * * @param {string} dispatcher The ID of the new Dispatcher. * @memberof ICommunicationInterface */ emitBonjour(dispatcher: string): void; } export type availableTopics = { /** * The Id of the Dispatcher * * @type {string} */ dispatcher: string, /** * The List of published Topics (only the registered ones). * * @type {string[]} */ published: string[] /** * The List of subscribed Topics (only the registered ones). * * @type {string[]} */ subscribed: string[] } export type externalEvent = { type: 'event' data: any, topic: string, sender: string, timestamp?: number } export type requestOfService = { type: 'requestOfService' taskId: string, functionId: string, } export type requestTaskMsg = { type: 'requestOfTask', /** * UUID of a Task * * @type {string} */ taskId: string, /** * ID of the Function, on which it is available. * * @type {string} */ functionId: string, /** * The Parameters * * @type {{ * idx: number, * data: any * }[]} */ params: { /** * Index of the Parameter * * @type {number} */ idx: number, data: any }[] /** * Callbacks, that are available in a Dispatcher. * * @type {(({ * functionId: string, * idx: number, * deleteAfterCalling: boolean, * } & callOptions)[])} */ callbacks: ({ functionId: string, idx: number, deleteAfterCalling: boolean, } & callOptions)[] /** * Element, allowing to describe where the result should be hosted. * * @type {string} */ resultSink: string } export type callOptions = { /** * Array containing indexes of callbacks which could be deleted * * @type {Array} */ deletableCallbacks?: Array; /** * Flag, showing that the parameters set has no callback * * @type {boolean} */ paramsHasNoCallback?: boolean; /** * Flag to disable error-testing * * @type {boolean} */ dynamicCallback?: boolean; /** * Element, allowing to describe where the result should be hosted. * * @type {string} */ resultSink: string } export type responseTaskMsg = { type: 'response', /** * ID of the Task. * * @type {string} */ taskId: string, /** * Property containing the result. Is * only present, if no error exists. * * @type {*} */ result?: any, /** * Property containing the error, if * it occourd. * * @type {*} */ error?: any } export type availableServices = { /** * The Id of the Dispatcher * * @type {string} */ dispatcher: string, /** * The List of registered Service. * * @type {string[]} */ services: string[] } export type nopeRpcDispatcherOptions = { communicator: ICommunicationInterface } /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded * to the remote. * * @export * @class nopeDispatcher */ export class nopeDispatcher { public readonly id: string; protected _logger: Logger; /** * Internal Element to store the registered Functions * * @protected * @memberof nopeDispatcher */ protected _definedFunctions: Map Promise>; protected _remotlyCalledFunctions: Set; protected _communicatorCallbacks: Map any }>; protected _communicator: ICommunicationInterface; protected _mappingOfRemoteDispatchersAndFunctions: Map; protected _externalServices: Set; public methodInterfaceWithOptions: { [index: string]: (optins: callOptions, ...args) => Promise } public methodInterface: { [index: string]: (...args) => Promise } protected _mappingOfRemoteDispatchersAndTopics: Map; protected _externalSubscribed: Set; protected _externalPublished: Set; /** * Internal Element to store the running tasks. * * @protected * @memberof nopeDispatcher */ protected _runningTasks: Map void; reject: (error: any) => void; }>; readonly _subscriptionMode: 'individual' | 'generic'; readonly _resultSharing: 'individual' | 'generic'; /** * Creates an instance of nopeDispatcher. * @param {ICommunicationInterface} communicator The Communication Layer which should be used. * @memberof nopeDispatcher */ constructor(public options: nopeRpcDispatcherOptions) { this._communicator = options.communicator; this._subscriptionMode = this._communicator.subscriptionMode || 'generic'; this._resultSharing = this._communicator.resultSharing || 'generic'; this.id = generateId(); /** * Define A Proxy for accessing methods easier. */ const _this = this; const _handlerWithOptions = { get(target, name) { return async (options: callOptions, ...args) => { return _this.performCall(name, args, options); } } } const _handlerWithoutOptions = { get(target, name) { return async (...args) => { return _this.performCall(name, args); } } } this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions); this.methodInterface = new Proxy({}, _handlerWithoutOptions); this.reset(); this.init(); } /** * Internal Method to handle some requests. * * @protected * @param {requestTaskMsg} data The provided data of the request * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided * @return {Promise} * @memberof nopeDispatcher */ protected async _handleExternalRequest(data: requestTaskMsg, _function?: (...args) => Promise): Promise { try { // Try to get the function if not provided: if (typeof _function !== 'function') { _function = this._definedFunctions.get(data.functionId); } const _this = this; if (typeof _function === 'function') { // Only if the Function is present extract the arguments etc. const args = []; // First extract the basic arguments data.params.map(item => args[item.idx] = item.data); // Add the Callbacks. Therefore create a function which will // trigger the remote. data.callbacks.map(options => args[options.idx] = async (..._args) => { return await _this.performCall(options.functionId, _args, options); }); // Perform the Task it self. const _result = await _function(...args); // Define the Result message const result: responseTaskMsg = { result: typeof (_result) !== 'undefined' ? _result : null, taskId: data.taskId, type: 'response' } // Use the communicator to publish the event. this._communicator.emitRpcResult(data.resultSink, result); } } catch (error) { // An Error occourd => Forward the Error. const result: responseTaskMsg = { error, taskId: data.taskId, type: 'response' } // Send the Error via the communicator to the remote. this._communicator.emitRpcResult(data.resultSink, result); } } /** * Internal Function to handle responses. In Generale, * the dispatcher checks if there is an open task with * the provided id. If so => finish the promise. * * @protected * @param {responseTaskMsg} data The Data provided to handle the Response. * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not. * @memberof nopeDispatcher */ protected _handleExternalResponse(data: responseTaskMsg): boolean { try { // Extract the Task const task = this._runningTasks.get(data.taskId); // Delete the Task: this._runningTasks.delete(data.taskId); // Based on the Result of the Remote => if (task && data.error) { task.reject(data.error); return true; } if (task) { task.resolve(data.result); return true; } } catch (e) { } return false; } /** * Internal Function, used to initialize the Dispatcher. * It subscribes to the "Messages" of the communicator. * * @protected * @memberof nopeDispatcher */ protected init(): void { const _this = this; // Based on the Mode of the Subscription => // either create indivdual topics for the methods // or use the generice function. switch (this._subscriptionMode) { case 'individual': // Iterate over the Defined Functions. for (const [id, cb] of this._definedFunctions.entries()) { // Subscribe the Function this._subscribeToFunction(id, cb); } break; case 'generic': // Add a generic Subscription for callbacks: this._communicator.onRpcRequest('request', (data: requestTaskMsg) => { if (data.type === 'requestOfTask') { _this._handleExternalRequest(data); } }); // Subscribe to Responses this._communicator.onRpcResult('response', (data: responseTaskMsg) => { if (data.type === 'response') { _this._handleExternalResponse(data); } }); break; } // Subscribe to the availableServices of Remotes. // If there is a new Service => udpate the External Services this._communicator.onNewServicesAvailable(data => { try { if (data.dispatcher !== _this.id) { _this._mappingOfRemoteDispatchersAndFunctions.set(data.dispatcher, data); _this._updateExternalServices(); } } catch (e) { } }); this._communicator.onNewTopicsAvailable(data => { try { if (data.dispatcher !== _this.id) { _this._mappingOfRemoteDispatchersAndTopics.set(data.dispatcher, data); _this._updateExternalTopics(); } } catch (e) { } }); this._communicator.onBonjour((dispatcher: string) => { if (_this.id !== dispatcher){ _this._sendAvailableServices(); _this._sendAvailableTopic(); } }); this._communicator.emitBonjour(this.id); } /** * Function to update the used Services. * * @protected * @memberof serviceRegistry */ protected _updateExternalServices() { const _this = this; // Clear the Services this._externalServices.clear(); for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndFunctions.values()) { dispatcherInfo.services.map(service => _this._externalServices.add(service)); } if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug( 'new services found: \n' + JSON.stringify( Array.from(this._externalServices), undefined, 4 ) ); } } /** * Internal Function to update the Listing of external Topcis. * This Function creates a list containing all subscriptions * and publishers which are external. * * @protected * @memberof nopeDispatcher */ protected _updateExternalTopics() { const _this = this; // Clear the Services this._externalPublished.clear(); this._externalSubscribed.clear(); for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndTopics.values()) { dispatcherInfo.published.map(topic => _this._externalPublished.add(topic)); dispatcherInfo.subscribed.map(topic => _this._externalSubscribed.add(topic)); } if (this._logger?.isDebugEnabled()) { // If there is a Logger: this._logger.debug( 'new topics found: \n' + JSON.stringify( Array.from(this._externalSubscribed), undefined, 4 ) ); } } /** * Function to test if a specific Service exists. * * @param {string} id The Id of the Serivce * @return {boolean} The result of the Test. True if either local or remotly a service is known. * @memberof nopeDispatcher */ public serviceExists(id: string) { return this._definedFunctions.has(id) || this._externalServices.has(id); } public subscriptionExists(topic: string){ return this._externalSubscribed.has(topic); } /** * Function to adapt a Request name. * Only used internally * * @protected * @param {string} id the original ID * @return {string} the adapted ID. * @memberof nopeDispatcher */ protected _getServiceName(id: string, type: 'request' | 'response'): string { return id.startsWith(`${type}/`) ? id : `${type}/${id}`; } /** * Internal Helper Function to subscribe to a Function element. * * @protected * @param {string} id the Id of the function * @param {(...args) => Promise} _cb The Callback of the Function * @return {void} the adapted ID. * @memberof nopeDispatcher */ protected _subscribeToFunction(id: string, _cb: (...args) => Promise): void { const _req = this._getServiceName(id, 'request'); if ( this._subscriptionMode === 'individual' && !this._communicatorCallbacks.has(id) ) { const _this = this; // Define a Function. const cb = (data: requestTaskMsg) => { if (data.type === 'requestOfTask') { _this._handleExternalRequest(data, _cb); } }; // Add the Callback. this._communicatorCallbacks.set(id, { registeredId: _req, type: 'request', cb }); // Register Functions. this._communicator.onRpcRequest(_req, cb); } } protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void { const _res = this._getServiceName(id, 'response'); if ( this._subscriptionMode === 'individual' && !this._communicatorCallbacks.has(id) ) { const _this = this; // Define a Function. const cb = (data: responseTaskMsg) => { if (data.type === 'response') { if (_this._handleExternalResponse(data)) { _this._removeRpcSubscription(id); } } }; // Add the Callback. this._communicatorCallbacks.set(id, { registeredId: _res, type: 'response', cb }); // Register Functions. this._communicator.onRpcResult(_res, cb); } } /** * Function to register a Function in the Dispatcher * * @param {(...args) => Promise} func The function which should be called if a request is mapped to the Function. * @param {{ * // Flag to enable unregistering the function after calling. * deleteAfterCalling?: boolean, * // Instead of generating a uuid an id could be provided * id?: string; * }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it. * @return {*} {(...args) => Promise} The registered Function * @memberof nopeDispatcher */ public registerFunction(func: (...args) => Promise, options: { /** Flag to enable unregistering the function after calling. */ deleteAfterCalling?: boolean, /** Instead of generating a uuid an id could be provided */ id?: string; /** Flag to enable / disable sending to registery */ preventSendingToRegistery?: boolean; } = {}): (...args) => Promise { const _this = this; // Define / Use the ID of the Function. const _id = options.id || generateId(); let _func = func; if (options.deleteAfterCalling) { _func = async (...args) => { // Unregister the Method _this.unregistFunction(_id, { preventSendingToRegistery: options.preventSendingToRegistery }); // Return the Result of the Original Function. return await func(...args); } } // Define a ID for the Function _func['id'] = _id; // Define the callback. _func['unregister'] = () => _this.unregistFunction(_id); // Reister the Function this._definedFunctions.set(_func['id'], _func); // Register the Callback: this._subscribeToFunction(_id, _func); if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableServices(); } // Return the Function. return _func; } /** * Function to unregister a Function from the Dispatcher * @param {(((...args) => void) | string | number)} func The Function to unregister * @return {*} {boolean} Flag, whether the element was removed (only if found) or not. * @memberof nopeDispatcher */ public unregistFunction(func: ((...args) => void) | string, options: { /** Flag to enable / disable sending to registery */ preventSendingToRegistery?: boolean; } = {}): boolean { const _id = typeof func === 'string' ? func : func['id'] as string || '0'; this._removeRpcSubscription(_id); if (!options.preventSendingToRegistery) { // Publish the Available Services. this._sendAvailableServices(); } return this._definedFunctions.delete(_id); } protected _externallySubscribeObservables: Map, cb: (...arg) => void, }>; protected _internallySubscribeObservables: Map>>; /** * Creates an Event listener (if required) * * @protected * @param {string} event The Event to Listen. * @return {nopeObservable} An Listener on the Communication Channel. * @memberof nopeDispatcher */ protected _subscribeToEvent(event: string) { const item = this._externallySubscribeObservables.get(event) || { observable: new nopeObservable(), cb: () => { }, }; if (!item.observable.hasSubscriptions) { const _this = this; const cb = (data: externalEvent) => { item.observable.setContent(data, _this.id); }; this._communicator.onEvent(event, cb); item.cb = cb; } // Set the Items. this._externallySubscribeObservables.set(event, item); return item.observable; } /** * Function to unsubscribe from an event of the channel. * * @protected * @param {string} event * @memberof nopeDispatcher */ protected _unsubscribeEvent(event: string) { const item = this._externallySubscribeObservables.get(event) || { observable: new nopeObservable(), cb: () => { }, count: 0, }; if (item && item.observable.hasSubscriptions) { this._communicator.offEvent(event, item.cb); this._externallySubscribeObservables.delete(event); } } /** * Function to register a Oberservable into the element. * * @template T * @template K * @template S * @template G * @param {nopeObservable} observable * @param {({ * mode: 'subscribe' | 'publish' | Array<'subscribe' | 'publish'>, * topic: string | { * subscribe?: string, * publish?: string, * }, * pipe?:{ * pipe?: pipe, * scope?: { [index: string]: any } * }, * preventSendingToRegistery?: boolean * })} options * @return {*} {nopeObservable} * @memberof nopeDispatcher */ public registerObservable(observable: nopeObservable, options: { mode: 'subscribe' | 'publish' | Array<'subscribe' | 'publish'>, topic: string | { subscribe?: string, publish?: string, }, pipe?: { pipe?: pipe, scope?: { [index: string]: any } }, preventSendingToRegistery?: boolean }): nopeObservable { // Reference to itself const _this = this; // Extract the Topics, pipe and scope. const _subTopic = typeof options.topic === 'string' ? options.topic : options.topic.subscribe || null; const _pubTopic = typeof options.topic === 'string' ? options.topic : options.topic.publish || null; const _pipe = typeof options.pipe === 'function' ? options.pipe || null : null; const _scope = typeof options.pipe === 'object' ? options.pipe.scope || null : null // A Flag, indicating, whether the topic is new or not. let newElement = false; // Test if the Item should be subscribe or not. if (options.mode == 'subscribe' || (Array.isArray(options.mode) && options.mode.includes('subscribe'))) { newElement = newElement || !this._externallySubscribeObservables.has(_subTopic); const _externalSource = this._subscribeToEvent(_subTopic); if (_pipe) { const observer = _externalSource.enhancedSubscription((data: externalEvent) => { // Test if the Content, which has been forwared in here inst the own dispathcer. if (data.sender != _this.id){ observable.setContent(data.data, _this.id, data.timestamp); } }, { scope: _scope, pipe: _pipe }); const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Call the original Dispose function; dispose.apply(observable); } } else { const observer = _externalSource.subscribe({ next(data: externalEvent) { if (_this.id !== data.sender) { observable.setContent(data.data, _this.id, data.timestamp); } }, complete() { observable.observable.complete(); }, error(err) { observable.observable.error(err); } }); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Call the original Dispose function; dispose.apply(observable); } } } if (options.mode == 'publish' || (Array.isArray(options.mode) && options.mode.includes('publish'))) { const cb = (data, sender?, timestamp?, ...args) => { // Only Publish data, if there exists a Subscription. if (_this.subscriptionExists(_pubTopic) && _this.id !== sender) { // Use the Communicator to emit the Event. _this._communicator.emitEvent(_pubTopic, { data:data, topic: _pubTopic, sender: _this.id, type: 'event', timestamp }); } } // Update the Flag. newElement = newElement || !this._internallySubscribeObservables.has(_pubTopic); // Register the Internally Subscribed Element. const _set = this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.add(observable); this._internallySubscribeObservables.set(_pubTopic, _set); if (_pipe) { const observer = observable.enhancedSubscription(cb, { scope: _scope, pipe: _pipe }); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Unregister the Internally Subscribed Element. const _set = _this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.delete(observable); if (_set.size > 0) { _this._internallySubscribeObservables.set(_pubTopic, _set); } else { _this._internallySubscribeObservables.delete(_pubTopic); // Optionally send an update. if (!options.preventSendingToRegistery) { // Publish the Available Services. _this._sendAvailableTopic(); } } // Call the original Dispose function; dispose.apply(observable); } } else { const observer = observable.subscribe(cb); // Overwrite the Original Dispose Function. const dispose = observable.dispose; observable.dispose = () => { // Kill the Observer; observer.unsubscribe(); // Unsubscribe the Event _this._unsubscribeEvent(_subTopic); // Unregister the Internally Subscribed Element. const _set = _this._internallySubscribeObservables.get(_pubTopic) || new Set(); _set.delete(observable); if (_set.size > 0) { _this._internallySubscribeObservables.set(_pubTopic, _set); } else { _this._internallySubscribeObservables.delete(_pubTopic); // Optionally send an update. if (!options.preventSendingToRegistery) { // Publish the Available Services. _this._sendAvailableTopic(); } } // Call the original Dispose function; dispose.apply(observable); } } } if (!options.preventSendingToRegistery && newElement) { // Publish the Available Services. this._sendAvailableTopic(); } // Return the Function. return observable; } protected _removeRpcSubscription(_id: string) { // Try to unregister the Callback from the communcator: if (this._communicatorCallbacks.has(_id)) { const _callbacks = this._communicatorCallbacks.get(_id); switch (_callbacks.type) { case 'request': // Unregister the RPC-Request-Listener this._communicator.offRpcRequest(_callbacks.registeredId, _callbacks.cb); break; case 'response': // Unregister the RPC-Response-Listener this._communicator.offRpcResponse(_callbacks.registeredId, _callbacks.cb); break; } // Remove the Callback this._communicatorCallbacks.delete(_id); } } /** * Function used to update the Available Services. * * @protected * @memberof nopeDispatcher */ protected _sendAvailableServices() { // Define the Message const message: availableServices = { dispatcher: this.id, services: Array.from(this._definedFunctions.keys()) } // Send the Message. this._communicator.emitNewServicesAvailable(message); } /** * Function to emit the available topics. * * @protected * @memberof nopeDispatcher */ protected _sendAvailableTopic() { // Define the Message const message: availableTopics = { dispatcher: this.id, published: Array.from(this._internallySubscribeObservables.keys()), subscribed: Array.from(this._externallySubscribeObservables.keys()) } // Send the Message. this._communicator.emitNewTopicsAvailable(message); } /** * Function which is used to perform a call on the remote. * * @template T * @param {string} functionName The Name / ID of the Function * @param {any[]} params The provided Parameters. * @param {({ * deletableCallbacks: Array; * })} [options={ * deletableCallbacks: [], * paramsHasNoCallback: false, * preventErrorTest: false * }] You could additiona Optionf for the callback. * @return {*} {Promise} The result of the Operation * @memberof nopeDispatcher */ public performCall(functionName: string, params: any[], options: Partial = {}): Promise { // Get a Call Id const _taskId = generateId(); const _registeredIdx: Array = []; const _this = this; const _options = Object.assign({ deletableCallbacks: [], paramsHasNoCallback: false, dynamicCallback: false, resultSink: (this._resultSharing === 'generic' ? 'response' : this._getServiceName(functionName, 'response')) as string } as callOptions, options) as callOptions; this._subscribeToResult(functionName, _options.dynamicCallback) // Define a Callback-Function, which will expect the Task. return new Promise((resolve, reject) => { try { // Register the Handlers, _this._runningTasks.set(_taskId, { resolve, reject }); // Define a Task-Request const taskRequest: requestTaskMsg = { functionId: functionName, params, callbacks: [], taskId: _taskId, type: 'requestOfTask', resultSink: _options.resultSink } // Test if there is no Callback integrated if (!options.paramsHasNoCallback) { // If so, the parameters has to be detailled: // 1. Reset the Params list: taskRequest.params = []; // 2. Iterate over all Parameters and // Determin Callbacks. Based on the Parameter- // Type assign it either to packet.params ( // for parsable Parameters) and packet.callbacks // (for callback Parameters) for (const [idx, contentOfParameter] of params.entries()) { // Test if the parameter is a Function if (typeof contentOfParameter !== "function") { taskRequest.params.push({ idx, data: contentOfParameter }); } else { // The Parameter is a Callback => store a // Description of the Callback and register // the callback inside of the Dispatcher const deleteAfterCalling = _options.deletableCallbacks.includes(idx); const _func = _this.registerFunction(contentOfParameter, { deleteAfterCalling, preventSendingToRegistery: true, }); _registeredIdx.push(_func['id']); // Register the Callback taskRequest.callbacks.push({ functionId: _func['id'], idx, deleteAfterCalling, dynamicCallback: true, deletableCallbacks: [], resultSink: _this._resultSharing === 'generic' ? 'response' : _this._getServiceName(_func['id'], 'response') }); } } } if (!_options.dynamicCallback && !_this.serviceExists(functionName)) { // Create an Error: const error = new Error('No Service Provider known for "' + functionName + '"'); if (_this._logger) { _this._logger.error('No Service Provider known for "' + functionName + '"'); _this._logger.error(error) } throw error; } // Send the Message to the specific element: if (_this._subscriptionMode === 'individual') { _this._communicator.emitRpcRequest(_this._getServiceName(taskRequest.functionId, 'request'), taskRequest); } else { _this._communicator.emitRpcRequest('request', taskRequest); } } catch (e) { // Delete all Callbacks. _registeredIdx.map(id => _this.unregistFunction(id)); // Remove the task: if (_this._runningTasks.has(_taskId)) _this._runningTasks.delete(_taskId); // Throw an error. reject(e); } }); } /** * Function to clear all pending tasks * * @memberof nopeDispatcher */ public clearTasks(): void { if (this._runningTasks) this._runningTasks.clear(); else this._runningTasks = new Map void; reject: (error: any) => void; }>(); } /** * Function to unregister all Functions of the Dispatcher. * * @memberof nopeDispatcher */ public unregisterAll(): void { if (this._definedFunctions) { for (const id of this._definedFunctions.keys()) { this.unregistFunction(id) } this._definedFunctions.clear(); } else { this._definedFunctions = new Map Promise>(); } // Reset the Callbacks. this._communicatorCallbacks = new Map(); } /** * Function to reset the Dispatcher. * * @memberof nopeDispatcher */ public reset(): void { this._remotlyCalledFunctions = new Set(); this._mappingOfRemoteDispatchersAndFunctions = new Map(); this._mappingOfRemoteDispatchersAndTopics = new Map(); this._externalServices = new Set(); this._externalPublished = new Set(); this._externalSubscribed = new Set(); this._internallySubscribeObservables = new Map(); this._externallySubscribeObservables = new Map(); this.clearTasks(); this.unregisterAll(); } }