/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-10-12 18:52:00
 * @modify date 2021-10-19 09:15:25
 * @desc [description]
 */

import * as Logger from "js-logger";
import { ILogger } from "js-logger";
import { NopeEventEmitter } from "../eventEmitter/index";
import { isAsyncFunction } from "../helpers/async";
import { generateId } from "../helpers/idMethods";
import { MapBasedMergeData } from "../helpers/mergedData";
import { defineNopeLogger } from "../logger/getLogger";
import { NopePromise } from "../promise/nopePromise";
import {
  IAvailableServicesMsg,
  ICallOptions,
  ICommunicationBridge,
  IExternalEventMsg,
  IMapBasedMergeData,
  INopeDispatcherOptions,
  INopeEventEmitter,
  INopeObservable,
  INopePromise,
  INopeRpcManager,
  IRequestTaskMsg,
  IResponseTaskMsg,
  ITaskCancelationMsg,
  ValidSelectorFunction,
} from "../types/nope/index";

/**
 * A Dispatcher to perform a function on a remote Dispatcher.
 * Therefore a task is created and forwarded to the remote via the
 * communicator; the answer is received on a "response" channel and
 * used to settle the promise handed to the caller.
 *
 * NOTE(review): the generic arguments in this file (e.g. `Promise<any>`,
 * `Partial<ICallOptions>`, the class parameter `T`) were reconstructed
 * from their usage. Confirm them against the declarations in
 * `../types/nope/index`.
 *
 * @export
 * @class NopeRpcManager
 */
export class NopeRpcManager<T = any> implements INopeRpcManager {
  protected _logger: ILogger;

  /**
   * Internal element to store the registered functions.
   * Key = id of the service; value = the options used during the
   * registration and the function itself.
   */
  protected _registeredServices: Map<
    string,
    {
      options: T;
      func: (...args) => Promise<any>;
    }
  >;

  /**
   * Callbacks that are currently registered on the communicator.
   * Key = the adapted channel name (`request/<id>` or `response/<id>`,
   * see `_getServiceName`).
   */
  protected _communicatorCallbacks: Map<
    string,
    {
      registeredId: string;
      type: "request" | "response";
      cb: (data) => any;
    }
  >;

  /**
   * A mapping of the services a dispatcher is hosting.
   * Key = Dispatcher-ID
   * Value = Available services
   */
  protected _mappingOfDispatchersAndServices: Map<
    string,
    IAvailableServicesMsg
  >;

  /**
   * Proxy for accessing the methods. This proxy provides additional
   * options (first argument), which can be used to detail the calls.
   *
   * @author M.Karkowski
   */
  public methodInterfaceWithOptions: {
    [index: string]: <T>(
      options: Partial<ICallOptions>,
      ...args
    ) => INopePromise<T>;
  };

  /**
   * Proxy for accessing the methods. Every property is a function
   * that forwards its arguments to `performCall` using the default
   * call options.
   *
   * @author M.Karkowski
   */
  public methodInterface: {
    [index: string]: <T>(...args) => INopePromise<T>;
  };

  /**
   * Element showing the available services.
   * It is more or less a map, that maps the
   * services with their dispatchers.
   *
   * T = services name.
   * K = dispatcher - ids
   *
   * @author M.Karkowski
   */
  public readonly services: IMapBasedMergeData<
    string,
    string,
    IAvailableServicesMsg
  >;

  /**
   * An event emitter, which will be called when a task is getting
   * canceled.
   *
   * @author M.Karkowski
   */
  public readonly onCancelTask: INopeEventEmitter<ITaskCancelationMsg>;

  /**
   * Internal element to store the running tasks, which have been
   * requested by this manager (key = task id).
   */
  protected _runningInternalRequestedTasks: Map<
    string,
    {
      resolve: (value: any) => void;
      reject: (error: any) => void;
      clear: () => void;
      serviceName: string;
      timeout?: any;
      target: string;
    }
  >;

  /**
   * List, with external tasks, that are running.
   */
  protected _runningExternalRequestedTasks: Set<string>;

  /**
   * Flag to show an initial warning only once.
   */
  protected __warned: boolean;

