/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:36 * @modify date 2021-03-24 07:58:29 * @desc [description] */ import { EventEmitter } from "events"; import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { generateId } from "../helpers/idMethods"; import { copy } from "../helpers/objectMethods"; import { getNopeLogger } from "../logger/getLogger"; import { LoggerLevel } from "../logger/nopeLogger"; import { NopeObservable } from "../observables/nopeObservable"; import { ICommunicationBridge, ICommunicationInterface, ICommunicationMirror, IEmitter, ValidEventTypesOfMirror } from "../types/nope/nopeCommunication.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; const EMITTING_METHODS: Array = [ "emitAurevoir", "emitBonjour", "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObersvablesAvailable", "emitNewServicesAvailable", "emitStatusUpdate", "emitTaskCancelation" ]; const EMITTING_SPECIFIC_METHODS: Array = [ "emitEvent", "emitRpcRequest", "emitRpcResponse" ]; const ON_METHODS: Array = [ "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", "onStatusUpdate", "onTaskCancelation" ]; const ON_SPECIFIC_METHODS: Array = [ "onEvent", "onRpcRequest", "onRpcResponse" ]; const METHOD_TO_EVENT: { [P in keyof ICommunicationInterface]: string } = { // Default emitters emitAurevoir: "aurevoir", emitBonjour: "bonjour", emitNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", emitNewInstancesAvailable: "NewInstancesAvailable", emitNewObersvablesAvailable: "NewObersvablesAvailable", emitNewServicesAvailable: "NewServicesAvailable", emitStatusUpdate: "StatusUpdate", emitTaskCancelation: "TaskCancelation", // The default Listeners onAurevoir: "aurevoir", onBonjour: "bonjour", onNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", onNewInstancesAvailable: "NewInstancesAvailable", onNewObservablesAvailable: "NewObersvablesAvailable", onNewServicesAvailable: "NewServicesAvailable", onStatusUpdate: "StatusUpdate", onTaskCancelation: "TaskCancelation", // Specific: onEvent: "event/", onRpcRequest: "", onRpcResponse: "", emitEvent: "event/", emitRpcRequest: "", emitRpcResponse: "" }; const MIRROR_METHOD_MAPPING: { [P in keyof ICommunicationInterface]: ValidEventTypesOfMirror; } = copy(METHOD_TO_EVENT) as any; MIRROR_METHOD_MAPPING.onEvent = "event"; MIRROR_METHOD_MAPPING.emitEvent = "event"; MIRROR_METHOD_MAPPING.onRpcRequest = "rpcRequest"; MIRROR_METHOD_MAPPING.emitRpcRequest = "rpcRequest"; MIRROR_METHOD_MAPPING.onRpcResponse = "rpcResponse"; MIRROR_METHOD_MAPPING.emitRpcResponse = "rpcResponse"; const MIRROR_EVENT_TO_EMIT: { [P in keyof ValidEventTypesOfMirror]: keyof ICommunicationInterface; } = {}; // Define all Events, that must be considered in a mirror. Object.getOwnPropertyNames(MIRROR_METHOD_MAPPING).map((method) => { if (method.startsWith("emit")) { MIRROR_EVENT_TO_EMIT[ MIRROR_METHOD_MAPPING[method] ] = method as keyof ICommunicationInterface; } }); const OFF_METHODS: Array = [ "offEvent", "offRpcRequest", "offRpcResponse" ]; const MAPPING_METHODS: { [P in keyof ICommunicationInterface]: keyof ICommunicationInterface; } = { onAurevoir: "emitAurevoir", onBonjour: "emitBonjour", onEvent: "emitEvent", onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable", onNewInstancesAvailable: "emitNewInstancesAvailable", onNewObservablesAvailable: "emitNewObersvablesAvailable", onNewServicesAvailable: "emitNewServicesAvailable", onRpcRequest: "emitRpcRequest", onRpcResponse: "emitRpcResponse", onTaskCancelation: "emitTaskCancelation", onStatusUpdate: "emitStatusUpdate" }; const METHODS_WITH_NAME: Array = [ "onRpcResponse", "onRpcRequest", "onEvent" ]; //@ts-ignore Ignore the Interface. Its implemented manually export class Bridge implements ICommunicationBridge { public connected: INopeObservable; public considerConnection = true; public allowServiceRedundancy = false; public ownDispatcherId: string; public id: string; protected _logger: ILogger; protected _internalEmitter: EventEmitter; protected _layers: Map< string, { layer: ICommunicationInterface; considerConnection: boolean; forwardData: boolean; } >; protected _mirrors: Map< ICommunicationMirror, { considerConnection: boolean } >; protected _callbacks: Map< string, // Method Array<(...args) => any> >; protected _specificCallbacks: Array<{ callback: (...args) => any; method: keyof ICommunicationInterface; event: string; }>; /** * Creates an instance of Bridge. * @param {*} [id=generateId()] The ID. (this can be adapted later and is only used to simplify debugging) * @param {string} [loggerName="bridge"] The Name of the Logger. * @param {LoggerLevel} [level="info"] The Level of the Logger. * @memberof Bridge */ constructor( id = generateId(), loggerName = "bridge", level: LoggerLevel = "info" ) { this._internalEmitter = new EventEmitter(); this._callbacks = new Map(); this._specificCallbacks = []; this._layers = new Map(); this._mirrors = new Map(); this._logger = getNopeLogger(loggerName, level); this.id = id; const _this = this; // Now we use the loops to define the functions, // required by the interface. for (const method of ON_METHODS) { this[method] = async (cb) => { _this._on(method, cb); }; } for (const method of ON_SPECIFIC_METHODS) { this[method] = async (event: string, cb) => { _this._onSpecific(method, event, cb); }; } for (const method of OFF_METHODS) { this[method] = async (event: string, cb) => { _this._off(cb); }; } for (const method of EMITTING_METHODS) { this[method] = async (data) => { _this._emit(method, null, data); }; } for (const method of EMITTING_SPECIFIC_METHODS) { this[method] = async (event: string, data) => { _this._emitSpecific(method, null, event, data); }; } this.connected = new NopeObservable(); this.connected.setContent(false); // Add a custom handler for the connect flag. // the Flag is defined as true, if every socket // is connected. this.connected.getter = () => { for (const data of _this._layers.values()) { if (data.considerConnection && !data.layer.connected.getContent()) { return false; } } for (const [mirror, { considerConnection }] of _this._mirrors.entries()) { if (considerConnection && !mirror.connected.getContent()) { return false; } } return true; }; } /** * Helper Function, which will internally subscribe to the Events of the Layer. * * @protected * @param {ICommunicationInterface} layer The Layer to consinder, on this layer, we will subscribe to the events * @param {keyof ICommunicationInterface} method The method used for subscription * @param {string} event The name of the Event * @param {boolean} forwardData Flag, showing whether data will be forwarded or not. * @memberof BridgeV2 */ protected _subscribeToCallback( layer: ICommunicationInterface, method: keyof ICommunicationInterface, event: string, forwardData: boolean ): void { const _this = this; if (METHODS_WITH_NAME.includes(method)) { layer[method as any](event, (data) => { // Define the Method to Forward data. _this._internalEmitter.emit(event, data); // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._emitSpecific(method, layer, event, data); } }).catch((error) => { _this._logger.error("failed subscribing", method); _this._logger.error(error); }); } else { // Subscribe to the Event. layer[method as any]((data) => { // Define the Method to Forward data. _this._internalEmitter.emit(METHOD_TO_EVENT[method], data); // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._emit(method, layer, data); } }).catch((error) => { _this._logger.error("failed subscribing", method); _this._logger.error(error); }); } } protected _on(method: keyof ICommunicationInterface, cb): void { // Determine the Corresponding Event, that should be used // to subescribe to. const event = METHOD_TO_EVENT[method]; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("subscribe to", event); // If logging is enable, we subscribe to that. const _this = this; this._internalEmitter.on(event, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(event, cb); // Store the Unspecific callbacks if (!this._callbacks.has(method)) { this._callbacks.set(method, [cb]); } else { this._callbacks.get(method).push(cb); } // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback(data.layer, method, event, data.forwardData); } } } protected _onSpecific( method: keyof ICommunicationInterface, event: string, callback ): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. const eventToUse = METHOD_TO_EVENT[method] + event; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("subscribe ", method, "specifically on", eventToUse); const _this = this; this._internalEmitter.on(eventToUse, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(eventToUse, callback); // Add the callbacks. this._specificCallbacks.push({ callback, method, event }); // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback(data.layer, method, event, data.forwardData); } } } protected _off(cb): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. for (let i = 0; i < this._specificCallbacks.length; i++) { if (this._specificCallbacks[i].callback == cb) { this._specificCallbacks.splice(i, 1); break; } } // Theoretically we must remove the subscriptions. // But we wont do this, because if the connection has // been lost, or the Layer has been removed, the stored // callbacks wont be called any more. } protected _emit( method: keyof ICommunicationInterface, toExclude: ICommunicationInterface | ICommunicationMirror = null, dataToSend: any ): void { const event = METHOD_TO_EVENT[method]; if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("emitting", event, dataToSend); } // Emit the Event on the internal Layer. this._internalEmitter.emit(event, dataToSend); const _this = this; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer !== toExclude && data.layer.connected.getContent()) { // Only Publish the Data, on which we are forwarding data.layer[method as any](dataToSend).catch((error) => { _this._logger.error("failed executing", method); _this._logger.error(error); }); } } for (const mirror of this._mirrors.keys()) { if (mirror != toExclude && mirror.connected.getContent()) { mirror.emit(MIRROR_METHOD_MAPPING[method], dataToSend); } } } protected _emitSpecific( method: keyof ICommunicationInterface, toExclude: ICommunicationInterface | ICommunicationMirror = null, event: string, dataToSend: any ): void { if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug( "emitting", METHOD_TO_EVENT[method] + event, dataToSend ); } // Emit the Event on the internal Layer. this._internalEmitter.emit(METHOD_TO_EVENT[method] + event, dataToSend); const _this = this; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer !== toExclude && data.layer.connected.getContent()) { data.layer[method as any](event, dataToSend).catch((error) => { _this._logger.error("failed executing", method); _this._logger.error(error); }); } } for (const mirror of this._mirrors.keys()) { if (mirror != toExclude && mirror.connected.getContent()) { mirror.emit(MIRROR_METHOD_MAPPING[method], { name: event, data: dataToSend }); } } } public async addLayer( layer: ICommunicationInterface, forwardData = false, considerConnection = false ): Promise { if (!this._layers.has(layer.id)) { // Store the Layers: this._layers.set(layer.id, { layer, considerConnection, forwardData }); // Forward the Events of the Layer // being connected to our aggregated // state const _this = this; layer.connected.subscribe(() => { _this.connected.forcePublish(); }); // Wait until the Layer is connected. await layer.connected.waitFor((value) => value); // Register all know unspecific methods for (const [method, cbs] of this._callbacks.entries()) { for (const callback of cbs) { layer[method as any](callback); } } // Register all know specific methods for (const { method, event, callback } of this._specificCallbacks) { layer[method as any](event, callback); } } } public async removeLayer(layer: ICommunicationInterface): Promise { if (this._layers.has(layer.id)) { this._layers.delete(layer.id); } } /** * Adds a Mirror to the System. * * @param {IEmitter} mirror the Mirror to use. * @return {*} {Promise} * @memberof Bridge */ public addMirror( mirror: ICommunicationMirror, considerConnection = true ): void { if (!this._mirrors.has(mirror)) { // Add the Mirror this._mirrors.set(mirror, { considerConnection }); const _this = this; // Subscribe for the Connected Flag. if (considerConnection) { // We just forward the Signal mirror.connected.subscribe(() => _this.connected.forcePublish()); } // Now we wait until the connection has been established first time, // then, we will add our subscriptions. mirror.connected .waitFor((value) => value) .then(() => { for (const eventName of ValidEventTypesOfMirror) { if ( !EMITTING_SPECIFIC_METHODS.includes( MIRROR_EVENT_TO_EMIT[eventName] ) ) { mirror.on(eventName as ValidEventTypesOfMirror, (data) => { if ( _this._logger.enabledFor(Logger.DEBUG) && eventName !== "StatusUpdate" ) { _this._logger.debug( "received data from mirror on", eventName ); } // Now Forward the Data. _this._emit(MIRROR_EVENT_TO_EMIT[eventName], mirror, data); }); } else { mirror.on(eventName as ValidEventTypesOfMirror, (data) => { // If logging is enabled => Forward this information if ( _this._logger.enabledFor(Logger.DEBUG) && eventName !== "StatusUpdate" ) { _this._logger.debug( "received data from mirror on", eventName, data.name ); } // Now emit the data on the channel _this._emitSpecific( MIRROR_EVENT_TO_EMIT[eventName], mirror, data.name, data.data ); }); } } }); } } /** * Removes a Mirror, from the system. * * @param {ICommunicationMirror} mirror * @return {*} {boolean} * @memberof Bridge */ public removeMirror(mirror: ICommunicationMirror): boolean { return this._mirrors.delete(mirror); } }