/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:36 * @modify date 2022-01-03 17:34:07 * @desc [description] */ import { EventEmitter } from "events"; import * as Logger from "js-logger"; import { ILogger } from "js-logger"; import { generateId } from "../helpers/idMethods"; import { copy } from "../helpers/objectMethods"; import { defineNopeLogger, ValidLoggerDefinition } from "../logger/getLogger"; import { LoggerLevel } from "../logger/nopeLogger"; import { NopeObservable } from "../observables/nopeObservable"; import { IAvailableInstanceGeneratorsMsg, IAvailableInstancesMsg, IAvailableServicesMsg, IAvailableTopicsMsg, ICommunicationBridge, ICommunicationInterface, ICommunicationMirror, IEmitter, IExecutingTaskMsg, IExternalEventMsg, IExternalPropertyChangedMsg, IRequestTaskMsg, IResponseTaskMsg, IRpcUnregisterMsg, ITaskCancelationMsg, ValidEventTypesOfMirror, } from "../types/nope/nopeCommunication.interface"; import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; const EMITTING_METHODS: Array = [ "emitAurevoir", "emitBonjour", "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObservablesAvailable", "emitNewServicesAvailable", "emitStatusUpdate", "emitTaskCancelation", "emitUnregisterRpc", "emitExecutingTasks", ]; const EMITTING_SPECIFIC_METHODS: Array = [ "emitEvent", "emitRpcRequest", "emitRpcResponse", ]; const ON_METHODS: Array = [ "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", "onStatusUpdate", "onTaskCancelation", "onUnregisterRpc", "onExecutingTasks", ]; const ON_SPECIFIC_METHODS: Array = [ "onEvent", "onRpcRequest", "onRpcResponse", ]; export const UNSPECIFIC_TO_METHODS: Partial<{ [key in Partial]: { emit: keyof Partial; subscribe: keyof Partial; }; }> = { Aurevoir: { emit: "emitAurevoir", subscribe: "onAurevoir", }, Bonjour: { emit: "emitBonjour", subscribe: "onBonjour", }, NewInstancesAvailable: { emit: "emitNewInstancesAvailable", subscribe: "onNewInstancesAvailable", }, NewInstanceGeneratorsAvailable: { emit: "emitNewInstanceGeneratorsAvailable", subscribe: "onNewInstanceGeneratorsAvailable", }, NewObservablesAvailable: { emit: "emitNewObservablesAvailable", subscribe: "onNewObservablesAvailable", }, NewServicesAvailable: { emit: "emitNewServicesAvailable", subscribe: "onNewServicesAvailable", }, StatusUpdate: { emit: "emitStatusUpdate", subscribe: "onStatusUpdate", }, TaskCancelation: { emit: "emitTaskCancelation", subscribe: "onTaskCancelation", }, RpcUnregister: { emit: "emitUnregisterRpc", subscribe: "onUnregisterRpc", }, Tasks: { emit: "emitExecutingTasks", subscribe: "onExecutingTasks", }, }; const LAYER_METHOD_TO_EVENT: { [P in keyof Partial]: string; } = { // Specific: onEvent: "event/", emitEvent: "event/", onRpcRequest: "", emitRpcRequest: "", onRpcResponse: "", emitRpcResponse: "", }; for (const eventName in UNSPECIFIC_TO_METHODS) { const data = UNSPECIFIC_TO_METHODS[eventName]; LAYER_METHOD_TO_EVENT[data.emit as any] = eventName; LAYER_METHOD_TO_EVENT[data.subscribe as any] = eventName; } // Define additionally a Mapping between the Events const MIRROR_METHOD_TO_EVENT: { [P in keyof ICommunicationInterface]: ValidEventTypesOfMirror; } = copy(LAYER_METHOD_TO_EVENT) as any; MIRROR_METHOD_TO_EVENT.onEvent = "Event"; MIRROR_METHOD_TO_EVENT.emitEvent = "Event"; MIRROR_METHOD_TO_EVENT.onRpcRequest = "RpcRequest"; MIRROR_METHOD_TO_EVENT.emitRpcRequest = "RpcRequest"; MIRROR_METHOD_TO_EVENT.onRpcResponse = "RpcResponse"; MIRROR_METHOD_TO_EVENT.emitRpcResponse = "RpcResponse"; const MIRROR_EVENT_TO_EMIT: { [P in keyof ValidEventTypesOfMirror]: keyof ICommunicationInterface; } = {} as any; // Define all Events, that must be considered in a mirror. Object.getOwnPropertyNames(MIRROR_METHOD_TO_EVENT).map((method) => { if (method.startsWith("emit")) { MIRROR_EVENT_TO_EMIT[MIRROR_METHOD_TO_EVENT[method]] = method as keyof ICommunicationInterface; } }); const OFF_METHODS: Array = [ "offEvent", "offRpcRequest", "offRpcResponse", ]; export class Bridge implements ICommunicationBridge { public connected: INopeObservable; public considerConnection = true; public allowsServiceRedundancy = false; public ownDispatcherId: string; public id: string; protected _useInternalEmitter: boolean; protected _logger: ILogger; protected _internalEmitter: EventEmitter; protected _layers: Map< string, { layer: ICommunicationInterface; considerConnection: boolean; forwardData: boolean; } >; protected _mirrors: Map< ICommunicationMirror, { considerConnection: boolean } >; protected _callbacks: Map< string, // Method Array<(...args) => any> >; protected _specificCallbacks: Array<{ callback: (...args) => any; method: keyof ICommunicationInterface; event: string; }>; /** * Creates an instance of Bridge. * @param {*} [id=generateId()] The ID. (this can be adapted later and is only used to simplify debugging) * @param {string} [loggerName="bridge"] The Name of the Logger. * @param {LoggerLevel} [level="info"] The Level of the Logger. * @memberof Bridge */ constructor(id = generateId(), logger: ValidLoggerDefinition = false) { this._internalEmitter = new EventEmitter(); this._callbacks = new Map(); this._specificCallbacks = []; this._layers = new Map(); this._mirrors = new Map(); this.id = id; this._logger = defineNopeLogger(logger, `nope.bridge`); this._useInternalEmitter = true; const _this = this; // Now we use the loops to define the functions, // required by the interface. for (const method of ON_METHODS) { (this[method] as any) = async (cb) => { _this._on(method, cb); }; } for (const method of ON_SPECIFIC_METHODS) { (this[method] as any) = async (event: string, cb) => { _this._onSpecific(method, event, cb); }; } for (const method of OFF_METHODS) { (this[method] as any) = async (event: string, cb) => { _this._off(cb); }; } for (const method of EMITTING_METHODS) { const eventName = LAYER_METHOD_TO_EVENT[method]; (this[method] as any) = async (data) => { _this._emit(eventName, null, data); }; } for (const method of EMITTING_SPECIFIC_METHODS) { (this[method] as any) = async (event: string, data) => { _this._emitSpecific(method, null, event, data); }; } this.connected = new NopeObservable(); this.connected.setContent(false); // Add a custom handler for the connect flag. // the Flag is defined as true, if every socket // is connected. this.connected.getter = () => { for (const data of _this._layers.values()) { if (data.considerConnection && !data.layer.connected.getContent()) { return false; } } for (const [mirror, { considerConnection }] of _this._mirrors.entries()) { if (considerConnection && !mirror.connected.getContent()) { return false; } } return true; }; } get receivesOwnMessages(): boolean { for (const layer of this._layers.values()) { if (!layer.layer.receivesOwnMessages) { return false; } } for (const mirror of this._mirrors.keys()) { if (!mirror.receivesOwnMessages) { return false; } } return true; } async dispose(): Promise { // Iterate over the Layers and dispose them. for (const item of this._layers.values()) { await item.layer.dispose(); } // Now dispose all mirrors for (const mirror of this._mirrors.keys()) { await mirror.dispose(); } } protected _checkInternalEmitter(): void { this._useInternalEmitter = true; for (const layer of this._layers.values()) { if (layer.layer.receivesOwnMessages) { this._useInternalEmitter = false; break; } } for (const mirror of this._mirrors.keys()) { if (mirror.receivesOwnMessages) { this._useInternalEmitter = false; break; } } } /** * Helper Function, which will internally subscribe to the Events of the Layer. * * @protected * @param {ICommunicationInterface} layer The Layer to consinder, on this layer, we will subscribe to the events * @param {keyof ICommunicationInterface} method The method used for subscription * @param {string} event The name of the Event * @param {boolean} forwardData Flag, showing whether data will be forwarded or not. * @memberof BridgeV2 */ protected _subscribeToCallback( layer: ICommunicationInterface, method: keyof ICommunicationInterface, event: string, forwardData: boolean ): void { const _this = this; if (ON_SPECIFIC_METHODS.includes(method)) { layer[method as any](event, (data) => { // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._emitSpecific(method, layer, event, data, true); } else { _this._internalEmitter.emit(event, data); } }).catch((error) => { if (_this._logger) { _this._logger.error("failed subscribing", method); _this._logger.error(error); } }); } else { // Subscribe to the Event. layer[method as any]((data) => { // Now we are able to iterate over the Methods and forward the content // but only if the Layer forwards the content if (forwardData) { _this._emit(method, layer, data); } else { _this._internalEmitter.emit( LAYER_METHOD_TO_EVENT[method], data, true ); } }).catch((error) => { if (_this._logger) { _this._logger.error("failed subscribing", method); _this._logger.error(error); } }); } } protected _on(method: keyof ICommunicationInterface, cb): void { // Determine the Corresponding Event, that should be used // to subescribe to. const event = LAYER_METHOD_TO_EVENT[method]; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if ( this._logger?.enabledFor((Logger as any).DEBUG) && event !== "StatusUpdate" ) { this._logger.debug("subscribe to", event); // If logging is enable, we subscribe to that. const _this = this; this._internalEmitter.on(event, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(event, cb); // Store the Unspecific callbacks if (!this._callbacks.has(method)) { this._callbacks.set(method, [cb]); } else { this._callbacks.get(method).push(cb); } // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback(data.layer, method, event, data.forwardData); } } } protected _onSpecific( method: keyof ICommunicationInterface, event: string, callback ): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. const eventToUse = LAYER_METHOD_TO_EVENT[method] + event; // Subscribe on the Event this._internalEmitter.setMaxListeners( this._internalEmitter.getMaxListeners() + 1 ); if ( this._logger?.enabledFor((Logger as any).DEBUG) && event !== "StatusUpdate" ) { this._logger.debug("subscribe", method, "specifically on", eventToUse); const _this = this; this._internalEmitter.on(eventToUse, (data) => { _this._logger.debug("received", event, data); }); } this._internalEmitter.on(eventToUse, callback); // Add the callbacks. this._specificCallbacks.push({ callback, method, event: eventToUse, }); // Iterate over the Layers and on the connected Layers, // subscribe the methods. for (const data of this._layers.values()) { if (data.layer.connected.getContent()) { this._subscribeToCallback( data.layer, method, eventToUse, data.forwardData ); } } } protected _off(cb): void { // Determine the event that we are using internally. // It is a mix between the method alias channel and // the event. for (let i = 0; i < this._specificCallbacks.length; i++) { if (this._specificCallbacks[i].callback == cb) { this._specificCallbacks.splice(i, 1); break; } } // Theoretically we must remove the subscriptions. // But we wont do this, because if the connection has // been lost, or the Layer has been removed, the stored // callbacks wont be called any more. } protected _emit( eventName: string, toExclude: ICommunicationInterface | ICommunicationMirror = null, dataToSend: any, force = false ): void { if ( this._logger?.enabledFor((Logger as any).WARN) && eventName !== "StatusUpdate" ) { this._logger.debug("emitting", eventName, dataToSend); } if (this._useInternalEmitter || force) { // Emit the Event on the internal Layer. this._internalEmitter.emit(eventName, dataToSend); } const _this = this; const _method = UNSPECIFIC_TO_METHODS[eventName].emit; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer !== toExclude && data.layer.connected.getContent()) { // Only Publish the Data, on which we are forwarding data.layer[_method as any](dataToSend).catch((error) => { if (_this._logger) { _this._logger.error("failed executing", _method); _this._logger.error(error); } }); } } for (const mirror of this._mirrors.keys()) { if (mirror != toExclude && mirror.connected.getContent()) { // TODO: Check if eventName = ValidEventTypesOfMirror mirror.emit(eventName as ValidEventTypesOfMirror, dataToSend); } } } protected _emitSpecific( method: keyof ICommunicationInterface, toExclude: ICommunicationInterface | ICommunicationMirror = null, event: string, dataToSend: any, force = false ): void { const eventToUse = LAYER_METHOD_TO_EVENT[method] + event; if ( this._logger?.enabledFor((Logger as any).DEBUG) && event !== "StatusUpdate" ) { this._logger.debug("emitting", eventToUse, dataToSend); } // Emit the Event on the internal Layer. if (this._useInternalEmitter || force) { this._internalEmitter.emit(eventToUse, dataToSend); } const _this = this; // Iterate over the Layers. for (const data of this._layers.values()) { // If the Layer has been conneced if (data.layer !== toExclude && data.layer.connected.getContent()) { data.layer[method as any](eventToUse, dataToSend).catch((error) => { if (_this._logger) { _this._logger.error("failed executing", method); _this._logger.error(error); } }); } } for (const mirror of this._mirrors.keys()) { if (mirror != toExclude && mirror.connected.getContent()) { mirror.emit(MIRROR_METHOD_TO_EVENT[method], { // Because we will adapt the event again (see "addMirror" -> it is calling "_emitSpecific" // which will cause some falsy behavior.) name: event, data: dataToSend, }); } } } public async addLayer( layer: ICommunicationInterface, forwardData = false, considerConnection = false ): Promise { if (!this._layers.has(layer.id)) { // Store the Layers: this._layers.set(layer.id, { layer, considerConnection, forwardData, }); // Forward the Events of the Layer // being connected to our aggregated // state const _this = this; layer.connected.subscribe(() => { _this.connected.forcePublish(); }); // Wait until the Layer is connected. await layer.connected.waitFor((value) => value); // Register all know unspecific methods for (const [method, cbs] of this._callbacks.entries()) { for (const callback of cbs) { layer[method as any](callback); } } // Register all know specific methods for (const { method, event, callback } of this._specificCallbacks) { layer[method as any](event, callback); } this._checkInternalEmitter(); } } public async removeLayer(layer: ICommunicationInterface): Promise { if (this._layers.has(layer.id)) { this._layers.delete(layer.id); this._checkInternalEmitter(); } } /** * Adds a Mirror to the System. * * @param {IEmitter} mirror the Mirror to use. * @return {*} {Promise} * @memberof Bridge */ public addMirror( mirror: ICommunicationMirror, considerConnection = true ): void { if (!this._mirrors.has(mirror)) { // Add the Mirror this._mirrors.set(mirror, { considerConnection, }); const _this = this; // Subscribe for the Connected Flag. if (considerConnection) { // We just forward the Signal mirror.connected.subscribe(() => _this.connected.forcePublish()); } // Now we wait until the connection has been established first time, // then, we will add our subscriptions. mirror.connected .waitFor((value) => value) .then(() => { for (const eventName of ValidEventTypesOfMirror) { if (UNSPECIFIC_TO_METHODS[eventName]) { mirror.on(eventName as ValidEventTypesOfMirror, (data) => { // Unless it is a Status-Update, we will log it. if ( _this._logger?.enabledFor((Logger as any).DEBUG) && eventName !== "StatusUpdate" ) { _this._logger.debug( "received data from mirror on", eventName, ". Forwarding event to", UNSPECIFIC_TO_METHODS[eventName].emit ); } // Now Forward the Data. _this._emit(eventName, mirror, data, true); }); } else { mirror.on(eventName as ValidEventTypesOfMirror, (data) => { // If logging is enabled => Forward this information if ( _this._logger?.enabledFor((Logger as any).DEBUG) && eventName !== "StatusUpdate" ) { _this._logger.debug( "received data from mirror on", eventName, data.name ); } // Now emit the data on the channel _this._emitSpecific( MIRROR_EVENT_TO_EMIT[eventName], mirror, data.name, data.data, true ); }); } } }); this._checkInternalEmitter(); } } /** * Removes a Mirror, from the system. * * @param {ICommunicationMirror} mirror * @return {*} {boolean} * @memberof Bridge */ public removeMirror(mirror: ICommunicationMirror): boolean { let removed = false; if (this._mirrors.has(mirror)) { this._mirrors.delete(mirror); this._checkInternalEmitter(); removed = true; } return removed; } // Implement the Default behavior of a Bridge. // All the Methods below will be implemented // during the initalization. So if something // fails => an Error is thrown. emitExecutingTasks(data: IExecutingTaskMsg): Promise { throw new Error("Method should be overwritten."); } onExecutingTasks(cb: (data: IExecutingTaskMsg) => void): Promise { throw new Error("Method should be overwritten."); } emitUnregisterRpc(data: IRpcUnregisterMsg): Promise { throw new Error("Method should be overwritten."); } onUnregisterRpc(cb: (data: IRpcUnregisterMsg) => void): Promise { throw new Error("Method should be overwritten."); } emitStatusUpdate(status: IDispatcherInfo): Promise { throw new Error("Method should be overwritten."); } onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise { throw new Error("Method should be overwritten."); } emitRpcRequest(name: string, request: IRequestTaskMsg): Promise { throw new Error("Method should be overwritten."); } emitRpcResponse(name: string, result: IResponseTaskMsg): Promise { throw new Error("Method should be overwritten."); } onRpcResponse( name: string, cb: (result: IResponseTaskMsg) => void ): Promise { throw new Error("Method should be overwritten."); } offRpcResponse( name: string, cb: (result: IResponseTaskMsg) => void ): Promise { throw new Error("Method should be overwritten."); } onRpcRequest( name: string, cb: (data: IRequestTaskMsg) => void ): Promise { throw new Error("Method should be overwritten."); } offRpcRequest( name: string, cb: (data: IRequestTaskMsg) => void ): Promise { throw new Error("Method should be overwritten."); } emitNewServicesAvailable(services: IAvailableServicesMsg): Promise { throw new Error("Method should be overwritten."); } onNewServicesAvailable( cb: (services: IAvailableServicesMsg) => void ): Promise { throw new Error("Method should be overwritten."); } emitNewObservablesAvailable(topics: IAvailableTopicsMsg): Promise { throw new Error("Method should be overwritten."); } onNewObservablesAvailable( cb: (topics: IAvailableTopicsMsg) => void ): Promise { throw new Error("Method should be overwritten."); } onEvent(event: string, cb: (data: IExternalEventMsg) => void): Promise { throw new Error("Method should be overwritten."); } emitEvent(event: string, data: IExternalEventMsg): Promise { throw new Error("Method should be overwritten."); } offEvent( event: string, cb: (data: IExternalEventMsg) => void ): Promise { throw new Error("Method should be overwritten."); } onBonjour(cb: (msg: IDispatcherInfo) => void): Promise { throw new Error("Method should be overwritten."); } emitBonjour(msg: IDispatcherInfo): Promise { throw new Error("Method should be overwritten."); } onAurevoir(cb: (dispatcher: string) => void): Promise { throw new Error("Method should be overwritten."); } emitAurevoir(dispatcher: string): Promise { throw new Error("Method should be overwritten."); } onTaskCancelation(cb: (msg: ITaskCancelationMsg) => void): Promise { throw new Error("Method should be overwritten."); } emitTaskCancelation(msg: ITaskCancelationMsg): Promise { throw new Error("Method should be overwritten."); } emitNewInstanceGeneratorsAvailable( creators: IAvailableInstanceGeneratorsMsg ): Promise { throw new Error("Method should be overwritten."); } onNewInstanceGeneratorsAvailable( cb: (creators: IAvailableInstanceGeneratorsMsg) => void ): Promise { throw new Error("Method should be overwritten."); } onNewInstancesAvailable( cb: (instances: IAvailableInstancesMsg) => void ): Promise { throw new Error("Method should be overwritten."); } emitNewInstancesAvailable(instances: IAvailableInstancesMsg): Promise { throw new Error("Method should be overwritten."); } onPropertyChange( name: string, cb: (data: IExternalPropertyChangedMsg) => void ): Promise { throw new Error("Method should be overwritten."); } emitPropertyChange( name: string, data: IExternalPropertyChangedMsg ): Promise { throw new Error("Method should be overwritten."); } offPropertyChange( name: string, cb: (data: IExternalPropertyChangedMsg) => void ): Promise { throw new Error("Method should be overwritten."); } }