  /**
   * The used communication interface.
   */
  public readonly communicator: ICommunicationBridge;

  /**
   * Flag to indicate, that the system is ready.
   *
   * @author M.Karkowski
   */
  public readonly ready: INopeObservable<boolean>;

  /**
   * Creates an instance of NopeRpcManager.
   *
   * @param options The options, used by the Dispatcher (provides the communicator and the logger settings).
   * @param _generateObservable A helper, to generate observables.
   * @param _defaultSelector The selector used to pick a dispatcher, if the call does not provide its own selector.
   * @param id The id of the manager. If `null` / omitted an id is generated.
   */
  constructor(
    public options: INopeDispatcherOptions,
    protected _generateObservable: <T>() => INopeObservable<T>,
    protected _defaultSelector: ValidSelectorFunction,
    public readonly id: string = null
  ) {
    this.communicator = options.communicator;

    if (id == null) {
      this.id = generateId();
    }

    this._logger = defineNopeLogger(options.logger, `rpc-manager`);

    // Flag to show if the system is ready or not.
    this.ready = this._generateObservable<boolean>();
    this.ready.setContent(false);

    this.__warned = false;

    // Define a proxy for accessing methods easier.
    const _this = this;
    const _handlerWithOptions = {
      get(target, name) {
        return (options: ICallOptions, ...args) =>
          _this.performCall(name, args, options);
      },
    };

    // Define the proxy without the options
    const _handlerWithoutOptions = {
      get(target, name) {
        return (...args) => _this.performCall(name, args);
      },
    };

    this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions);
    this.methodInterface = new Proxy({}, _handlerWithoutOptions);

    // The mapping itself is created in `reset()` below, which also
    // hands the fresh map over to `services`.
    this.services = new MapBasedMergeData(
      this._mappingOfDispatchersAndServices,
      "services"
    );

    this.onCancelTask = new NopeEventEmitter();

    if (this._logger) {
      this._logger.info("RPC-Manager Online", this.id);
    }

