diff --git a/lib/RpcManager/NopeRpcManager.ts b/lib/RpcManager/NopeRpcManager.ts index 4323272..cd9034b 100644 --- a/lib/RpcManager/NopeRpcManager.ts +++ b/lib/RpcManager/NopeRpcManager.ts @@ -65,14 +65,6 @@ export class NopeRpcManager implements INopeRpcManager { } >; - /** - * The used Communication interface - * - * @type {ICommunicationBridge} - * @memberof nopeDispatcher - */ - public readonly communicator: ICommunicationBridge; - /** * A Mapping of the Services a dispatcher is hosting. * Key = Dispatcher-ID @@ -169,6 +161,21 @@ export class NopeRpcManager implements INopeRpcManager { */ protected __warned: boolean; + /** + * The used Communication interface + * + */ + public readonly communicator: ICommunicationBridge; + + /** + * Flag to indicate, that the system is ready. + * + * @author M.Karkowski + * @type {INopeObservable} + * @memberof NopeRpcManager + */ + public readonly ready: INopeObservable; + /** * Creates an instance of nopeDispatcher. * @param {nopeRpcDispatcherOptions} options The Options, used by the Dispatcher. @@ -244,13 +251,13 @@ export class NopeRpcManager implements INopeRpcManager { } /** - * Internal Method to handle some requests. + * Internal Method to handle the rpcs requests. * * @protected * @param {IRequestTaskMsg} data The provided data of the request - * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided - * @return {Promise} - * @memberof nopeDispatcher + * @param {(...args) => Promise} [_function] + * @return {*} {Promise} + * @memberof NopeRpcManager */ protected async _handleExternalRequest( data: IRequestTaskMsg, @@ -481,8 +488,6 @@ export class NopeRpcManager implements INopeRpcManager { this.ready.setContent(true); } - public readonly ready: INopeObservable; - public removeDispatcher(dispatcher: string): void { // Delete the Generators of the Instances. this._mappingOfDispatchersAndServices.delete(dispatcher); @@ -939,6 +944,7 @@ export class NopeRpcManager implements INopeRpcManager { if (typeof options?.selector === "function") { const dispatcherToUse = await options.selector({ rpcManager: this, + serviceName, }); // Assign the Selector: @@ -946,6 +952,7 @@ export class NopeRpcManager implements INopeRpcManager { } else { const dispatcherToUse = await this._defaultSelector({ rpcManager: this, + serviceName, }); // Assign the Selector: diff --git a/lib/dispatcher/StatusManager/StatusManager.spec.ts b/lib/dispatcher/StatusManager/StatusManager.spec.ts new file mode 100644 index 0000000..208c7ac --- /dev/null +++ b/lib/dispatcher/StatusManager/StatusManager.spec.ts @@ -0,0 +1,160 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @create date 2022-01-04 10:03:41 + * @modify date 2022-01-04 10:03:41 + * @desc [description] + */ + +import { expect } from "chai"; +import { beforeEach, describe, it } from "mocha"; +import "reflect-metadata"; +import { getLayer } from "../../communication/getLayer.nodejs"; +import { sleep } from "../../helpers/async"; +import { NopeObservable } from "../../observables/nopeObservable"; +import { NopeStatusManager } from "./StatusManager"; + +describe("NopeStatusManager", function () { + // Describe the required Test: + let first = new NopeStatusManager( + { + communicator: getLayer("event", "", false), + logger: false, + }, + () => new NopeObservable(), + "first" + ); + + describe("RpcManager Communication", function () { + let communicator = getLayer("event", "", false); + + beforeEach(() => { + first.dispose(true); + + communicator = getLayer("event", "", false); + + // Create a new Observer + first = new NopeStatusManager( + { + communicator, + logger: false, + }, + () => new NopeObservable(), + "test" + ); + }); + + it("new detection", (done) => { + let sub = null; + sub = first.externalDispatchers.onChange.subscribe((changes) => { + if (changes.added.length >= 1) { + done(); + first.dispose(true); + second.dispose(true); + sub.unsubscribe(); + } else { + done(new Error("Not found")); + first.dispose(true); + second.dispose(true); + sub.unsubscribe(); + } + }); + + const second = new NopeStatusManager( + { + communicator, + logger: false, + timeouts: { + checkInterval: 10, + dead: 25, + remove: 30, + sendAliveInterval: 5, + slow: 15, + warn: 20, + }, + }, + () => new NopeObservable(), + "second" + ); + }); + + it("removed detection", (done) => { + let sub = null; + let firstCall = true; + sub = first.externalDispatchers.onChange.subscribe((changes) => { + if (firstCall) { + firstCall = false; + } else { + if (changes.removed.length >= 1) { + done(); + first.dispose(true); + second.dispose(true); + sub.unsubscribe(); + } else { + done(new Error("removing has not been detected")); + first.dispose(true); + second.dispose(true); + sub.unsubscribe(); + } + } + }); + + const second = new NopeStatusManager( + { + communicator, + logger: false, + timeouts: { + checkInterval: 10, + dead: 25, + remove: 30, + sendAliveInterval: 5, + slow: 15, + warn: 20, + }, + }, + () => new NopeObservable(), + "second" + ); + }); + + it("synchronizing time", async () => { + // Remove the Old Timer + first.dispose(true); + first = new NopeStatusManager( + { + communicator, + logger: false, + timeouts: { + checkInterval: 10, + dead: 25, + remove: 30, + sendAliveInterval: 5, + slow: 15, + warn: 20, + }, + }, + () => new NopeObservable(), + "second" + ); + const timestamp = Date.now(); + + let start = Date.now(); + await sleep(30); + let end = Date.now(); + + // We have waited something like 100 ms (+-) + // thats our delay. Now if we use that delay, + // we are able sync our time. + first.syncTime(timestamp, end - start); + + // Get the Adapted Timestamp. + const adapted = first.info.timestamp; + end = Date.now(); + + // Dispose our Delay. + first.dispose(true); + + expect(end - adapted).to.be.equal(0, "There should not be an delta."); + }); + }); +}); diff --git a/lib/dispatcher/StatusManager/StatusManager.ts b/lib/dispatcher/StatusManager/StatusManager.ts new file mode 100644 index 0000000..a9c9123 --- /dev/null +++ b/lib/dispatcher/StatusManager/StatusManager.ts @@ -0,0 +1,458 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @create date 2022-01-03 21:21:45 + * @modify date 2022-01-04 12:38:49 + * @desc [description] + */ + +import { ILogger } from "js-logger"; +import { avgOfArray } from "../../helpers/arrayMethods"; +import { sleep } from "../../helpers/async"; +import { generateId } from "../../helpers/idMethods"; +import { MapBasedMergeData } from "../../helpers/mergedData"; +import { RUNNINGINNODE } from "../../helpers/runtimeMethods"; +import { defineNopeLogger } from "../../logger/getLogger"; +import { + ENopeDispatcherStatus, + ICommunicationBridge, + IDispatcherInfo, + IMapBasedMergeData, + INopeDispatcherOptions, + INopeObservable, + INopeTimeOptions +} from "../../types/nope"; + +let os = null; +let cpus = null; + +/** + * A Dispatcher to perform a function on a Remote + * Dispatcher. Therefore a Task is created and forwarded + * to the remote. + * + * @export + * @class nopeDispatcher + */ +export class NopeStatusManager { + protected _logger: ILogger; + protected _deltaTime = 0; + + /** + * The used Communication interface + * + * @type {ICommunicationBridge} + * @memberof NopeStatusManager + */ + public readonly communicator: ICommunicationBridge; + + /** + * A Map holding the current Status of external dispatchers. + * Key = Dispatcher-ID + * Value = Last Known status of the dispatcher + * + * @protected + * @type {Map} + * @memberof NopeStatusManager + */ + protected _externalDispatchers: Map; + + public readonly ready: INopeObservable; + public readonly externalDispatchers: IMapBasedMergeData< + IDispatcherInfo, + string, + IDispatcherInfo + >; + + protected _forceEmittingUpdates: boolean; + protected _runningExternalRequestedTasks: Set; + protected _timeouts: INopeTimeOptions; + + protected _checkInterval: any = null; + protected _sendInterval: any = null; + protected _cpuInterval: any = null; + protected _cpuLoad = -1; + + /** + * Helper function, which will synchronize the Timestamp. + * Timestamp must be provided in UTC (https://www.timeanddate.de/stadt/info/zeitzone/utc) + * + * @author M.Karkowski + * @param {number} timestamp The UTC-Timestamp + * @param {number} [delay=0] The Delay, since the Timestamp has been generated + * @memberof NopeStatusManager + */ + public syncTime(timestamp: number, delay = 0) { + const _internalTimestamp = Date.now(); + this._deltaTime = _internalTimestamp - timestamp - delay; + } + + /** + * Generates the current Status Message of the Dispatcher. + * + * @author M.Karkowski + * @protected + * @return {*} {IDispatcherInfo} The current status of our dispatcher. + * @memberof NopeStatusManager + */ + public get info(): IDispatcherInfo { + if (RUNNINGINNODE) { + // If we are running our programm in node, + // we will load the corresponding libs, + // to calc the cpu load etc. + + if (os === null) { + // eslint-disable-next-line + os = require("os"); + } + if (cpus === null) { + // eslint-disable-next-line + cpus = os.cpus(); + } + return { + id: this.id, + env: "javascript", + version: "0.9.7", + host: { + cores: cpus.length, + cpu: { + model: `${cpus[0].model}`.slice( + 0, + (cpus[0].model as string).indexOf("@") - 1 + ), + speed: avgOfArray(cpus, "speed"), + usage: this._cpuLoad, + }, + os: os.platform(), + ram: { + // Return the used Memory + usedPerc: 1 - os.freemem() / os.totalmem(), + // The Values are given in Byte but we want MByte + free: Math.round(os.freemem() / 1048576), + total: Math.round(os.totalmem() / 1048576), + }, + name: os.hostname(), + }, + pid: process.pid, + timestamp: Date.now() + this._deltaTime, + status: ENopeDispatcherStatus.HEALTHY, + }; + } + return { + env: "javascript", + version: "0.9.5", + host: { + cores: -1, + cpu: { + model: "unkown", + speed: -1, + usage: -1, + }, + name: navigator.appCodeName + " " + navigator.appName, + os: navigator.platform, + ram: { + free: -1, + usedPerc: -1, + total: -1, + }, + }, + id: this.id, + pid: this.id, + timestamp: Date.now() + this._deltaTime, + status: ENopeDispatcherStatus.HEALTHY, + }; + } + + /** + * Creates an instance of nopeDispatcher. + * @param {nopeRpcDispatcherOptions} options The Options, used by the Dispatcher. + * @param {() => INopeObservable} _generateObservable A Helper, to generate Observables. + * @memberof NopeStatusManager + */ + constructor( + public options: INopeDispatcherOptions, + protected _generateObservable: () => INopeObservable, + public readonly id: string = null + ) { + this.communicator = options.communicator; + + if (id === null) { + this.id = generateId(); + } + + this._logger = defineNopeLogger(options.logger, "core.status.manager"); + + this._timeouts = { + sendAliveInterval: 500, + checkInterval: 250, + slow: 1000, + warn: 2000, + dead: 5000, + remove: 10000, + selectorTimeout: 1000, + }; + + // Define the Timeouts. + if (options.timeouts) { + this._timeouts = Object.assign(this._timeouts, options.timeouts); + } + + if (RUNNINGINNODE) { + // eslint-disable-next-line + const os = require("os"); + + const getLoad = () => { + const cpus = os.cpus(); + let totalTime = 0, + idleTime = 0; + + // Determine the current load: + for (const cpu of cpus) { + for (const name in cpu.times) { + totalTime += cpu.times[name]; + } + idleTime += cpu.times.idle; + } + + return { + totalTime, + idleTime, + }; + }; + + // Initally store the load + let oldTimes = getLoad(); + this._cpuInterval = setInterval(() => { + // Get the current CPU Times. + const currentTimes = getLoad(); + // Determine the difference between the old Times an the current Times. + _this._cpuLoad = + 1 - + (currentTimes.idleTime - oldTimes.idleTime) / + (currentTimes.totalTime - oldTimes.totalTime); + // Store the current CPU-Times + oldTimes = currentTimes; + }, 100); + } + + // Flag to show if the system is ready or not. + this.ready = this._generateObservable(); + this.ready.setContent(false); + + // Observable containing all Dispatcher Informations. + this._externalDispatchers = new Map(); + this.externalDispatchers = new MapBasedMergeData(this._externalDispatchers); + + if (this._logger) { + this._logger.info("core.status.manager", this.id, "is online"); + } + + this.reset(); + const _this = this; + this._init().catch((error) => { + if (_this._logger) { + _this._logger.error("Failed to intialize status manager"); + _this._logger.error(error); + + // Now we should exit the program (if we are running in nodejs) + if (RUNNINGINNODE) { + process.exit(1); + } + } + }); + } + + /** + * Internal Function, used to initialize the Dispatcher. + * It subscribes to the "Messages" of the communicator. + * + * @protected + * @memberof NopeStatusManager + */ + protected async _init(): Promise { + const _this = this; + + this.ready.setContent(false); + + // Wait until the Element is connected. + await this.communicator.connected.waitFor((value) => value); + + // Setup Test Intervals: + if (this._timeouts.checkInterval > 0) { + // Define a Checker, which will test the status + // of the external Dispatchers. + this._checkInterval = setInterval( + () => _this._checkDispatcherHealth(), + this._timeouts.checkInterval + ); + } + if (this._timeouts.sendAliveInterval > 0) { + // Define a Timer, which will emit Status updates with + // the disered delay. + this._sendInterval = setInterval( + () => _this._sendStatus(), + this._timeouts.sendAliveInterval + ); + } + + await this.communicator.onStatusUpdate((info) => { + _this._externalDispatchers.set(info.id, info); + _this.externalDispatchers.update(); + }); + + await this.communicator.onBonjour((info) => { + _this._externalDispatchers.set(info.id, info); + _this.externalDispatchers.update(); + + if (_this.id !== info.id) { + _this._sendStatus(); + + if (_this._logger?.enabledFor(_this._logger.DEBUG)) { + // If there is a Logger: + _this._logger.debug( + 'Remote Dispatcher "' + info.id + '" went online' + ); + } + } + }); + + await this.communicator.onAurevoir((dispatcher: string) => { + // Remove the Dispatcher. + _this._externalDispatchers.delete(dispatcher); + _this.externalDispatchers.update(); + }); + + if (this._logger) { + this._logger.info(this.id, "initialized"); + } + + // We sleep 500 ms + await sleep(500); + await this.emitBonjour(); + await sleep(500); + + this.ready.setContent(true); + } + + /** + * Helper Function to manually emit a Bonjour! + * + * @return {*} {Promise} + * @memberof NopeStatusManager + */ + public async emitBonjour(): Promise { + // Emit the Bonjour Message. + this.communicator.emitBonjour(this.info); + } + + /** + * Function, which will be called to update the + * Status to the Dispatchers + * + * @author M.Karkowski + * @protected + * @memberof NopeStatusManager + */ + protected _checkDispatcherHealth(): void { + const currentTime = Date.now(); + let changes = false; + + for (const status of this._externalDispatchers.values()) { + // determine the Difference + const diff = currentTime - status.timestamp; + + // Based on the Difference Determine the Status + if (diff > this._timeouts.remove) { + // remove the Dispatcher. But be quite. + // Perhaps more dispatchers will be removed + this._removeDispatcher(status.id, true); + changes = true; + } else if ( + diff > this._timeouts.dead && + status.status !== ENopeDispatcherStatus.DEAD + ) { + status.status = ENopeDispatcherStatus.DEAD; + changes = true; + } else if ( + diff > this._timeouts.warn && + diff <= this._timeouts.dead && + status.status !== ENopeDispatcherStatus.WARNING + ) { + status.status = ENopeDispatcherStatus.WARNING; + changes = true; + } else if ( + diff > this._timeouts.slow && + diff <= this._timeouts.warn && + status.status !== ENopeDispatcherStatus.SLOW + ) { + status.status = ENopeDispatcherStatus.SLOW; + changes = true; + } else if ( + diff <= this._timeouts.slow && + status.status !== ENopeDispatcherStatus.HEALTHY + ) { + status.status = ENopeDispatcherStatus.HEALTHY; + changes = true; + } + } + + if (changes) { + // Update the External Dispatchers + this.externalDispatchers.update(); + } + } + + protected _removeDispatcher(dispatcher: string, quite = false): void { + // Delete the Generators of the Instances. + const dispatcherInfo = this._externalDispatchers.get(dispatcher); + const deleted = this._externalDispatchers.delete(dispatcher); + + if (!quite) { + this.externalDispatchers.update(); + } + + if (deleted && this._logger?.enabledFor(this._logger?.WARN)) { + // If there is a Logger: + this._logger.warn( + "a dispatcher on", + dispatcherInfo?.host.name || "unkown", + "went offline. ID of the Dispatcher: ", + dispatcher + ); + } + } + + protected _sendStatus(): void { + this.communicator.emitStatusUpdate(this.info); + } + + /** + * Function to reset the Dispatcher. + * + * @memberof NopeStatusManager + */ + public reset(): void { + this._externalDispatchers.clear(); + this.externalDispatchers.update(this._externalDispatchers); + } + + /** + * Will dispose the Dispatcher. Must be called on exit for a clean exit. Otherwise it is defined as dirty exits + */ + public async dispose(quite = false): Promise { + if (this._sendInterval) { + clearInterval(this._sendInterval); + } + if (this._checkInterval) { + clearInterval(this._checkInterval); + } + if (this._cpuInterval) { + clearInterval(this._cpuInterval); + } + + // Emits the aurevoir Message. + if (!quite) { + this.communicator.emitAurevoir(this.id); + } + } +}