/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:36 * @modify date 2021-03-19 14:27:23 * @desc [description] */ import { EventEmitter } from "events"; import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { generateId } from "../helpers/idMethods"; import { getNopeLogger } from "../logger/getLogger"; import { LoggerLevel } from "../logger/nopeLogger"; import { NopeObservable } from "../observables/nopeObservable"; import { ICommunicationBridge, ICommunicationInterface } from "../types/nope/nopeCommunication.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; const EMITTING_METHODS: Array = [ "emitAurevoir", "emitBonjour", "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObersvablesAvailable", "emitNewServicesAvailable", "emitStatusUpdate", "emitTaskCancelation" ]; const EMITTING_SPECIFIC_METHODS: Array = [ "emitEvent", "emitRpcRequest", "emitRpcResponse" ]; const ON_METHODS: Array = [ "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", "onStatusUpdate", "onTaskCancelation" ]; const ON_SPECIFIC_METHODS: Array = [ "onEvent", "onRpcRequest", "onRpcResponse" ]; const METHOD_TO_EVENT: { [P in keyof ICommunicationInterface]: string } = { // Default emitters emitAurevoir: "aurevoir", emitBonjour: "bonjour", emitNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", emitNewInstancesAvailable: "NewInstancesAvailable", emitNewObersvablesAvailable: "NewObersvablesAvailable", emitNewServicesAvailable: "NewServicesAvailable", emitStatusUpdate: "StatusUpdate", emitTaskCancelation: "TaskCancelation", // The default Listeners onAurevoir: "aurevoir", onBonjour: "bonjour", onNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", onNewInstancesAvailable: "NewInstancesAvailable", onNewObservablesAvailable: "NewObersvablesAvailable", onNewServicesAvailable: "NewServicesAvailable", onStatusUpdate: "StatusUpdate", onTaskCancelation: "TaskCancelation", // Specific: onEvent: "event/", onRpcRequest: "", onRpcResponse: "", emitEvent: "event/", emitRpcRequest: "", emitRpcResponse: "" }; const OFF_METHODS: Array = [ "offEvent", "offRpcRequest", "offRpcResponse" ]; const MAPPING_METHODS: { [P in keyof ICommunicationInterface]: keyof ICommunicationInterface; } = { onAurevoir: "emitAurevoir", onBonjour: "emitBonjour", onEvent: "emitEvent", onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable", onNewInstancesAvailable: "emitNewInstancesAvailable", onNewObservablesAvailable: "emitNewObersvablesAvailable", onNewServicesAvailable: "emitNewServicesAvailable", onRpcRequest: "emitRpcRequest", onRpcResponse: "emitRpcResponse", onTaskCancelation: "emitTaskCancelation", onStatusUpdate: "emitStatusUpdate" }; const METHODS_WITH_NAME: Array = [ "onRpcResponse", "onRpcRequest", "onEvent" ]; //@ts-ignore Ignore the Interface. Its implemented manually export class Bridge implements ICommunicationBridge { public connected: INopeObservable; public considerConnection = true; public allowServiceRedundancy = false; public ownDispatcherId: string; public id: string; protected _logger: ILogger; protected _internalEmitter: EventEmitter; protected _layers: Map< string, { layer: ICommunicationInterface; considerConnection: boolean; forwardData: boolean; } >; protected _callbacks: Map< string, // Method Array<(...args) => any> >; protected _specificCallbacks: Array<{ callback: (...args) => any; method: keyof ICommunicationInterface; event: string; }>; /** * Creates an instance of Bridge. * @param {*} [id=generateId()] The ID. (this can be adapted later and is only used to simplify debugging) * @param {string} [loggerName="bridge"] The Name of the Logger. * @param {LoggerLevel} [level="info"] The Level of the Logger. * @memberof Bridge */ constructor( id = generateId(), loggerName = "bridge", level: LoggerLevel = "info" ) { this._internalEmitter = new EventEmitter(); this._callbacks = new Map(); this._specificCallbacks = []; this._layers = new Map(); this._logger = getNopeLogger(loggerName, level); this.id = id; const _this = this; // Now we use the loops to define the functions, // required by the interface. for (const method of ON_METHODS) { this[method] = async (cb) => { _this._on(method, cb); }; } for (const method of ON_SPECIFIC_METHODS) { this[method] = async (event: string, cb) => { _this._onSpecific(method, event, cb); }; } for (const method of OFF_METHODS) { this[method] = async (event: string, cb) => { _this._off(cb); }; } for (const method of EMITTING_METHODS) { this[method] = async (data) => { _this._emit(method, data); }; } for (const method of EMITTING_SPECIFIC_METHODS) { this[method] = (event: string, data) => { _this._emitSpecific(method, event, data); }; } this.connected = new NopeObservable(); this.connected.setContent(false); // Add a custom handler for the connect flag. // the Flag is defined as true, if every socket // is connected. this.connected.getter = () => { for (const data of _this._layers.values()) { if (data.considerConnection && !data.layer.connected.getContent()) return false; } return true; }; } /** * Helper Function, which will forward an Event. * * @protected * @param {ICommunicationInterface} layerToExclude Layer which should not be handled. * @param {keyof ICommunicationInterface} method The method. * @param {...any[]} args args, used during forwarding. * @memberof BridgeV2 */ protected _forward( layerToExclude: ICommunicationInterface, method: keyof ICommunicationInterface, ...args ): void { // Iterate over the layers and forward the Data to the other Layers for (const data of this._layers.values()) { if (data.layer !== layerToExclude) { data.layer[method as any](...args).catch((error) => { this._logger.error("failed forwarding", method, ...args); this._logger.error(error); }); } } } /** * Helper Function, which will internally subscribe to the Events of the Layer. * * @protected * @param {ICommunicationInterface} layer The Layer to consinder, on this layer, we will subscribe to the events * @param {keyof ICommunicationInterface} method The method used for subscription * @param {string} event The name of the Event * @param {boolean} forwardData Flag, showing whether data will be forwarded or not. * @memberof BridgeV2 */ protected _subscribeToCallback( layer: ICommunicationInterface, method: keyof ICommunicationInterface, event: string, forwardData: boolean ): void { const _this = this; if (METHODS_WITH_NAME.includes(method)) { layer[method as any](event, (data) => { // Define the Method to Forward data. _this._internalEmitter.emit(event, data); // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._forward(layer, method, event, data); } }).catch((error) => { _this._logger.error("failed subscribing", method); _this._logger.error(error); }); } else { // Subscribe to the Event. layer[method as any]((data) => { // Define the Method to Forward data. _this._internalEmitter.emit(METHOD_TO_EVENT[method], data); // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._forward(layer, method, data); } }).catch((error) => { _this._logger.error("failed subscribing", method); _this._logger.error(error); }); } } protected _on(method: keyof ICommunicationInterface, cb): void { // Determine the Corresponding Event, that should be used // to subescribe to. const event = METHOD_TO_EVENT[method]; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("subscribe to", event); // If logging is enable, we subscribe to that. const _this = this; this._internalEmitter.on(event, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(event, cb); // Store the Unspecific callbacks if (!this._callbacks.has(method)) { this._callbacks.set(method, [cb]); } else { this._callbacks.get(method).push(cb); } // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback(data.layer, method, event, data.forwardData); } } } protected _onSpecific( method: keyof ICommunicationInterface, event: string, callback ): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. const eventToUse = METHOD_TO_EVENT[method] + event; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("subscribe ", method, "specifically on", eventToUse); const _this = this; this._internalEmitter.on(eventToUse, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(eventToUse, callback); // Add the callbacks. this._specificCallbacks.push({ callback, method, event }); // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback(data.layer, method, event, data.forwardData); } } } protected _off(cb): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. for (let i = 0; i < this._specificCallbacks.length; i++) { if (this._specificCallbacks[i].callback == cb) { this._specificCallbacks.splice(i, 1); break; } } // Theoretically we must remove the subscriptions. // But we wont do this, because if the connection has // been lost, or the Layer has been removed, the stored // callbacks wont be called any more. } protected _emit( method: keyof ICommunicationInterface, dataToSend: any ): void { const event = METHOD_TO_EVENT[method]; if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug("emitting", event, dataToSend); } // Emit the Event on the internal Layer. this._internalEmitter.emit(event, dataToSend); const _this = this; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer.connected.getContent()) { data.layer[method as any](dataToSend).catch((error) => { _this._logger.error("failed executing", method); _this._logger.error(error); }); } } } protected _emitSpecific( method: keyof ICommunicationInterface, event: string, dataToSend: any ): void { if (this._logger.enabledFor(Logger.DEBUG) && event !== "StatusUpdate") { this._logger.debug( "emitting", METHOD_TO_EVENT[method] + event, dataToSend ); } // Emit the Event on the internal Layer. this._internalEmitter.emit(METHOD_TO_EVENT[method] + event, dataToSend); const _this = this; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer.connected.getContent()) { data.layer[method as any](event, dataToSend).catch((error) => { _this._logger.error("failed executing", method); _this._logger.error(error); }); } } } public async addLayer( layer: ICommunicationInterface, forwardData = false, considerConnection = false ): Promise { if (!this._layers.has(layer.id)) { // Store the Layers: this._layers.set(layer.id, { layer, considerConnection, forwardData }); // Forward the Events of the Layer // being connected to our aggregated // state const _this = this; layer.connected.subscribe(() => { _this.connected.forcePublish(); }); // Wait until the Layer is connected. await layer.connected.waitFor((value) => value); // Register all know unspecific methods for (const [method, cbs] of this._callbacks.entries()) { for (const callback of cbs) { layer[method as any](callback); } } // Register all know specific methods for (const { method, event, callback } of this._specificCallbacks) { layer[method as any](event, callback); } } } public async removeLayer(layer: ICommunicationInterface): Promise { if (this._layers.has(layer.id)) { this._layers.delete(layer.id); } } }