    this.reset();
    this._init().catch((error) => {
      if (_this._logger) {
        _this._logger.error("Failed to intialize the Dispatcher", error);
      }
    });
  }

  /**
   * Function, which will be called, if a dispatcher announces its
   * services. Stores the message and refreshes `services`.
   *
   * @author M.Karkowski
   * @param msg The message containing the dispatcher id and its services.
   */
  public updateDispatcher(msg: IAvailableServicesMsg): void {
    this._mappingOfDispatchersAndServices.set(msg.dispatcher, msg);
    this.services.update();
  }

  /**
   * Internal method to handle an rpc request: executes the matching
   * function and publishes either the result or the error on
   * `data.resultSink`.
   *
   * @param data The provided data of the request
   * @param _function The function to execute. If omitted, the function registered under `data.functionId` is used.
   */
  protected async _handleExternalRequest(
    data: IRequestTaskMsg,
    _function?: (...args) => Promise<any>
  ): Promise<void> {
    // Subscription on the cancel event. It is released exactly once,
    // either when the task is canceled or when the request is done.
    let observer: { unsubscribe: () => void } | null = null;
    const release = () => {
      if (observer) {
        const toRelease = observer;
        observer = null;
        toRelease.unsubscribe();
      }
    };

    try {
      // Try to get the function if not provided:
      if (typeof _function !== "function") {
        _function = this._registeredServices.get(data.functionId)?.func;
      }

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        this._logger.debug(
          `Dispatcher "${this.id}" received request: "${data.functionId}" -> task: "${data.taskId}"`
        );
      }

      // Unknown function => nothing to do.
      if (typeof _function !== "function") {
        return;
      }

      // The request is addressed to a different dispatcher => skip.
      if (data.target && data.target !== this.id) {
        return;
      }

      // Callbacks, which are called if the task is canceled.
      const cbs: Array<(reason) => void> = [];

      observer = this.onCancelTask.subscribe((cancelEvent) => {
        if (cancelEvent.taskId == data.taskId) {
          // Call every callback.
          cbs.forEach((cb) => cb(cancelEvent.reason));
          // The subscription is not needed any more.
          release();
        }
      });

      // Extract the arguments and put them on their positions.
      const args = [];
      for (const item of data.params) {
        args[item.idx] = item.data;
      }

      // Perform the task itself.
      const _resultPromise = _function(...args);

      if (
        typeof (_resultPromise as INopePromise<any>)?.cancel === "function"
      ) {
        // The result is cancelable => forward a cancelation to it.
        cbs.push((reason) =>
          (_resultPromise as INopePromise<any>).cancel(reason)
        );
      }

      // Wait for the result to finish.
      const _result = await _resultPromise;

      // Define the result message (`undefined` is transmitted as `null`)
      const result: IResponseTaskMsg = {
        result: typeof _result !== "undefined" ? _result : null,
        taskId: data.taskId,
        type: "response",
      };

      // Use the communicator to publish the result.
      await this.communicator.emitRpcResponse(data.resultSink, result);
    } catch (error) {
      if (this._logger) {
        this._logger.error(
          `Dispatcher "${this.id}" failed with request: "${data.taskId}"`
        );
        this._logger.error(error);
      }

      // An error occurred => forward the error.
      const result: IResponseTaskMsg = {
        error: {
          error,
          // `String(...)` also works for `null` / `undefined` rejections.
          msg: String(error),
        },
        taskId: data.taskId,
        type: "response",
      };

      // Send the error via the communicator to the remote.
      await this.communicator.emitRpcResponse(data.resultSink, result);
    } finally {
      // Without this, every normally finished task would keep its
      // subscription on `onCancelTask` forever.
      release();
    }
  }

  /**
   * Internal function to handle responses. In general,
   * the dispatcher checks if there is an open task with
   * the provided id. If so => settle the promise.
   *
   * @param data The data provided to handle the response.
   * @returns A boolean, indicating whether a corresponding task was found or not.
   */
  protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
    try {
      // Extract the task
      const task = this._runningInternalRequestedTasks.get(data.taskId);

      if (!task) {
        return false;
      }

      // The task is done => remove it and stop its timeout.
      this._runningInternalRequestedTasks.delete(data.taskId);
      if (task.timeout) {
        clearTimeout(task.timeout);
      }

      // Based on the result of the remote => proceed.
      // Either reject with the error or forward the result
      if (data.error) {
        if (this._logger) {
          this._logger.error(`Failed with task ${data.taskId}`);
          this._logger.error(`Reason: ${data.error.msg}`);
          this._logger.error(data.error);
        }
        task.reject(data.error);
      } else {
        task.resolve(data.result);
      }

      return true;
    } catch (e) {
      this._logger?.error("Error during handling an external response");
      this._logger?.error(e);
    }

    return false;
  }

  /**
   * Function used to publish the services of this manager
   * (the keys of the registered services) via the communicator.
   */
  protected _sendAvailableServices(): void {
    // Define the message
    const message: IAvailableServicesMsg = {
      dispatcher: this.id,
      services: Array.from(this._registeredServices.keys()),
    };

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug("sending available services");
    }

    // Send the message.
    this.communicator.emitNewServicesAvailable(message);
  }

  /**
   * Internal function, used to initialize the Dispatcher.
   * It waits until the communicator is connected and then subscribes
   * to the "messages" of the communicator. Afterwards `ready` is set
   * to `true`.
   */
  protected async _init(): Promise<void> {
    const _this = this;

    this.ready.setContent(false);

    // Wait until the element is connected.
    await this.communicator.connected.waitFor((value) => value);

    // Iterate over the defined functions and create subscriptions
    for (const [id, item] of this._registeredServices.entries()) {
      this._listenForRequest(id, item.func);
    }

    // Subscribe to the availableServices of remotes.
    // If there is a new service => update the external services
    await this.communicator.onNewServicesAvailable((data) => {
      try {
        _this.updateDispatcher(data);
      } catch (e) {
        _this._logger?.error("Error during handling an onNewServicesAvailable");
        _this._logger?.error(e);
      }
    });

    // A new dispatcher said hello => tell it our services.
    await this.communicator.onBonjour(() => {
      _this._sendAvailableServices();
    });

    await this.communicator.onAurevoir((dispatcher: string) =>
      _this.removeDispatcher(dispatcher)
    );

    // Forward cancelations of other dispatchers to the local emitter.
    await this.communicator.onTaskCancelation((event) => {
      if (event.dispatcher !== _this.id) {
        _this.onCancelTask.emit(event);
      }
    });

    // Now we listen to requests to unregister a service.
    await this.communicator.onUnregisterRpc((msg) => {
      if (_this._registeredServices.has(msg.identifier)) {
        _this._unregisterService(msg.identifier);
      }
    });

    if (this._logger) {
      this._logger.info(this.id, "initialized");
    }

    this.ready.setContent(true);
  }

  /**
   * Removes a dispatcher from the known dispatchers and cancels
   * every running task that is targeted at this dispatcher.
   *
   * @param dispatcher The id of the dispatcher, which is removed.
   */
  public removeDispatcher(dispatcher: string): void {
    // Delete the services of the dispatcher.
    this._mappingOfDispatchersAndServices.delete(dispatcher);
    this.services.update();

    // Now we need to cancel every task of the dispatcher,
    // which isn't present any more. The method is synchronous, so
    // failures of the cancelation are logged instead of being thrown.
    this.cancelRunningTasksOfDispatcher(
      dispatcher,
      new Error(
        "Dispatcher has been removed! Tasks cannot be executed any more."
      )
    ).catch((error) => {
      this._logger?.error(
        `Failed to cancel the tasks of dispatcher "${dispatcher}"`
      );
      this._logger?.error(error);
    });
  }

  /**
   * Function to cancel an individual task. The pending promise is
   * rejected with `reason` and the cancelation is published via the
   * communicator.
   *
   * @param taskId The id of the task, which should be canceled.
   * @param reason The reason, why the task should be canceled (in general should be something meaningful)
   * @param quite Forwarded as `quite` in the cancelation message.
   * @returns Flag, that indicates, whether a running task was found and canceled.
   */
  public async cancelTask(
    taskId: string,
    reason: Error,
    quite = false
  ): Promise<boolean> {
    const task = this._runningInternalRequestedTasks.get(taskId);

    if (!task) {
      // Task hasn't been found => nothing to cancel.
      return false;
    }

    // Delete the task
    this._runningInternalRequestedTasks.delete(taskId);

    // The task is settled below => a pending timeout is obsolete.
    if (task.timeout) {
      clearTimeout(task.timeout);
    }

    // Propagate the cancelation (internally):
    task.reject(reason);

    // Propagate the cancelation externally.
    await this.communicator.emitTaskCancelation({
      dispatcher: this.id,
      reason,
      taskId,
      quite,
    });

    // Indicate a successful cancelation.
    return true;
  }

  /**
   * Internal helper, which cancels every running task matching the
   * given test.
   *
   * @param matches Test, whether a task should be canceled.
   * @param reason The reason used to cancel the tasks.
   */
  protected async _cancelTasksWhere(
    matches: (task: { serviceName: string; target: string }) => boolean,
    reason: Error
  ): Promise<void> {
    // Collect the ids first; `cancelTask` modifies the map.
    const tasksToCancel: string[] = [];
    for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
      if (matches(task)) {
        tasksToCancel.push(id);
      }
    }

    await Promise.all(tasksToCancel.map((id) => this.cancelTask(id, reason)));
  }

  /**
   * Helper function, used to cancel all tasks of a specific service.
   *
   * @param serviceName The name of the service.
   * @param reason The provided reason, why the cancelation is required.
   */
  public async cancelRunningTasksOfService(
    serviceName: string,
    reason: Error
  ): Promise<void> {
    await this._cancelTasksWhere(
      (task) => task.serviceName === serviceName,
      reason
    );
  }

  /**
   * Cancels all tasks, which are targeted at the given dispatcher.
   *
   * @author M.Karkowski
   * @param dispatcher The id of the dispatcher.
   * @param reason The provided reason, why the cancelation is required.
   */
  public async cancelRunningTasksOfDispatcher(
    dispatcher: string,
    reason: Error
  ): Promise<void> {
    await this._cancelTasksWhere((task) => task.target === dispatcher, reason);
  }

  /**
   * Function to test if a specific service exists.
   *
   * @param id The id of the service
   * @returns `true` if `services.amountOf` contains the id.
   */
  public serviceExists(id: string): boolean {
    return this.services.amountOf.has(id);
  }

  /**
   * Function to adapt a request name: prefixes the id with
   * `request/` or `response/` unless it already starts with it.
   * Only used internally.
   *
   * @param id The original id
   * @param type The kind of channel.
   * @returns The adapted id.
   */
  protected _getServiceName(id: string, type: "request" | "response"): string {
    return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
  }

  /**
   * Internal helper function to subscribe to the requests of a function.
   * Nothing happens if a listener for the id already exists.
   *
   * @param id The id of the function
   * @param _cb The function, which is executed on a request.
   */
  protected _listenForRequest(
    id: string,
    _cb: (...args) => Promise<any>
  ): void {
    const _req = this._getServiceName(id, "request");

    if (this._communicatorCallbacks.has(_req)) {
      return;
    }

    const _this = this;

    // Define the listener.
    const cb = (data: IRequestTaskMsg) => {
      if (data.type === "requestOfTask") {
        // The request is handled in the background. A rejection means
        // that not even the error response could be delivered.
        _this._handleExternalRequest(data, _cb).catch((error) => {
          _this._logger?.error(
            `Dispatcher "${_this.id}" failed to answer task: "${data.taskId}"`
          );
          _this._logger?.error(error);
        });
      }
    };

    // Store the callback, to be able to remove it later on.
    this._communicatorCallbacks.set(_req, {
      registeredId: _req,
      type: "request",
      cb,
    });

    // Register the listener.
    this.communicator.onRpcRequest(_req, cb);

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(`Dispatcher "${this.id}" listening on: "${_req}"`);
    }
  }

  /**
   * Function, used to subscribe to results.
   * Nothing happens if a listener for the id already exists.
   *
   * @param id The id of the service
   * @param deleteAfterCalling If `true`, the listener is removed after a response has been matched to a running task.
   */
  protected _listenForResult(id: string, deleteAfterCalling = false): void {
    const _res = this._getServiceName(id, "response");

    if (this._communicatorCallbacks.has(_res)) {
      return;
    }

    const _this = this;

    // Define the listener.
    const cb = (data: IResponseTaskMsg) => {
      if (data.type === "response") {
        if (_this._handleExternalResponse(data) && deleteAfterCalling) {
          _this._removeRpcSubscription(_res);
        }
      }
    };

    // Store the callback, to be able to remove it later on.
    this._communicatorCallbacks.set(_res, {
      registeredId: _res,
      type: "response",
      cb,
    });

    // Register the listener.
    this.communicator.onRpcResponse(_res, cb);
  }

  /**
   * Function to register a function in the Dispatcher.
   * The function receives the properties `id` and `unregister`.
   *
   * @param func The function which should be called if a request is mapped to the function.
   * @param options Options of the service. Instead of generating an id, an `id` could be provided.
   * @returns The registered function
   */
  public registerService(
    func: (...args) => Promise<any>,
    options: {
      /** Instead of generating a uuid an id could be provided */
      id?: string;
    } & Partial<T> = {}
  ): (...args) => Promise<any> {
    const _this = this;
    // Define / use the id of the function.
    const _id = options.id || generateId();

    const _func = func;

    if (!this.__warned && !isAsyncFunction(func)) {
      this._logger?.warn(
        "!!! You have provided synchronous functions. They may break NoPE. Use them with care !!!"
      );
      this.__warned = true;
    }

    // Define an id for the function
    (_func as any).id = _id;

    // Define the helper to unregister the function.
    (_func as any).unregister = () => _this._unregisterService(_id);

    // Register the function
    this._registeredServices.set((_func as any).id, {
      options: options as T,
      func: _func,
    });

    // Register the callback:
    this._listenForRequest(_id, _func);

    // Publish the available services.
    this._sendAvailableServices();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(`Dispatcher "${this.id}" registered: "${_id}"`);
    }

    // Return the function.
    return _func;
  }

  /**
   * Asynchronous variant of `_unregisterService`.
   *
   * @param func The function or the id of the service to unregister.
   */
  public async unregisterService(
    func: string | ((...args: any[]) => any)
  ): Promise<void> {
    this._unregisterService(func);
  }

  /**
   * Function to unregister a function from the Dispatcher
   *
   * @param func The function (its `id` property is used) or the id of the service to unregister
   * @returns Flag, whether the element was removed (only if found) or not.
   */
  public _unregisterService(func: ((...args) => void) | string): boolean {
    const _id =
      typeof func === "string" ? func : ((func as any).id as string) || "0";

    // The request listener is stored under the adapted name
    // (see `_listenForRequest`), so the same name has to be used here.
    this._removeRpcSubscription(this._getServiceName(_id, "request"));

    // Remove the service first, otherwise the message below would
    // still announce it.
    const removed = this._registeredServices.delete(_id);

    // Publish the available services.
    this._sendAvailableServices();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(`Dispatcher "${this.id}" unregistered: "${_id}"`);
    }

    return removed;
  }

  /**
   * Removes a listener from the communicator (if present).
   *
   * @param _id The key of the callback, as used in `_communicatorCallbacks` (the adapted channel name).
   */
  protected _removeRpcSubscription(_id: string): void {
    const _callbacks = this._communicatorCallbacks.get(_id);

    if (!_callbacks) {
      return;
    }

    // Unregister the callback from the communicator:
    switch (_callbacks.type) {
      case "request":
        this.communicator.offRpcRequest(_callbacks.registeredId, _callbacks.cb);
        break;
      case "response":
        this.communicator.offRpcResponse(
          _callbacks.registeredId,
          _callbacks.cb
        );
        break;
    }

    // Remove the callback
    this._communicatorCallbacks.delete(_id);
  }

  /**
   * Function which is used to perform a call on the remote.
   *
   * @author M.Karkowski
   * @template T
   * @param serviceName The name / id of the function
   * @param params The parameters of the call.
   * @param options Options for the call. You can assign a different selector and a `timeout` in [ms].
   * @returns A promise with the result of the call. It provides the `taskId` and a `cancel` function.
   */
  public performCall<T>(
    serviceName: string,
    params: any[],
    options: Partial<ICallOptions> & {
      selector?: ValidSelectorFunction;
    } = {}
  ): INopePromise<T> {
    // Get a call id
    const _taskId = generateId();
    const _this = this;

    const _options = {
      deletableCallbacks: [],
      paramsHasNoCallback: false,
      dynamicCallback: false,
      resultSink: this._getServiceName(serviceName, "response"),
      ...options,
    } as ICallOptions;

    this._listenForResult(serviceName);

    const clear = () => {
      // Remove the task:
      if (_this._runningInternalRequestedTasks.has(_taskId)) {
        const task = _this._runningInternalRequestedTasks.get(_taskId);

        // Remove the timeout.
        if (task.timeout) {
          clearTimeout(task.timeout);
        }

        // Remove the task itself
        _this._runningInternalRequestedTasks.delete(_taskId);
      }

      if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
        _this._logger.debug(`Clearing Callbacks from ${_taskId}`);
      }
    };

    // `cancelTask` is used in a fire-and-forget manner below.
    const logCancelFailure = (error) => {
      _this._logger?.error(`Failed to cancel task "${_taskId}"`);
      _this._logger?.error(error);
    };

    if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
      _this._logger.debug(
        `Dispatcher "${this.id}" requesting externally Function "${serviceName}" with task: "${_taskId}"`
      );
    }

    // Define the promise, which is settled by the response of the task.
    const ret = new NopePromise<T>(async (resolve, reject) => {
      try {
        const requestedTask: any = {
          resolve,
          reject,
          clear,
          serviceName,
          timeout: null,
          target: undefined,
        };

        // Register the handlers,
        _this._runningInternalRequestedTasks.set(_taskId, requestedTask);

        // Define a task request
        const taskRequest: IRequestTaskMsg = {
          functionId: serviceName,
          params: [],
          callbacks: [],
          taskId: _taskId,
          type: "requestOfTask",
          resultSink: _options.resultSink,
        };

        for (const [idx, contentOfParameter] of params.entries()) {
          taskRequest.params.push({
            idx,
            data: contentOfParameter,
          });
        }

        if (!_this.serviceExists(serviceName)) {
          // Create an error:
          const error = new Error(
            `No Service Provider known for "${serviceName}"`
          );

          if (_this._logger) {
            _this._logger.error(
              `No Service Provider known for "${serviceName}"`
            );
            _this._logger.error(error);
          }

          throw error;
        }

        if (
          _this.options.forceUsingSelectors ||
          this.services.amountOf.get(serviceName) > 1
        ) {
          if (typeof options?.selector === "function") {
            const dispatcherToUse = await options.selector({
              rpcManager: this,
              serviceName,
            });

            // Assign the selected dispatcher:
            taskRequest.target = dispatcherToUse;
          } else {
            const dispatcherToUse = await this._defaultSelector({
              rpcManager: this,
              serviceName,
            });

            // Assign the selected dispatcher:
            taskRequest.target = dispatcherToUse;
          }

          // Remember the target, otherwise the task can not be found
          // by `cancelRunningTasksOfDispatcher`.
          requestedTask.target = taskRequest.target;
        } else {
          // No selector was used. The request stays untargeted, but we
          // remember the (only) known provider, so the task can be
          // canceled if this dispatcher disappears.
          for (const [
            dispatcherId,
            msg,
          ] of _this._mappingOfDispatchersAndServices.entries()) {
            if (msg.services.includes(serviceName)) {
              requestedTask.target = dispatcherId;
              break;
            }
          }
        }

        // Send the message to the specific element:
        await _this.communicator.emitRpcRequest(
          _this._getServiceName(taskRequest.functionId, "request"),
          taskRequest
        );

        if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
          _this._logger.debug(
            `Dispatcher "${
              this.id
            }" putting task "${_taskId}" on: "${_this._getServiceName(
              taskRequest.functionId,
              "request"
            )}"`
          );
        }

        // If there is a timeout => cancel the task after the given time.
        if (options.timeout > 0) {
          requestedTask.timeout = setTimeout(() => {
            _this
              .cancelTask(
                _taskId,
                new Error(
                  `TIMEOUT. The Service allowed execution time of ${options.timeout.toString()}[ms] has been excided`
                ),
                false
              )
              .catch(logCancelFailure);
          }, options.timeout);
        }
      } catch (e) {
        // Clear all elements of the function:
        clear();

        // Forward the error.
        reject(e);
      }
    });

    ret.taskId = _taskId;
    ret.cancel = (reason) => {
      _this.cancelTask(_taskId, reason).catch(logCancelFailure);
    };

    return ret;
  }

  /**
   * Function to clear all pending tasks. The promises of the tasks
   * are neither resolved nor rejected; only their timeouts are stopped.
   */
  public clearTasks(): void {
    if (this._runningInternalRequestedTasks) {
      // Stop the timers, they would refer to removed tasks only.
      for (const task of this._runningInternalRequestedTasks.values()) {
        if (task.timeout) {
          clearTimeout(task.timeout);
        }
      }
      this._runningInternalRequestedTasks.clear();
    } else {
      this._runningInternalRequestedTasks = new Map();
    }
  }

  /**
   * Function to unregister all functions of the Dispatcher and to
   * remove every listener from the communicator.
   */
  public unregisterAll(): void {
    if (this._registeredServices) {
      for (const id of Array.from(this._registeredServices.keys())) {
        this._unregisterService(id);
      }
      this._registeredServices.clear();
    } else {
      this._registeredServices = new Map();
    }

    // Detach the remaining listeners (e.g. the response listeners),
    // otherwise they would stay registered on the communicator and
    // would be registered a second time by the next call.
    if (this._communicatorCallbacks) {
      for (const key of Array.from(this._communicatorCallbacks.keys())) {
        this._removeRpcSubscription(key);
      }
    }

    // Reset the callbacks.
    this._communicatorCallbacks = new Map();
  }

  /**
   * Function to reset the Dispatcher: forgets the known dispatchers,
   * clears the pending tasks and unregisters all services.
   */
  public reset(): void {
    this._mappingOfDispatchersAndServices = new Map();
    this.services.update(this._mappingOfDispatchersAndServices);
    this.clearTasks();
    this.unregisterAll();
  }
}