implement keep alive

This commit is contained in:
martin 2020-12-05 02:28:33 +01:00
parent 43eae6abe3
commit 8e4b6aac06
10 changed files with 423 additions and 137 deletions

View File

@ -26,6 +26,9 @@
"semi": [
"warn",
"always"
],
"@typescript-eslint/no-this-alias": [
"warn"
]
// ,
// "@typescript-eslint/member-ordering": [

View File

@ -31,7 +31,7 @@ export class IoSocketClient extends Bridge {
* @memberof IoSocketServer
*/
constructor(public uri: string) {
super("individual","individual",true, getNopeLogger("io-socket-client"));
super("individual","individual",false, getNopeLogger("io-socket-client", "info"));
// Make shure we use the http before connecting.
this.uri = this.uri.startsWith("http://") ? this.uri : "http://" + this.uri;

View File

@ -31,7 +31,7 @@ export class IoSocketServer extends Bridge {
* @memberof IoSocketServer
*/
constructor(public port: number, server?: Server) {
super("individual","individual",true, getNopeLogger("io-socket-server"));
super("individual","individual",true, getNopeLogger("io-socket-server", "info"));
if (server) {
this._socket = (io as any)(server);
@ -46,7 +46,7 @@ export class IoSocketServer extends Bridge {
// Add a default Event-Layer
// Perhaps that can be removed.
this.addLayer(new EventLayer(new EventEmitter(), "individual","individual",getNopeLogger("io-socket-server-internal")));
this.addLayer(new EventLayer(new EventEmitter(), "individual","individual",getNopeLogger("io-socket-server-internal")),false);
this._logger.info(
"waiting for connection. Listening on port: " + port.toString()
@ -62,7 +62,7 @@ export class IoSocketServer extends Bridge {
// Add the socket as new layer:
const _layer = new EventLayer(client, "individual","individual");
_this.addLayer(_layer);
_this.addLayer(_layer, true);
// Subscribe to Loosing connection:
client.on("disconnect", () => {

View File

@ -20,7 +20,8 @@ const METHODS: Array<keyof ICommunicationInterface> = [
"emitNewObersvablesAvailable",
"emitNewServicesAvailable",
"emitRpcRequest",
"emitRpcResult",
"emitRpcResponse",
"emitStatusUpdate",
"emitTaskCancelation",
"onAurevoir",
"onBonjour",
@ -28,9 +29,24 @@ const METHODS: Array<keyof ICommunicationInterface> = [
"onNewInstancesAvailable",
"onNewObservablesAvailable",
"onNewServicesAvailable",
"onTaskCancelation",
"onStatusUpdate",
"onTaskCancelation",
];
const METHOD_MAPPING: {[P in keyof ICommunicationInterface]: keyof ICommunicationInterface} = {
"onAurevoir": "emitAurevoir",
"onBonjour": "emitBonjour",
"onEvent":"emitEvent",
"onNewInstanceGeneratorsAvailable": "emitNewInstanceGeneratorsAvailable",
"onNewInstancesAvailable": "emitNewInstancesAvailable",
"onNewObservablesAvailable": "emitNewObersvablesAvailable",
"onNewServicesAvailable":"emitNewServicesAvailable",
"onRpcRequest":"emitRpcRequest",
"onRpcResponse": "emitRpcResponse",
"onTaskCancelation": "emitTaskCancelation",
"onStatusUpdate":"emitStatusUpdate"
};
const STORING: Array<keyof ICommunicationInterface> = [
"onAurevoir",
"onBonjour",
@ -39,6 +55,7 @@ const STORING: Array<keyof ICommunicationInterface> = [
"onNewObservablesAvailable",
"onNewServicesAvailable",
"onTaskCancelation",
"onStatusUpdate"
];
const SPECIFIC_STORING: Array<keyof ICommunicationInterface> = [
@ -67,17 +84,20 @@ METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING);
export class Bridge implements ICommunicationInterface {
connected: INopeObservable<boolean>;
protected _layers: Set<ICommunicationInterface>;
protected _layers: Map<ICommunicationInterface,boolean>;
public addLayer(layer: ICommunicationInterface) {
public addLayer(layer: ICommunicationInterface, forward = false) {
if (!this._layers.has(layer)) {
this._layers.add(layer);
this._layers.set(layer,forward);
if (this._enableDynamicAdding) {
const _this = this;
// Play the history:
for (const method of STORING) {
for (const args of this[`_map_${method}`]) {
layer[method as any](...args);
for (const cb of this[`_map_${method}`]) {
layer[method as any](forward ? this._wrapMethodWithoutName(cb,layer,method) : cb);
}
}
@ -94,10 +114,11 @@ export class Bridge implements ICommunicationInterface {
for (const container in dict){
// 1. Extract the element:
const map: Map<string,Set<any>> = this[container];
const method = dict[container];
// 2. Iterate over the Elements.
for (const [event, callbacks] of map.entries()){
// 3. Add all callbacks (Use the Map operator.)
Array.from(callbacks).map(cb => layer[dict[container]](event,cb));
Array.from(callbacks).map(cb => layer[method](event,forward ? _this._wrapMethod(event,cb,layer,method) : cb));
}
}
}
@ -124,7 +145,7 @@ export class Bridge implements ICommunicationInterface {
protected _enableDynamicAdding = false,
protected _logger?: ILogger
) {
this._layers = new Set<ICommunicationInterface>();
this._layers = new Map();
this.connected = new NopeObservable();
this.connected.setContent(false);
@ -134,7 +155,7 @@ export class Bridge implements ICommunicationInterface {
// the Flag is defined as true, if every socket
// is connected.
this.connected.getter = ()=> {
for (const layer of _this._layers){
for (const layer of _this._layers.keys()){
if (!layer.connected.getContent())
return false;
}
@ -153,44 +174,67 @@ export class Bridge implements ICommunicationInterface {
) {
this[`_map_${method}`] = new Array<any[]>();
// Define a Function which stores the Actions:
this[method as any] = async (...args) => {
this[method as any] = async (cb) => {
// Store the Call
_this[`_map_${method}`].push(args);
_this[`_map_${method}`].push(cb);
for (const _layer of _this._layers) {
_layer[method as any](...args);
for (const [_layer,_forward] of _this._layers) {
_layer[method as any](_forward? _this._wrapMethodWithoutName(cb,_layer,method): cb);
}
};
} else if
// Else if the method store the events individually =>
// Creaete a different Method:
(_enableDynamicAdding &&
(SPECIFIC_STORING.includes(method) || SPECIFIC_REMOVING.includes(method))) {
// Determine the Container name.
const _container = SPECIFIC_STORING.includes(method) ? `_${method}` : method.replace("off","_on");
// Define the Container itself
this[_container] = new Map<string, Set<any>>();
// Based on the Function determine the Action:
const mode: "add" | "delete" = SPECIFIC_STORING.includes(method) ? "add": "delete";
// Store the Method.
this[method as any] = async (name, cb) => {
// Else if the method store the events individually =>
// Creaete a different Method:
(_enableDynamicAdding &&
SPECIFIC_STORING.includes(method)) {
// Determine the Container name.
const _container = `_${method}`;
// Define the Container itself
this[_container] = new Map<string, Set<any>>();
// Store the Method.
this[method as any] = async (name, cb) => {
// Call the adapted Function
_this._adaptStore(_container as any,"add", name, cb);
// Call the adapted Function
_this._adaptStore(_container as any,mode, name, cb);
// Perform the Action on every available Layer.
for (const [_layer,_forward] of _this._layers) {
// Store the Callback. Because it is a "method" which listens on the
// events => wrap the method, to forward the event to the other layers.
_layer[method as any](name, _forward ? _this._wrapMethod(name, cb, _layer, method) : cb);
}
};
}else if
// Else if the method store the events individually =>
// Creaete a different Method:
(_enableDynamicAdding &&
SPECIFIC_REMOVING.includes(method)) {
// Determine the Container name.
const _container = method.replace("off","_on");
// Define the Container itself
this[_container] = new Map<string, Set<any>>();
// Store the Method.
this[method as any] = async (name, cb) => {
// Call the adapted Function
_this._adaptStore(_container as any,"delete", name, cb);
// Perform the Action on every available Layer.
for (const _layer of _this._layers) {
_layer[method as any](name, cb);
}
};
}else {
// Perform the Action on every available Layer.
for (const [_layer,_forward] of _this._layers) {
// Store the Callback
_layer[method as any](name, _this._wrappedMethods.get(cb));
}
// Delete the Wrapped Method:
_this._wrappedMethods.delete(cb);
};
}else {
// Define a Function which stores the Actions:
this[method as any] = async (...args) => {
if (_this._logger) {
_this._logger.debug(method, ...args);
}
for (const _layer of _this._layers) {
for (const _layer of _this._layers.keys()) {
_layer[method as any](...args);
}
};
@ -198,6 +242,39 @@ export class Bridge implements ICommunicationInterface {
}
}
protected _wrappedMethods = new Map<(data) => any,(data) => any>();
protected _wrapMethod(name: string, cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any{
const _this = this;
const _wrapped = (data) => {
// _this._logger.info("Forwarding",method,name);
for (const _layer of _this._layers.keys()){
if (_layer != layer){
// Emit the corresponding Event on the different channels.
_layer[METHOD_MAPPING[method] as any](name, data);
}
}
// Call the Callback!
cb(data);
};
this._wrappedMethods.set(cb, _wrapped);
return _wrapped;
}
protected _wrapMethodWithoutName(cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any{
const _this = this;
return (data) => {
// _this._logger.info("Forwarding",method);
for (const _layer of _this._layers.keys()){
if (_layer != layer){
// Emit the corresponding Event on the different channels.
_layer[METHOD_MAPPING[method] as any]( data);
}
}
// Call the Callback!
cb(data);
};
}
/**
* Helper Function that is used to adapt the current active subscriptions
* @param name name of the data container

View File

@ -20,8 +20,15 @@ import {
IResponseTaskMsg,
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";
export interface IEmitter {
on(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: string | symbol, ...args: any[]): boolean;
}
/**
* A Communication Layer for the Dispatchers.
* Here, only a Events are used.
@ -42,7 +49,7 @@ export class EventLayer implements ICommunicationInterface {
* @param _logger
*/
constructor(
protected _emitter = new EventEmitter(),
protected _emitter: IEmitter = new EventEmitter(),
public readonly subscriptionMode: "individual" | "generic" = "individual",
public readonly resultSharing: "individual" | "generic" = "individual",
protected _logger?: ILogger
@ -51,6 +58,14 @@ export class EventLayer implements ICommunicationInterface {
this.connected.setContent(true);
}
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
this._emitter.emit("statusUdpate",status);
}
async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
this._emitter.on("statusUdpate",cb);
}
async onNewInstancesAvailable(
cb: (instances: IAvailableInstancesMsg) => void
): Promise<void> {
@ -97,7 +112,7 @@ export class EventLayer implements ICommunicationInterface {
this._emitter.emit(name, request);
}
async emitRpcResult(name: string, result: IResponseTaskMsg): Promise<void> {
async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void> {
this._emitter.emit(name, result);
}

View File

@ -10,6 +10,7 @@ import * as Logger from "js-logger";
import { ILogger } from "js-logger";
import { generateId } from "../helpers/idMethods";
import { RUNNINGINNODE } from "../helpers/runtimeMethods";
import { getNopeLogger } from "../logger/getLogger";
import { NopeGenericModule } from "../module/GenericModule";
import { NopePromise } from "../promise/nopePromise";
import {
@ -28,7 +29,7 @@ import {
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import {
IDispatcherInfo,
ENopeDispatcherStatus, IDispatcherInfo,
IGenerateRemoteInstanceCallback,
IGenerateRemoteInstanceForOtherDispatcherCallback,
INopeDispatcher
@ -46,6 +47,7 @@ import {
} from "../types/nope/nopeObservable.interface";
import { INopePromise } from "../types/nope/nopePromise.interface";
/**
* A Dispatcher to perform a function on a Remote
* Dispatcher. Therefore a Task is created and forwarded
@ -78,6 +80,7 @@ export class nopeDispatcher implements INopeDispatcher {
>;
public readonly communicator: ICommunicationInterface;
protected _externalDispatchers: Map<string, IDispatcherInfo>;
protected _mappingOfRemoteDispatchersAndServices: Map<
string,
IAvailableServicesMsg
@ -141,6 +144,15 @@ export class nopeDispatcher implements INopeDispatcher {
>;
protected _runningExternalRequestedTasks: Set<string>;
protected timeouts: {
sendAliveInterval: number;
checkInterval: number,
slow: number,
warn: number,
dead: number,
remove: number,
}
readonly _subscriptionMode: "individual" | "generic";
readonly _publishingMode: "individual" | "generic";
@ -156,16 +168,30 @@ export class nopeDispatcher implements INopeDispatcher {
protected _generateObservable: <T>() => INopeObservable<T>
) {
this.communicator = options.communicator;
this.id = generateId();
if (options.logger) {
this._logger = options.logger;
this._logger = getNopeLogger("dispatcher "+this.id, "info");
}
// Define the Timeouts.
if (options.timeouts){
this.timeouts = options.timeouts;
} else {
this.timeouts = {
sendAliveInterval: 500,
checkInterval: 250,
slow: 1000,
warn: 2000,
dead: 5000,
remove: 10000,
};
}
this._subscriptionMode = this.communicator.subscriptionMode || "generic";
this._publishingMode = this.communicator.resultSharing || "generic";
this.id = generateId();
/**
* Define A Proxy for accessing methods easier.
*/
@ -649,7 +675,7 @@ export class nopeDispatcher implements INopeDispatcher {
};
// Use the communicator to publish the event.
await this.communicator.emitRpcResult(data.resultSink, result);
await this.communicator.emitRpcResponse(data.resultSink, result);
}
} catch (error) {
if (this._logger) {
@ -675,7 +701,7 @@ export class nopeDispatcher implements INopeDispatcher {
};
// Send the Error via the communicator to the remote.
await this.communicator.emitRpcResult(data.resultSink, result);
await this.communicator.emitRpcResponse(data.resultSink, result);
}
// Unsubscribe the Observable.
@ -732,6 +758,9 @@ export class nopeDispatcher implements INopeDispatcher {
return false;
}
protected _checkInterval: any = null;
protected _sendInterval: any = null;
/**
* Internal Function, used to initialize the Dispatcher.
* It subscribes to the "Messages" of the communicator.
@ -747,6 +776,17 @@ export class nopeDispatcher implements INopeDispatcher {
testCurrent: true
});
if (this.timeouts.checkInterval > 0){
// Define a Checker, which will test the status
// of the external Dispatchers.
this._checkInterval = setInterval(() => _this._checkDispachterHealth(), this.timeouts.checkInterval );
}
if (this.timeouts.sendAliveInterval > 0){
// Define a Timer, which will emit Status updates with
// the disered delay.
this._sendInterval = setInterval(() => _this.communicator.emitStatusUpdate(_this._genAliveMessage()), this.timeouts.sendAliveInterval);
}
this.communicator.connected.subscribe((connected) => {
// Handle an unconnect.
if (connected) _this.communicator.emitBonjour(_this._genAliveMessage());
@ -804,7 +844,7 @@ export class nopeDispatcher implements INopeDispatcher {
_this._updateExternalServices();
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug("received new services");
this._logger.debug(this.id,"received new services from", data.dispatcher);
}
}
} catch (e) {}
@ -821,7 +861,7 @@ export class nopeDispatcher implements INopeDispatcher {
_this._updateExternalEvents();
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug("received new events");
this._logger.debug(this.id,"received new Observables from", data.dispatcher);
}
}
} catch (e) {}
@ -836,23 +876,29 @@ export class nopeDispatcher implements INopeDispatcher {
_this._updateExternalGenerators();
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug("received new generators");
this._logger.debug(this.id,"received new generators from", data.dispatcher);
}
} catch (e) {}
});
this.communicator.onBonjour((info) => {
if (_this.id !== info.id) {
_this._externalDispatchers.set(info.id, info);
this.communicator.onStatusUpdate((info) => {
_this._externalDispatchers.set(info.id, info);
_this.externalDispatchers.setContent(
Array.from(_this._externalDispatchers.values())
);
});
this.communicator.onBonjour((info) => {
_this._externalDispatchers.set(info.id, info);
_this.externalDispatchers.setContent(
Array.from(_this._externalDispatchers.values())
);
if (_this.id !== info.id) {
_this._sendAvailableServices();
_this._sendAvailableObservables();
_this._sendAvailableGenerators();
_this._sendAvailableInstances();
_this.externalDispatchers.setContent(
Array.from(_this._externalDispatchers.values())
);
_this._sendAvailableInstances();
if (_this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
@ -863,53 +909,7 @@ export class nopeDispatcher implements INopeDispatcher {
}
});
this.communicator.onAurevoir((dispatcher: string) => {
// Delete the Generators of the Instances.
_this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
_this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
_this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
_this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);
_this._externalDispatchers.delete(dispatcher);
_this._updateExternalServices();
_this._updateExternalGenerators();
_this._updateExternalEvents();
_this._updateExternalInstances();
// Iterate over the available instances and remove the providers:
for (const instance of _this._instances.values()) {
// Remove all Dispachers:
let idx = instance.usedBy.indexOf(dispatcher);
while (idx !== -1) {
instance.usedBy.splice(idx, 1);
idx = instance.usedBy.indexOf(dispatcher);
}
// Check if the Element isnt required any more => delete it.
if (instance.usedBy.length === 0) {
_this._logger.info(
"Disposing instance, because it isnt used any more.",
instance.instance.identifier
);
instance.instance
.dispose()
.catch((e) => {
_this._logger.error("Failed to remove the Instance.", e);
})
.then(() => {
_this._logger.info(
"sucessfully disposed instance ",
instance.instance.identifier
);
});
}
}
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug("a dispatcher went offline");
}
});
this.communicator.onAurevoir((dispatcher: string) => _this._removeDispatcher(dispatcher));
// Listen to newly created instances.
this.communicator.onNewInstancesAvailable((message) => {
@ -945,6 +945,98 @@ export class nopeDispatcher implements INopeDispatcher {
this.ready.setContent(true);
}
protected _checkDispachterHealth():void{
const currentTime = Date.now();
let changes = false;
for (const status of this._externalDispatchers.values()){
// determine the Difference
const diff = currentTime - status.timestamp;
// Based on the Difference Determine the Status
if (diff > this.timeouts.remove){
// remove the Dispatcher. But be quite.
// Perhaps more dispatchers will be removed
this._removeDispatcher(status.id, true);
changes = true;
} else if (diff > this.timeouts.dead && status.status !== ENopeDispatcherStatus.DEAD){
status.status = ENopeDispatcherStatus.DEAD;
changes = true;
} else if (diff > this.timeouts.warn && diff <= this.timeouts.dead && status.status !== ENopeDispatcherStatus.WARNING){
status.status = ENopeDispatcherStatus.WARNING;
changes = true;
} else if (diff > this.timeouts.slow && diff <= this.timeouts.warn && status.status !== ENopeDispatcherStatus.SLOW){
status.status = ENopeDispatcherStatus.SLOW;
changes = true;
} else if (diff <= this.timeouts.slow && status.status !== ENopeDispatcherStatus.HEALTHY){
status.status = ENopeDispatcherStatus.HEALTHY;
changes = true;
}
}
if (changes){
// Update the External Dispatchers
this.externalDispatchers.setContent(
Array.from(this._externalDispatchers.values())
);
}
}
protected _removeDispatcher(dispatcher: string, quite = false): void {
// Delete the Generators of the Instances.
this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);
this._externalDispatchers.delete(dispatcher);
// Iterate over the available instances and remove the providers:
for (const instance of this._instances.values()) {
// Remove all Dispachers:
let idx = instance.usedBy.indexOf(dispatcher);
while (idx !== -1) {
instance.usedBy.splice(idx, 1);
idx = instance.usedBy.indexOf(dispatcher);
}
// Check if the Element isnt required any more => delete it.
if (instance.usedBy.length === 0) {
this._logger.info(
"Disposing instance, because it isnt used any more.",
instance.instance.identifier
);
instance.instance
.dispose()
.catch((e) => {
this._logger.error("Failed to remove the Instance.", e);
})
.then(() => {
this._logger.info(
"sucessfully disposed instance ",
instance.instance.identifier
);
});
}
}
this._updateExternalServices();
this._updateExternalGenerators();
this._updateExternalEvents();
this._updateExternalInstances();
if (!quite){
this.externalDispatchers.setContent(
Array.from(this._externalDispatchers.values())
);
}
if (this._logger?.enabledFor(Logger.DEBUG)) {
// If there is a Logger:
this._logger.debug("a dispatcher went offline");
}
}
protected _genAliveMessage(): IDispatcherInfo {
if (RUNNINGINNODE) {
const os = require("os");
@ -958,20 +1050,22 @@ export class nopeDispatcher implements INopeDispatcher {
name: os.hostname()
},
pid: process.pid,
timestamp: Date.now()
timestamp: Date.now(),
status: ENopeDispatcherStatus.HEALTHY
};
}
return {
host: {
cores: -1,
cpu: "unkown",
name: "browser",
os: "unkown",
name: navigator.appCodeName + " " + navigator.appVersion,
os: navigator.platform,
ram: 1
},
id: this.id,
pid: -1,
timestamp: Date.now()
pid: this.id,
timestamp: Date.now(),
status: ENopeDispatcherStatus.HEALTHY
};
}

View File

@ -53,7 +53,10 @@ export function generateNopeBasicPackage(options: INopeDispatcherOptions) {
description: {
name: "nopeDispatcher",
selector: TYPES.dispatcher,
type: InjectableNopeDispatcher
type: InjectableNopeDispatcher,
options: {
scope: "inSingletonScope"
}
},
settings: {
allowInstanceGeneration: false

View File

@ -6,6 +6,7 @@
* @desc [description]
*/
import { ILogger } from "js-logger";
import { IDispatcherInfo } from "./nopeDispatcher.interface";
import { INopeModuleDescription } from "./nopeModule.interface";
import { INopeObservable } from "./nopeObservable.interface";
@ -20,6 +21,18 @@ export interface ICommunicationInterface {
readonly subscriptionMode?: "individual" | "generic";
readonly resultSharing?: "individual" | "generic";
/**
* Function to Emit a Status of the Dispatcher
* @param status The Status
*/
emitStatusUpdate(status: IDispatcherInfo): Promise<void>;
/**
* The Possiblity to subscribe to status-updates.
* @param cb The callback which will be called on during an update.
*/
onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void>;
/**
* Funciton to emit a RPC Request.
*
@ -38,7 +51,7 @@ export interface ICommunicationInterface {
* @param {IResponseTaskMsg} result
* @memberof ICommunicationInterface
*/
emitRpcResult(name: string, result: IResponseTaskMsg): Promise<void>;
emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void>;
/**
* Function used to subscribe to RPC Results. Each method / function
@ -514,5 +527,13 @@ export type IAvailableServicesMsg = {
export type INopeDispatcherOptions = {
communicator: ICommunicationInterface;
logger?: Logger;
logger?: ILogger;
timeouts?: {
sendAliveInterval: number;
checkInterval: number,
slow: number,
warn: number,
dead: number,
remove: number,
}
};

View File

@ -42,6 +42,7 @@ export type IValidPromise<T> = Promise<T> | INopePromise<T>;
export interface IDispatcherInfo {
id: string;
timestamp: number;
status: ENopeDispatcherStatus,
host: {
cores: number;
cpu: string;
@ -49,7 +50,14 @@ export interface IDispatcherInfo {
ram: number;
name: string;
};
pid: number;
pid: number | string;
}
export enum ENopeDispatcherStatus {
HEALTHY = 0,
SLOW = 1,
WARNING = 2,
DEAD = 3
}
/**

View File

@ -7,9 +7,9 @@
*/
import React from "react";
import { Card, Col, ProgressBar, Row, Table } from "react-bootstrap";
import { Badge, Card, Col, ProgressBar, Row, Table } from "react-bootstrap";
import {
IDispatcherInfo,
ENopeDispatcherStatus,
INopeDispatcher
} from "../../lib/types/nope/nopeDispatcher.interface";
import { INopeObserver } from "../../lib/types/nope/nopeObservable.interface";
@ -54,24 +54,69 @@ class MemoryStatusComponent extends React.Component<
class HostStatusComponent extends React.Component<
{
status: {
cores: number;
cpu: string;
cores: number;
os: string;
ram: number;
name: string;
pids: { pid: number; dispatchers: string[] }[];
pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[];
status: ENopeDispatcherStatus,
timestamp: number
};
},
{}
{
lastUpdate: string,
variant: "success" | "warning" | "danger",
badge: boolean,
badgeText: string
}
> {
protected _intervall: any = null;
get _state(){
const dict = {
0: "success",
1: "info",
2: "warning",
3: "danger"
};
return {
lastUpdate: (new Date(this.props.status.timestamp)).toISOString(),
variant: dict[this.props.status.status],
badge: this.props.status.status !== ENopeDispatcherStatus.HEALTHY,
badgeText: ENopeDispatcherStatus[this.props.status.status]
};
}
constructor(props){
super(props);
this.state = this._state;
}
componentDidMount(){
const _this = this;
this._intervall = setInterval(() => {
_this.setState(this._state);
}, 200);
}
componentWillUnmount(){
if (this._intervall){
clearInterval(this._intervall);
}
}
render() {
return (
<Card>
<Card border={this.state.variant}>
<Card.Body>
<Card.Title>{this.props.status.name}</Card.Title>
<Card.Title> {this.state.badge ? <>{this.props.status.name} <Badge variant={this.state.variant}>ELEMENT {this.state.badgeText} !</Badge> </> : <>{this.props.status.name}</> } </Card.Title>
<Table bordered>
<tbody>
<tr>
{ this.props.status.ram > 0 ? (<tr>
<td>RAM</td>
<td>
{" "}
@ -80,7 +125,7 @@ class HostStatusComponent extends React.Component<
value={this.props.status.ram}
></MemoryStatusComponent>
</td>
</tr>
</tr>) : <></>}
<tr>
<td>CPU</td>
<td>{this.props.status.cpu}</td>
@ -89,6 +134,10 @@ class HostStatusComponent extends React.Component<
<td>Cores</td>
<td>{this.props.status.cores}</td>
</tr>
<tr>
<td>Platform</td>
<td>{this.props.status.os}</td>
</tr>
<tr>
<td>Nope-Processes</td>
<td>{this.props.status.pids.length}</td>
@ -97,7 +146,7 @@ class HostStatusComponent extends React.Component<
</Table>
</Card.Body>
<Card.Footer>
<small className="text-muted">Last updated 3 mins ago</small>
<small className="text-muted">Last updated <b>{this.state.lastUpdate}</b></small>
</Card.Footer>
</Card>
);
@ -106,7 +155,14 @@ class HostStatusComponent extends React.Component<
class AvailableDispatchers extends React.Component<
{ dispatcher: INopeDispatcher },
{ dispatchers: IDispatcherInfo[][]; connected: boolean }
{ dispatchers: {
cpu: string;
cores: number;
os: string;
ram: number;
name: string;
pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[];
}[][]; connected: boolean }
> {
private _observer: INopeObserver[] = [];
@ -118,7 +174,9 @@ class AvailableDispatchers extends React.Component<
os: string;
ram: number;
name: string;
pids: { [index: string]: string[] };
pids: { [index: string]: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] };
status: ENopeDispatcherStatus,
timestamp: number
};
} = {};
const ret: {
@ -127,7 +185,9 @@ class AvailableDispatchers extends React.Component<
os: string;
ram: number;
name: string;
pids: { pid: number; dispatchers: string[] };
pids: { pid: number; dispatchers: {id: string, status: ENopeDispatcherStatus, timestamp: number}[] }[];
status: ENopeDispatcherStatus,
timestamp: number
}[] = [];
for (const dispatcher of this.props.dispatcher.externalDispatchers.getContent()) {
@ -138,15 +198,24 @@ class AvailableDispatchers extends React.Component<
name: dispatcher.host.name,
os: dispatcher.host.os,
pids: {},
ram: dispatcher.host.ram
ram: dispatcher.host.ram,
status: ENopeDispatcherStatus.HEALTHY,
timestamp: dispatcher.timestamp
};
}
if (sorted[dispatcher.host.name].pids[dispatcher.pid] === undefined) {
console.log(dispatcher.pid);
sorted[dispatcher.host.name].pids[dispatcher.pid] = [];
}
sorted[dispatcher.host.name].status = Math.max(sorted[dispatcher.host.name].status, dispatcher.status);
sorted[dispatcher.host.name].timestamp = Math.min(sorted[dispatcher.host.name].timestamp, dispatcher.timestamp);
sorted[dispatcher.host.name].pids[dispatcher.pid].push(dispatcher.id);
// Store the Dispatcher Status
sorted[dispatcher.host.name].pids[dispatcher.pid].push({
id: dispatcher.id,
status: dispatcher.status,
timestamp: dispatcher.timestamp
});
}
for (const hostname in sorted) {
@ -159,10 +228,11 @@ class AvailableDispatchers extends React.Component<
};
}
);
ret.push(_toAdd);
}
// console.log(this.props.dispatcher.externalDispatchers.getContent().length,ret.map(item => `${item.name}:${item.status}`).toString());
return ret;
}
@ -170,12 +240,7 @@ class AvailableDispatchers extends React.Component<
const connected = this.props.dispatcher.communicator.connected.getContent();
const dispatchers = [];
const maxItems = 3;
const sorted = [
...this.__sort(),
...this.__sort(),
...this.__sort(),
...this.__sort()
];
const sorted = this.__sort();
// Function to Split the Dispatcher Array into an array containing max 3 elements.
for (let i = 0; i < sorted.length; i = i + maxItems) {