diff --git a/.eslintrc.json b/.eslintrc.json index d9aa7ae..37c039f 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -26,6 +26,9 @@ "semi": [ "warn", "always" + ], + "@typescript-eslint/no-this-alias": [ + "warn" ] // , // "@typescript-eslint/member-ordering": [ diff --git a/lib/communication/IoSocketClient.ts b/lib/communication/IoSocketClient.ts index 67f1568..e1e952a 100644 --- a/lib/communication/IoSocketClient.ts +++ b/lib/communication/IoSocketClient.ts @@ -31,7 +31,7 @@ export class IoSocketClient extends Bridge { * @memberof IoSocketServer */ constructor(public uri: string) { - super("individual","individual",true, getNopeLogger("io-socket-client")); + super("individual","individual",false, getNopeLogger("io-socket-client", "info")); // Make shure we use the http before connecting. this.uri = this.uri.startsWith("http://") ? this.uri : "http://" + this.uri; diff --git a/lib/communication/IoSocketServer.ts b/lib/communication/IoSocketServer.ts index 359d6cd..88082a1 100644 --- a/lib/communication/IoSocketServer.ts +++ b/lib/communication/IoSocketServer.ts @@ -31,7 +31,7 @@ export class IoSocketServer extends Bridge { * @memberof IoSocketServer */ constructor(public port: number, server?: Server) { - super("individual","individual",true, getNopeLogger("io-socket-server")); + super("individual","individual",true, getNopeLogger("io-socket-server", "info")); if (server) { this._socket = (io as any)(server); @@ -46,7 +46,7 @@ export class IoSocketServer extends Bridge { // Add a default Event-Layer // Perhaps that can be removed. - this.addLayer(new EventLayer(new EventEmitter(), "individual","individual",getNopeLogger("io-socket-server-internal"))); + this.addLayer(new EventLayer(new EventEmitter(), "individual","individual",getNopeLogger("io-socket-server-internal")),false); this._logger.info( "waiting for connection. Listening on port: " + port.toString() @@ -62,7 +62,7 @@ export class IoSocketServer extends Bridge { // Add the socket as new layer: const _layer = new EventLayer(client, "individual","individual"); - _this.addLayer(_layer); + _this.addLayer(_layer, true); // Subscribe to Loosing connection: client.on("disconnect", () => { diff --git a/lib/communication/bridge.ts b/lib/communication/bridge.ts index 04c3bc5..694ffbc 100644 --- a/lib/communication/bridge.ts +++ b/lib/communication/bridge.ts @@ -20,7 +20,8 @@ const METHODS: Array = [ "emitNewObersvablesAvailable", "emitNewServicesAvailable", "emitRpcRequest", - "emitRpcResult", + "emitRpcResponse", + "emitStatusUpdate", "emitTaskCancelation", "onAurevoir", "onBonjour", @@ -28,9 +29,24 @@ const METHODS: Array = [ "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", - "onTaskCancelation", + "onStatusUpdate", + "onTaskCancelation", ]; +const METHOD_MAPPING: {[P in keyof ICommunicationInterface]: keyof ICommunicationInterface} = { + "onAurevoir": "emitAurevoir", + "onBonjour": "emitBonjour", + "onEvent":"emitEvent", + "onNewInstanceGeneratorsAvailable": "emitNewInstanceGeneratorsAvailable", + "onNewInstancesAvailable": "emitNewInstancesAvailable", + "onNewObservablesAvailable": "emitNewObersvablesAvailable", + "onNewServicesAvailable":"emitNewServicesAvailable", + "onRpcRequest":"emitRpcRequest", + "onRpcResponse": "emitRpcResponse", + "onTaskCancelation": "emitTaskCancelation", + "onStatusUpdate":"emitStatusUpdate" +}; + const STORING: Array = [ "onAurevoir", "onBonjour", @@ -39,6 +55,7 @@ const STORING: Array = [ "onNewObservablesAvailable", "onNewServicesAvailable", "onTaskCancelation", + "onStatusUpdate" ]; const SPECIFIC_STORING: Array = [ @@ -67,17 +84,20 @@ METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING); export class Bridge implements ICommunicationInterface { connected: INopeObservable; - protected _layers: Set; + protected _layers: Map; - public addLayer(layer: ICommunicationInterface) { + public addLayer(layer: ICommunicationInterface, forward = false) { if (!this._layers.has(layer)) { - this._layers.add(layer); + this._layers.set(layer,forward); if (this._enableDynamicAdding) { + + const _this = this; + // Play the history: for (const method of STORING) { - for (const args of this[`_map_${method}`]) { - layer[method as any](...args); + for (const cb of this[`_map_${method}`]) { + layer[method as any](forward ? this._wrapMethodWithoutName(cb,layer,method) : cb); } } @@ -94,10 +114,11 @@ export class Bridge implements ICommunicationInterface { for (const container in dict){ // 1. Extract the element: const map: Map> = this[container]; + const method = dict[container]; // 2. Iterate over the Elements. for (const [event, callbacks] of map.entries()){ // 3. Add all callbacks (Use the Map operator.) - Array.from(callbacks).map(cb => layer[dict[container]](event,cb)); + Array.from(callbacks).map(cb => layer[method](event,forward ? _this._wrapMethod(event,cb,layer,method) : cb)); } } } @@ -124,7 +145,7 @@ export class Bridge implements ICommunicationInterface { protected _enableDynamicAdding = false, protected _logger?: ILogger ) { - this._layers = new Set(); + this._layers = new Map(); this.connected = new NopeObservable(); this.connected.setContent(false); @@ -134,7 +155,7 @@ export class Bridge implements ICommunicationInterface { // the Flag is defined as true, if every socket // is connected. this.connected.getter = ()=> { - for (const layer of _this._layers){ + for (const layer of _this._layers.keys()){ if (!layer.connected.getContent()) return false; } @@ -153,44 +174,67 @@ export class Bridge implements ICommunicationInterface { ) { this[`_map_${method}`] = new Array(); // Define a Function which stores the Actions: - this[method as any] = async (...args) => { + this[method as any] = async (cb) => { // Store the Call - _this[`_map_${method}`].push(args); + _this[`_map_${method}`].push(cb); - for (const _layer of _this._layers) { - _layer[method as any](...args); + for (const [_layer,_forward] of _this._layers) { + _layer[method as any](_forward? _this._wrapMethodWithoutName(cb,_layer,method): cb); } }; } else if - // Else if the method store the events individually => - // Creaete a different Method: - (_enableDynamicAdding && - (SPECIFIC_STORING.includes(method) || SPECIFIC_REMOVING.includes(method))) { - // Determine the Container name. - const _container = SPECIFIC_STORING.includes(method) ? `_${method}` : method.replace("off","_on"); - // Define the Container itself - this[_container] = new Map>(); - // Based on the Function determine the Action: - const mode: "add" | "delete" = SPECIFIC_STORING.includes(method) ? "add": "delete"; - // Store the Method. - this[method as any] = async (name, cb) => { + // Else if the method store the events individually => + // Creaete a different Method: + (_enableDynamicAdding && + SPECIFIC_STORING.includes(method)) { + // Determine the Container name. + const _container = `_${method}`; + // Define the Container itself + this[_container] = new Map>(); + + // Store the Method. + this[method as any] = async (name, cb) => { + // Call the adapted Function + _this._adaptStore(_container as any,"add", name, cb); - // Call the adapted Function - _this._adaptStore(_container as any,mode, name, cb); + // Perform the Action on every available Layer. + for (const [_layer,_forward] of _this._layers) { + // Store the Callback. Because it is a "method" which listens on the + // events => wrap the method, to forward the event to the other layers. + _layer[method as any](name, _forward ? _this._wrapMethod(name, cb, _layer, method) : cb); + } + }; + }else if + // Else if the method store the events individually => + // Creaete a different Method: + (_enableDynamicAdding && + SPECIFIC_REMOVING.includes(method)) { + // Determine the Container name. + const _container = method.replace("off","_on"); + // Define the Container itself + this[_container] = new Map>(); + // Store the Method. + this[method as any] = async (name, cb) => { + // Call the adapted Function + _this._adaptStore(_container as any,"delete", name, cb); - // Perform the Action on every available Layer. - for (const _layer of _this._layers) { - _layer[method as any](name, cb); - } - }; - }else { + // Perform the Action on every available Layer. + for (const [_layer,_forward] of _this._layers) { + // Store the Callback + _layer[method as any](name, _this._wrappedMethods.get(cb)); + } + + // Delete the Wrapped Method: + _this._wrappedMethods.delete(cb); + }; + }else { // Define a Function which stores the Actions: this[method as any] = async (...args) => { if (_this._logger) { _this._logger.debug(method, ...args); } - for (const _layer of _this._layers) { + for (const _layer of _this._layers.keys()) { _layer[method as any](...args); } }; @@ -198,6 +242,39 @@ export class Bridge implements ICommunicationInterface { } } + protected _wrappedMethods = new Map<(data) => any,(data) => any>(); + protected _wrapMethod(name: string, cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any{ + const _this = this; + const _wrapped = (data) => { + // _this._logger.info("Forwarding",method,name); + for (const _layer of _this._layers.keys()){ + if (_layer != layer){ + // Emit the corresponding Event on the different channels. + _layer[METHOD_MAPPING[method] as any](name, data); + } + } + // Call the Callback! + cb(data); + }; + + this._wrappedMethods.set(cb, _wrapped); + return _wrapped; + } + protected _wrapMethodWithoutName(cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any{ + const _this = this; + return (data) => { + // _this._logger.info("Forwarding",method); + for (const _layer of _this._layers.keys()){ + if (_layer != layer){ + // Emit the corresponding Event on the different channels. + _layer[METHOD_MAPPING[method] as any]( data); + } + } + // Call the Callback! + cb(data); + }; + } + /** * Helper Function that is used to adapt the current active subscriptions * @param name name of the data container diff --git a/lib/communication/eventLayer.ts b/lib/communication/eventLayer.ts index b20e0a8..7488652 100644 --- a/lib/communication/eventLayer.ts +++ b/lib/communication/eventLayer.ts @@ -20,8 +20,15 @@ import { IResponseTaskMsg, ITaskCancelationMsg } from "../types/nope/nopeCommunication.interface"; +import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; +export interface IEmitter { + on(event: string | symbol, listener: (...args: any[]) => void): this; + off(event: string | symbol, listener: (...args: any[]) => void): this; + emit(event: string | symbol, ...args: any[]): boolean; +} + /** * A Communication Layer for the Dispatchers. * Here, only a Events are used. @@ -42,7 +49,7 @@ export class EventLayer implements ICommunicationInterface { * @param _logger */ constructor( - protected _emitter = new EventEmitter(), + protected _emitter: IEmitter = new EventEmitter(), public readonly subscriptionMode: "individual" | "generic" = "individual", public readonly resultSharing: "individual" | "generic" = "individual", protected _logger?: ILogger @@ -51,6 +58,14 @@ export class EventLayer implements ICommunicationInterface { this.connected.setContent(true); } + async emitStatusUpdate(status: IDispatcherInfo): Promise { + this._emitter.emit("statusUdpate",status); + } + + async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise { + this._emitter.on("statusUdpate",cb); + } + async onNewInstancesAvailable( cb: (instances: IAvailableInstancesMsg) => void ): Promise { @@ -97,7 +112,7 @@ export class EventLayer implements ICommunicationInterface { this._emitter.emit(name, request); } - async emitRpcResult(name: string, result: IResponseTaskMsg): Promise { + async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise { this._emitter.emit(name, result); } diff --git a/lib/dispatcher/nopeDispatcher.ts b/lib/dispatcher/nopeDispatcher.ts index ad66a9d..8a279ee 100644 --- a/lib/dispatcher/nopeDispatcher.ts +++ b/lib/dispatcher/nopeDispatcher.ts @@ -10,6 +10,7 @@ import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { generateId } from "../helpers/idMethods"; import { RUNNINGINNODE } from "../helpers/runtimeMethods"; +import { getNopeLogger } from "../logger/getLogger"; import { NopeGenericModule } from "../module/GenericModule"; import { NopePromise } from "../promise/nopePromise"; import { @@ -28,7 +29,7 @@ import { ITaskCancelationMsg } from "../types/nope/nopeCommunication.interface"; import { - IDispatcherInfo, + ENopeDispatcherStatus, IDispatcherInfo, IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, INopeDispatcher @@ -46,6 +47,7 @@ import { } from "../types/nope/nopeObservable.interface"; import { INopePromise } from "../types/nope/nopePromise.interface"; + /** * A Dispatcher to perform a function on a Remote * Dispatcher. Therefore a Task is created and forwarded @@ -78,6 +80,7 @@ export class nopeDispatcher implements INopeDispatcher { >; public readonly communicator: ICommunicationInterface; protected _externalDispatchers: Map; + protected _mappingOfRemoteDispatchersAndServices: Map< string, IAvailableServicesMsg @@ -141,6 +144,15 @@ export class nopeDispatcher implements INopeDispatcher { >; protected _runningExternalRequestedTasks: Set; + protected timeouts: { + sendAliveInterval: number; + checkInterval: number, + slow: number, + warn: number, + dead: number, + remove: number, + } + readonly _subscriptionMode: "individual" | "generic"; readonly _publishingMode: "individual" | "generic"; @@ -156,16 +168,30 @@ export class nopeDispatcher implements INopeDispatcher { protected _generateObservable: () => INopeObservable ) { this.communicator = options.communicator; + + this.id = generateId(); if (options.logger) { - this._logger = options.logger; + this._logger = getNopeLogger("dispatcher "+this.id, "info"); + } + + // Define the Timeouts. + if (options.timeouts){ + this.timeouts = options.timeouts; + } else { + this.timeouts = { + sendAliveInterval: 500, + checkInterval: 250, + slow: 1000, + warn: 2000, + dead: 5000, + remove: 10000, + }; } this._subscriptionMode = this.communicator.subscriptionMode || "generic"; this._publishingMode = this.communicator.resultSharing || "generic"; - this.id = generateId(); - /** * Define A Proxy for accessing methods easier. */ @@ -649,7 +675,7 @@ export class nopeDispatcher implements INopeDispatcher { }; // Use the communicator to publish the event. - await this.communicator.emitRpcResult(data.resultSink, result); + await this.communicator.emitRpcResponse(data.resultSink, result); } } catch (error) { if (this._logger) { @@ -675,7 +701,7 @@ export class nopeDispatcher implements INopeDispatcher { }; // Send the Error via the communicator to the remote. - await this.communicator.emitRpcResult(data.resultSink, result); + await this.communicator.emitRpcResponse(data.resultSink, result); } // Unsubscribe the Observable. @@ -732,6 +758,9 @@ export class nopeDispatcher implements INopeDispatcher { return false; } + protected _checkInterval: any = null; + protected _sendInterval: any = null; + /** * Internal Function, used to initialize the Dispatcher. * It subscribes to the "Messages" of the communicator. @@ -747,6 +776,17 @@ export class nopeDispatcher implements INopeDispatcher { testCurrent: true }); + if (this.timeouts.checkInterval > 0){ + // Define a Checker, which will test the status + // of the external Dispatchers. + this._checkInterval = setInterval(() => _this._checkDispachterHealth(), this.timeouts.checkInterval ); + } + if (this.timeouts.sendAliveInterval > 0){ + // Define a Timer, which will emit Status updates with + // the disered delay. + this._sendInterval = setInterval(() => _this.communicator.emitStatusUpdate(_this._genAliveMessage()), this.timeouts.sendAliveInterval); + } + this.communicator.connected.subscribe((connected) => { // Handle an unconnect. if (connected) _this.communicator.emitBonjour(_this._genAliveMessage()); @@ -804,7 +844,7 @@ export class nopeDispatcher implements INopeDispatcher { _this._updateExternalServices(); if (this._logger?.enabledFor(Logger.DEBUG)) { // If there is a Logger: - this._logger.debug("received new services"); + this._logger.debug(this.id,"received new services from", data.dispatcher); } } } catch (e) {} @@ -821,7 +861,7 @@ export class nopeDispatcher implements INopeDispatcher { _this._updateExternalEvents(); if (this._logger?.enabledFor(Logger.DEBUG)) { // If there is a Logger: - this._logger.debug("received new events"); + this._logger.debug(this.id,"received new Observables from", data.dispatcher); } } } catch (e) {} @@ -836,23 +876,29 @@ export class nopeDispatcher implements INopeDispatcher { _this._updateExternalGenerators(); if (this._logger?.enabledFor(Logger.DEBUG)) { // If there is a Logger: - this._logger.debug("received new generators"); + this._logger.debug(this.id,"received new generators from", data.dispatcher); } } catch (e) {} }); - this.communicator.onBonjour((info) => { - if (_this.id !== info.id) { - _this._externalDispatchers.set(info.id, info); + this.communicator.onStatusUpdate((info) => { + _this._externalDispatchers.set(info.id, info); + _this.externalDispatchers.setContent( + Array.from(_this._externalDispatchers.values()) + ); + }); + this.communicator.onBonjour((info) => { + _this._externalDispatchers.set(info.id, info); + _this.externalDispatchers.setContent( + Array.from(_this._externalDispatchers.values()) + ); + + if (_this.id !== info.id) { _this._sendAvailableServices(); _this._sendAvailableObservables(); _this._sendAvailableGenerators(); - _this._sendAvailableInstances(); - - _this.externalDispatchers.setContent( - Array.from(_this._externalDispatchers.values()) - ); + _this._sendAvailableInstances(); if (_this._logger?.enabledFor(Logger.DEBUG)) { // If there is a Logger: @@ -863,53 +909,7 @@ export class nopeDispatcher implements INopeDispatcher { } }); - this.communicator.onAurevoir((dispatcher: string) => { - // Delete the Generators of the Instances. - _this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher); - _this._mappingOfRemoteDispatchersAndServices.delete(dispatcher); - _this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher); - _this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher); - _this._externalDispatchers.delete(dispatcher); - - _this._updateExternalServices(); - _this._updateExternalGenerators(); - _this._updateExternalEvents(); - _this._updateExternalInstances(); - - // Iterate over the available instances and remove the providers: - for (const instance of _this._instances.values()) { - // Remove all Dispachers: - let idx = instance.usedBy.indexOf(dispatcher); - while (idx !== -1) { - instance.usedBy.splice(idx, 1); - idx = instance.usedBy.indexOf(dispatcher); - } - - // Check if the Element isnt required any more => delete it. - if (instance.usedBy.length === 0) { - _this._logger.info( - "Disposing instance, because it isnt used any more.", - instance.instance.identifier - ); - instance.instance - .dispose() - .catch((e) => { - _this._logger.error("Failed to remove the Instance.", e); - }) - .then(() => { - _this._logger.info( - "sucessfully disposed instance ", - instance.instance.identifier - ); - }); - } - } - - if (this._logger?.enabledFor(Logger.DEBUG)) { - // If there is a Logger: - this._logger.debug("a dispatcher went offline"); - } - }); + this.communicator.onAurevoir((dispatcher: string) => _this._removeDispatcher(dispatcher)); // Listen to newly created instances. this.communicator.onNewInstancesAvailable((message) => { @@ -945,6 +945,98 @@ export class nopeDispatcher implements INopeDispatcher { this.ready.setContent(true); } + protected _checkDispachterHealth():void{ + const currentTime = Date.now(); + let changes = false; + + for (const status of this._externalDispatchers.values()){ + // determine the Difference + const diff = currentTime - status.timestamp; + + // Based on the Difference Determine the Status + if (diff > this.timeouts.remove){ + // remove the Dispatcher. But be quite. + // Perhaps more dispatchers will be removed + this._removeDispatcher(status.id, true); + changes = true; + } else if (diff > this.timeouts.dead && status.status !== ENopeDispatcherStatus.DEAD){ + status.status = ENopeDispatcherStatus.DEAD; + changes = true; + } else if (diff > this.timeouts.warn && diff <= this.timeouts.dead && status.status !== ENopeDispatcherStatus.WARNING){ + status.status = ENopeDispatcherStatus.WARNING; + changes = true; + } else if (diff > this.timeouts.slow && diff <= this.timeouts.warn && status.status !== ENopeDispatcherStatus.SLOW){ + status.status = ENopeDispatcherStatus.SLOW; + changes = true; + } else if (diff <= this.timeouts.slow && status.status !== ENopeDispatcherStatus.HEALTHY){ + status.status = ENopeDispatcherStatus.HEALTHY; + changes = true; + } + } + + if (changes){ + + // Update the External Dispatchers + this.externalDispatchers.setContent( + Array.from(this._externalDispatchers.values()) + ); + } + } + + protected _removeDispatcher(dispatcher: string, quite = false): void { + // Delete the Generators of the Instances. + this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher); + this._mappingOfRemoteDispatchersAndServices.delete(dispatcher); + this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher); + this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher); + this._externalDispatchers.delete(dispatcher); + + // Iterate over the available instances and remove the providers: + for (const instance of this._instances.values()) { + // Remove all Dispachers: + let idx = instance.usedBy.indexOf(dispatcher); + while (idx !== -1) { + instance.usedBy.splice(idx, 1); + idx = instance.usedBy.indexOf(dispatcher); + } + + // Check if the Element isnt required any more => delete it. + if (instance.usedBy.length === 0) { + this._logger.info( + "Disposing instance, because it isnt used any more.", + instance.instance.identifier + ); + instance.instance + .dispose() + .catch((e) => { + this._logger.error("Failed to remove the Instance.", e); + }) + .then(() => { + this._logger.info( + "sucessfully disposed instance ", + instance.instance.identifier + ); + }); + } + } + + this._updateExternalServices(); + this._updateExternalGenerators(); + this._updateExternalEvents(); + this._updateExternalInstances(); + + if (!quite){ + this.externalDispatchers.setContent( + Array.from(this._externalDispatchers.values()) + ); + } + + if (this._logger?.enabledFor(Logger.DEBUG)) { + // If there is a Logger: + this._logger.debug("a dispatcher went offline"); + } + } + protected _genAliveMessage(): IDispatcherInfo { if (RUNNINGINNODE) { const os = require("os"); @@ -958,20 +1050,22 @@ export class nopeDispatcher implements INopeDispatcher { name: os.hostname() }, pid: process.pid, - timestamp: Date.now() + timestamp: Date.now(), + status: ENopeDispatcherStatus.HEALTHY }; } return { host: { cores: -1, cpu: "unkown", - name: "browser", - os: "unkown", + name: navigator.appCodeName + " " + navigator.appVersion, + os: navigator.platform, ram: 1 }, id: this.id, - pid: -1, - timestamp: Date.now() + pid: this.id, + timestamp: Date.now(), + status: ENopeDispatcherStatus.HEALTHY }; } diff --git a/lib/loader/generateNopeBasicPackage.ts b/lib/loader/generateNopeBasicPackage.ts index b268fb2..8e2e79a 100644 --- a/lib/loader/generateNopeBasicPackage.ts +++ b/lib/loader/generateNopeBasicPackage.ts @@ -53,7 +53,10 @@ export function generateNopeBasicPackage(options: INopeDispatcherOptions) { description: { name: "nopeDispatcher", selector: TYPES.dispatcher, - type: InjectableNopeDispatcher + type: InjectableNopeDispatcher, + options: { + scope: "inSingletonScope" + } }, settings: { allowInstanceGeneration: false diff --git a/lib/types/nope/nopeCommunication.interface.ts b/lib/types/nope/nopeCommunication.interface.ts index 362e747..b48b5ed 100644 --- a/lib/types/nope/nopeCommunication.interface.ts +++ b/lib/types/nope/nopeCommunication.interface.ts @@ -6,6 +6,7 @@ * @desc [description] */ +import { ILogger } from "js-logger"; import { IDispatcherInfo } from "./nopeDispatcher.interface"; import { INopeModuleDescription } from "./nopeModule.interface"; import { INopeObservable } from "./nopeObservable.interface"; @@ -20,6 +21,18 @@ export interface ICommunicationInterface { readonly subscriptionMode?: "individual" | "generic"; readonly resultSharing?: "individual" | "generic"; + /** + * Function to Emit a Status of the Dispatcher + * @param status The Status + */ + emitStatusUpdate(status: IDispatcherInfo): Promise; + + /** + * The Possiblity to subscribe to status-updates. + * @param cb The callback which will be called on during an update. + */ + onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise; + /** * Funciton to emit a RPC Request. * @@ -38,7 +51,7 @@ export interface ICommunicationInterface { * @param {IResponseTaskMsg} result * @memberof ICommunicationInterface */ - emitRpcResult(name: string, result: IResponseTaskMsg): Promise; + emitRpcResponse(name: string, result: IResponseTaskMsg): Promise; /** * Function used to subscribe to RPC Results. Each method / function @@ -514,5 +527,13 @@ export type IAvailableServicesMsg = { export type INopeDispatcherOptions = { communicator: ICommunicationInterface; - logger?: Logger; + logger?: ILogger; + timeouts?: { + sendAliveInterval: number; + checkInterval: number, + slow: number, + warn: number, + dead: number, + remove: number, + } }; diff --git a/lib/types/nope/nopeDispatcher.interface.ts b/lib/types/nope/nopeDispatcher.interface.ts index bd8b02c..323af52 100644 --- a/lib/types/nope/nopeDispatcher.interface.ts +++ b/lib/types/nope/nopeDispatcher.interface.ts @@ -42,6 +42,7 @@ export type IValidPromise = Promise | INopePromise; export interface IDispatcherInfo { id: string; timestamp: number; + status: ENopeDispatcherStatus, host: { cores: number; cpu: string; @@ -49,7 +50,14 @@ export interface IDispatcherInfo { ram: number; name: string; }; - pid: number; + pid: number | string; +} + +export enum ENopeDispatcherStatus { + HEALTHY = 0, + SLOW = 1, + WARNING = 2, + DEAD = 3 } /** diff --git a/resources/admin-shell/dispatchers.tsx b/resources/admin-shell/dispatchers.tsx index a549de9..9e9f42f 100644 --- a/resources/admin-shell/dispatchers.tsx +++ b/resources/admin-shell/dispatchers.tsx @@ -7,9 +7,9 @@ */ import React from "react"; -import { Card, Col, ProgressBar, Row, Table } from "react-bootstrap"; +import { Badge, Card, Col, ProgressBar, Row, Table } from "react-bootstrap"; import { - IDispatcherInfo, + ENopeDispatcherStatus, INopeDispatcher } from "../../lib/types/nope/nopeDispatcher.interface"; import { INopeObserver } from "../../lib/types/nope/nopeObservable.interface"; @@ -54,24 +54,69 @@ class MemoryStatusComponent extends React.Component< class HostStatusComponent extends React.Component< { status: { - cores: number; cpu: string; + cores: number; os: string; ram: number; name: string; - pids: { pid: number; dispatchers: string[] }[]; + pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[]; + status: ENopeDispatcherStatus, + timestamp: number }; }, - {} + { + lastUpdate: string, + variant: "success" | "warning" | "danger", + badge: boolean, + badgeText: string + } > { + + protected _intervall: any = null; + + get _state(){ + + const dict = { + 0: "success", + 1: "info", + 2: "warning", + 3: "danger" + }; + + return { + lastUpdate: (new Date(this.props.status.timestamp)).toISOString(), + variant: dict[this.props.status.status], + badge: this.props.status.status !== ENopeDispatcherStatus.HEALTHY, + badgeText: ENopeDispatcherStatus[this.props.status.status] + }; + } + + constructor(props){ + super(props); + this.state = this._state; + } + + componentDidMount(){ + const _this = this; + this._intervall = setInterval(() => { + _this.setState(this._state); + }, 200); + } + + componentWillUnmount(){ + if (this._intervall){ + clearInterval(this._intervall); + } + } + render() { return ( - + - {this.props.status.name} + {this.state.badge ? <>{this.props.status.name} ELEMENT {this.state.badgeText} ! : <>{this.props.status.name} } - + { this.props.status.ram > 0 ? ( - + ) : <>} @@ -89,6 +134,10 @@ class HostStatusComponent extends React.Component< + + + + @@ -97,7 +146,7 @@ class HostStatusComponent extends React.Component<
RAM {" "} @@ -80,7 +125,7 @@ class HostStatusComponent extends React.Component< value={this.props.status.ram} >
CPU {this.props.status.cpu}Cores {this.props.status.cores}
Platform{this.props.status.os}
Nope-Processes {this.props.status.pids.length}
- Last updated 3 mins ago + Last updated {this.state.lastUpdate}
); @@ -106,7 +155,14 @@ class HostStatusComponent extends React.Component< class AvailableDispatchers extends React.Component< { dispatcher: INopeDispatcher }, - { dispatchers: IDispatcherInfo[][]; connected: boolean } + { dispatchers: { + cpu: string; + cores: number; + os: string; + ram: number; + name: string; + pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[]; + }[][]; connected: boolean } > { private _observer: INopeObserver[] = []; @@ -118,7 +174,9 @@ class AvailableDispatchers extends React.Component< os: string; ram: number; name: string; - pids: { [index: string]: string[] }; + pids: { [index: string]: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }; + status: ENopeDispatcherStatus, + timestamp: number }; } = {}; const ret: { @@ -127,7 +185,9 @@ class AvailableDispatchers extends React.Component< os: string; ram: number; name: string; - pids: { pid: number; dispatchers: string[] }; + pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[]; + status: ENopeDispatcherStatus, + timestamp: number }[] = []; for (const dispatcher of this.props.dispatcher.externalDispatchers.getContent()) { @@ -138,15 +198,24 @@ class AvailableDispatchers extends React.Component< name: dispatcher.host.name, os: dispatcher.host.os, pids: {}, - ram: dispatcher.host.ram + ram: dispatcher.host.ram, + status: ENopeDispatcherStatus.HEALTHY, + timestamp: dispatcher.timestamp }; } if (sorted[dispatcher.host.name].pids[dispatcher.pid] === undefined) { - console.log(dispatcher.pid); sorted[dispatcher.host.name].pids[dispatcher.pid] = []; } + + sorted[dispatcher.host.name].status = Math.max(sorted[dispatcher.host.name].status, dispatcher.status); + sorted[dispatcher.host.name].timestamp = Math.min(sorted[dispatcher.host.name].timestamp, dispatcher.timestamp); - sorted[dispatcher.host.name].pids[dispatcher.pid].push(dispatcher.id); + // Store the Dispatcher Status + sorted[dispatcher.host.name].pids[dispatcher.pid].push({ + id: dispatcher.id, + status: dispatcher.status, + timestamp: dispatcher.timestamp + }); } for (const hostname in sorted) { @@ -159,10 +228,11 @@ class AvailableDispatchers extends React.Component< }; } ); - ret.push(_toAdd); } + // console.log(this.props.dispatcher.externalDispatchers.getContent().length,ret.map(item => `${item.name}:${item.status}`).toString()); + return ret; } @@ -170,12 +240,7 @@ class AvailableDispatchers extends React.Component< const connected = this.props.dispatcher.communicator.connected.getContent(); const dispatchers = []; const maxItems = 3; - const sorted = [ - ...this.__sort(), - ...this.__sort(), - ...this.__sort(), - ...this.__sort() - ]; + const sorted = this.__sort(); // Function to Split the Dispatcher Array into an array containing max 3 elements. for (let i = 0; i < sorted.length; i = i + maxItems) {