From 255efdaa4833b795a88887a6f7fda0aad3d6d874 Mon Sep 17 00:00:00 2001 From: Martin Karkowski Date: Fri, 19 Mar 2021 14:01:51 +0100 Subject: [PATCH] Trying new approach. --- lib/communication/bridge.ts | 958 +++++++++++------------------------- 1 file changed, 297 insertions(+), 661 deletions(-) diff --git a/lib/communication/bridge.ts b/lib/communication/bridge.ts index 5b129c3..a633a0d 100644 --- a/lib/communication/bridge.ts +++ b/lib/communication/bridge.ts @@ -2,28 +2,39 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:36 - * @modify date 2021-03-01 17:17:44 + * @modify date 2021-03-19 13:59:13 * @desc [description] */ - +import { EventEmitter } from "events"; import { ILogger } from "js-logger"; -import { determineDifference } from "../helpers/setMethods"; +import { generateId } from "../helpers/idMethods"; +import { getNopeLogger } from "../logger/getLogger"; +import { LoggerLevel } from "../logger/nopeLogger"; import { NopeObservable } from "../observables/nopeObservable"; -import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface"; +import { + ICommunicationBridge, + ICommunicationInterface +} from "../types/nope/nopeCommunication.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; -const METHODS: Array = [ +const EMITTING_METHODS: Array = [ "emitAurevoir", "emitBonjour", - "emitEvent", "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObersvablesAvailable", "emitNewServicesAvailable", - "emitRpcRequest", - "emitRpcResponse", "emitStatusUpdate", - "emitTaskCancelation", + "emitTaskCancelation" +]; + +const EMITTING_SPECIFIC_METHODS: Array = [ + "emitEvent", + "emitRpcRequest", + "emitRpcResponse" +]; + +const ON_METHODS: Array = [ "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", @@ -34,702 +45,327 @@ const METHODS: Array = [ "onTaskCancelation" ]; -const ALLOWED_TO_INTERNALLY_FORWARD: Array = [ - "emitNewInstanceGeneratorsAvailable", - "emitNewInstancesAvailable", - "emitNewObersvablesAvailable", - "emitNewServicesAvailable", - "emitRpcRequest", - "emitRpcResponse", - "emitTaskCancelation" +const ON_SPECIFIC_METHODS: Array = [ + "onEvent", + "onRpcRequest", + "onRpcResponse" ]; -const METHOD_MAPPING: { - [P in keyof ICommunicationInterface]: keyof ICommunicationInterface; -} = { - onAurevoir: "emitAurevoir", - onBonjour: "emitBonjour", - // onEvent: "emitEvent", - onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable", - onNewInstancesAvailable: "emitNewInstancesAvailable", - onNewObservablesAvailable: "emitNewObersvablesAvailable", - onNewServicesAvailable: "emitNewServicesAvailable", - // onRpcRequest: "emitRpcRequest", - // onRpcResponse: "emitRpcResponse", - onTaskCancelation: "emitTaskCancelation", - onStatusUpdate: "emitStatusUpdate" +const METHOD_TO_EVENT: { [P in keyof ICommunicationInterface]: string } = { + // Default emitters + emitAurevoir: "aurevoir", + emitBonjour: "bonjour", + emitNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", + emitNewInstancesAvailable: "NewInstancesAvailable", + emitNewObersvablesAvailable: "NewObersvablesAvailable", + emitNewServicesAvailable: "NewServicesAvailable", + emitStatusUpdate: "StatusUpdate", + emitTaskCancelation: "TaskCancelation", + + // The default Listeners + onAurevoir: "aurevoir", + onBonjour: "bonjour", + onNewInstanceGeneratorsAvailable: "NewInstanceGeneratorsAvailable", + onNewInstancesAvailable: "NewInstancesAvailable", + onNewObservablesAvailable: "NewObersvablesAvailable", + onNewServicesAvailable: "NewServicesAvailable", + onStatusUpdate: "StatusUpdate", + onTaskCancelation: "TaskCancelation", + + // Specific: + onEvent: "event/", + onRpcRequest: "request/", + onRpcResponse: "response/", + emitEvent: "event/", + emitRpcRequest: "event/", + emitRpcResponse: "response/" }; -const STORING: Array = [ - "onAurevoir", - "onBonjour", - "onNewInstanceGeneratorsAvailable", - "onNewInstancesAvailable", - "onNewObservablesAvailable", - "onNewServicesAvailable", - "onTaskCancelation", - "onStatusUpdate" -]; - -const SPECIFIC_STORING: Array = [ - "onRpcResponse", - "onRpcRequest", - "onEvent" -]; - -const SPECIFIC_REMOVING: Array = [ +const OFF_METHODS: Array = [ "offEvent", "offRpcRequest", "offRpcResponse" ]; -METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING); +const MAPPING_METHODS: { + [P in keyof ICommunicationInterface]: keyof ICommunicationInterface; +} = { + onAurevoir: "emitAurevoir", + onBonjour: "emitBonjour", + onEvent: "emitEvent", + onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable", + onNewInstancesAvailable: "emitNewInstancesAvailable", + onNewObservablesAvailable: "emitNewObersvablesAvailable", + onNewServicesAvailable: "emitNewServicesAvailable", + onRpcRequest: "emitRpcRequest", + onRpcResponse: "emitRpcResponse", + onTaskCancelation: "emitTaskCancelation", + onStatusUpdate: "emitStatusUpdate" +}; + +const METHODS_WITH_NAME: Array = [ + "onRpcResponse", + "onRpcRequest", + "onEvent" +]; -/** - * A Communication Layer for the Dispatchers. - * Here, only a Events are used. - * - * @export - * @class EventLayer - * @implements {ICommunicationInterface} - */ //@ts-ignore Ignore the Interface. Its implemented manually -export class Bridge implements ICommunicationInterface { +export class BridgeV2 implements ICommunicationBridge { public connected: INopeObservable; public considerConnection = true; public allowServiceRedundancy = false; public ownDispatcherId: string; - public readonly id: string; + public id: string; - protected _layers: Map; - protected _connectedLayers: Map; - - // New, Please Add me TO PYTHON - protected _layerDispatcherMapping: Map>; - protected _dispatcherLayerMapping: Map; - protected _servicesLayerMapping: Map; - protected _dispatcherServicesMapping: Map>; - protected _dispatcherSubscribedTopicsMapping: Map>; - protected _callbacks: Map< + protected _logger: ILogger; + protected _internalEmitter: EventEmitter; + protected _layers: Map< string, { - callback: (...args) => any; - type: "onEvent" | "onRpcRequest" | "onRpcResponse"; - item: string; + layer: ICommunicationInterface; + considerConnection: boolean; + forwardData: boolean; } >; - protected _layerToSubscribedCallbacks: Map< - ICommunicationInterface, - Set - >; - public addLayer( - layer: ICommunicationInterface, - forward = false, - internalLayer = false - ): void { - if (!this._layers.has(layer)) { - this._layers.set(layer, forward); + protected _callbacks: Map any>>>; - const _this = this; - let connected = false; - - // Subscribe to the flag - layer.connected.subscribe((isConnected) => { - if (isConnected) { - // Store the Layer and set it as connected. - _this._connectedLayers.set(layer, forward); - } else { - // Remove the Layer. - _this._connectedLayers.delete(layer); - } - - if (_this._enableDynamicAdding && connected == false && isConnected) { - // Play the history: - for (const method of STORING) { - if ( - !internalLayer || - (internalLayer && ALLOWED_TO_INTERNALLY_FORWARD.includes(method)) - ) { - for (const cb of this[`_map_${method}`]) { - layer[method as any]( - forward ? this._wrapMethodWithoutName(cb, layer, method) : cb - ); - } - } - } - - // Create a dictionary, which maps the container - // to the corresponding Method. - const dict: { - [index: string]: "onRpcResponse" | "onRpcRequest" | "onEvent"; - } = { - _map_onRpcResponse: "onRpcResponse", - _map_onRpcRequest: "onRpcRequest", - _map_onEvent: "onEvent" - }; - - // Now create a Loop that performs the adding of - // adding all subscribed responses, request etc. - for (const container in dict) { - // 1. Extract the element: - const map: Map> = this[container]; - const method = dict[container]; - // 2. Iterate over the Elements. - for (const [event, callbacks] of map.entries()) { - // 3. Add all callbacks (Use the Map operator.) - Array.from(callbacks).map((cb) => - layer[method]( - event, - forward ? _this._wrapMethod(event, cb, layer, method) : cb - ) - ); - } - } - connected = true; - } - }); - // Store the Layer in the Dispatcher - this._layerDispatcherMapping.set(layer, new Set()); - this._layerToSubscribedCallbacks.set(layer, new Set()); - - // If we receive a "bonjour" on this layer, we have to make - // shure, that we: - // 1. Store the Dispatcher in the Layer - // 2. We check if this dispatcher is already known or not. (If not, we define it in) - layer.onBonjour((msg) => { - // Add the Dispatcher - _this._layerDispatcherMapping.get(layer).add(msg.id); - - // Add the Dispatcher to the Mapping - if (!_this._dispatcherSubscribedTopicsMapping.has(msg.id)) { - _this._dispatcherSubscribedTopicsMapping.set(msg.id, new Set()); - } - if (!_this._dispatcherServicesMapping.has(msg.id)) { - _this._dispatcherServicesMapping.set(msg.id, new Set()); - } - }); - - layer.onAurevoir((dispatcherId) => { - // Kick the Dispatcher, we dont considere it any more. - _this._layerDispatcherMapping.get(layer).delete(dispatcherId); - }); - - // We want to subscribe to the other elements, if we - // allow forwarding. - if (forward) { - // If new Observables are subscribed => Forward this subscription to the - // Other Layers. - layer.onNewObservablesAvailable((msg) => { - // We received new Events / Properties, time to update the Connection of - // The Layers - _this._updateLayerConnection( - layer, - msg.dispatcher, - msg.subscribed, - "events" - ); - }); - - layer.onNewServicesAvailable((msg) => { - // We received new services, time to update the Connection of - // the layers - _this._updateLayerConnection( - layer, - msg.dispatcher, - msg.services, - "services" - ); - }); - } - } - } - - /** - * Helper Function, which will connect Layers. - * - * @protected - * @param {ICommunicationInterface} layerToForwardTo - * @param {string} dispatcherID - * @param {string[]} content - * @param {("events" | "services")} mode - * @memberof Bridge - */ - protected _updateLayerConnection( - layerToForwardTo: ICommunicationInterface, - dispatcherID: string, - content: string[], - mode: "events" | "services" - ): void { - const oldItems = this._combineLayers(layerToForwardTo, mode); - // Store the Elements, that have been provided before. - switch (mode) { - case "events": - this._dispatcherSubscribedTopicsMapping.set( - dispatcherID, - new Set(content) - ); - break; - case "services": - this._dispatcherServicesMapping.set(dispatcherID, new Set(content)); - break; - } - - // Store the Elements, that now have been provided. - const newItems = this._combineLayers(layerToForwardTo, mode); - - const { added, removed } = determineDifference(oldItems, newItems); - - // Now we have determine the difference, we are able to subscribe - // the new events and remove the connection to the old events. - if (added.size > 0) { - // Iterate over the layers, and check which is allowed to forward - // data to other layers. - for (const [layerToSubscribe, forward] of this._layers.entries()) { - if (layerToSubscribe != layerToForwardTo && forward) { - // If the Layer is allowed to foward data to other layers, - // we add the callbacks to the layer. Based on the "mode" - // (events or services) we have to define, what should be - // linked. - for (const item of removed) { - switch (mode) { - case "events": - this._addCallback( - layerToForwardTo, - layerToSubscribe, - item, - "onEvent" - ); - break; - case "services": - this._addCallback( - layerToForwardTo, - layerToSubscribe, - item, - "onRpcRequest" - ); - this._addCallback( - layerToForwardTo, - layerToSubscribe, - item, - "onRpcResponse" - ); - break; - } - } - } - } - } - if (removed.size > 0) { - // Iterate over the layers, and check which is allowed to forward - // data to other layers. - for (const [_layer, _forward] of this._layers.entries()) { - // If the Layer is allowed to foward data to other layers, - // we add the callbacks to the layer. Based on the "mode" - // (events or services) we have to define, what should be - // linked. - if (_layer != layerToForwardTo && _forward) { - for (const item of removed) { - switch (mode) { - case "events": - this._removeCallback(layerToForwardTo, _layer, item, "onEvent"); - break; - case "services": - this._removeCallback( - layerToForwardTo, - _layer, - item, - "onRpcRequest" - ); - this._removeCallback( - layerToForwardTo, - _layer, - item, - "onRpcResponse" - ); - break; - } - } - } - } - } - } - - /** - * Helper Function, which will add Callbacks. - * - * @protected - * @param {ICommunicationInterface} layerForwarded The Layer, the data will be forwared to - * @param {ICommunicationInterface} layerSubscribed The Layer, the data will be subscribed and forwarded - * @param {string} item the "topic" / "rpc name" - * @param {("onEvent" |"onRpcRequest" | "onRpcResponse")} type The Type which we are handling. - * @memberof Bridge - */ - protected _addCallback( - layerForwarded: ICommunicationInterface, - layerSubscribed: ICommunicationInterface, - item: string, - type: "onEvent" | "onRpcRequest" | "onRpcResponse" - ): void { - const _name = layerSubscribed.id + layerForwarded.id + type + item; - const _mapping = { - onEvent: "emitEvent", - onRpcRequest: "emitRpcRequest", - onRpcResponse: "emitRpcResponse" - }; - - if (!this._callbacks.has(_name)) { - // Define the Callback and Store it. - const callback = (msg) => { - if (this._logger) { - this._logger.debug( - layerSubscribed.id, - "==", - _mapping[type], - "==>", - layerForwarded.id - ); - } - layerForwarded[_mapping[type]](item, msg); - }; - this._callbacks.set(_name, { - callback, - item, - type - }); - - // Register the Callback - layerSubscribed[type](item, callback); - - // Store, that we have subscribed this callback. - if (!this._layerToSubscribedCallbacks.has(layerSubscribed)) { - this._layerToSubscribedCallbacks.set(layerSubscribed, new Set()); - } - this._layerToSubscribedCallbacks.get(layerSubscribed).add(_name); - } else { - throw Error( - "That should not happen. Allready found a subscription for this element" - ); - } - } - - /** - * Function to remove the Subscribtions at a specific layer. - * - * @protected - * @param {ICommunicationInterface} layerForwarded The Layer, the data will be forwared to - * @param {ICommunicationInterface} layerSubscribed The Layer, the data will be subscribed and forwarded - * @param {string} item the "topic" / "rpc name" - * @param {("onEvent" |"onRpcRequest" | "onRpcResponse")} type The Type which we are handling. - * @memberof Bridge - */ - protected _removeCallback( - layerForwarded: ICommunicationInterface, - layerSubscribed: ICommunicationInterface, - item: string, - type: "onEvent" | "onRpcRequest" | "onRpcResponse" - ): void { - const _name = layerSubscribed.id + layerForwarded.id + type + item; - const _mapping = { - onEvent: "offEvent", - onRpcRequest: "offRpcRequest", - onRpcResponse: "offRpcResponse" - }; - - if (this._callbacks.has(_name)) { - // Define the Callback and Store it. - const data = this._callbacks.get(_name); - layerSubscribed[_mapping[data.type]](data.item, data.callback); - - // Remove the stored callback - this._callbacks.delete(_name); - - // Store, that we have subscribed this callback. - if (!this._layerToSubscribedCallbacks.has(layerSubscribed)) { - throw Error( - "That should not happen. Didnet found the Layer in '_layerToSubscribedCallbacks'" - ); - } - this._layerToSubscribedCallbacks.get(layerSubscribed).delete(_name); - } else { - throw Error( - "That should not happen. Didnt found a callback for the connected layers element" - ); - } - } - - /** - * Helperfunction, which will combine all "Services" or "Event/Properties" of a layer and - * returns it as set of string. - * - * @protected - * @param {ICommunicationInterface} layer The layer, which should be used for determining all "Services" and "Events/Properties" - * @param {("_dispatcherServicesMapping" | "_dispatcherSubscribedTopicsMapping")} type - * @return {*} {Set} - * @memberof Bridge - */ - protected _combineLayers( - layer: ICommunicationInterface, - type: "events" | "services" - ): Set { - const mapping = { - events: "_dispatcherSubscribedTopicsMapping", - services: "_dispatcherServicesMapping" - }; - - let combined: string[] = []; - const _this = this; - this._layerDispatcherMapping.get(layer).forEach((dispatcherID) => { - combined = [ - ...combined, - ...(Array.from(_this[mapping[type]].get(dispatcherID)) as string[]) - ]; - }); - return new Set(combined); - } - - /** - * Function to remove a Layer. - * - * @param {ICommunicationInterface} layer - * @memberof Bridge - */ - public removeLayer(layer: ICommunicationInterface): void { - if (this._layers.has(layer)) { - // Delete the Layer - this._layers.delete(layer); - } - if (this._connectedLayers.has(layer)) { - // Delete the Layer - this._connectedLayers.delete(layer); - } - if (this._layerToSubscribedCallbacks.has(layer)) { - // We although have to unsubscribe the events of the Layer - const _mapping = { - onEvent: "offEvent", - onRpcRequest: "offRpcRequest", - onRpcResponse: "offRpcResponse" - }; - - // Now iterate over the Callbacks, we have registered, - for (const callbackId of this._layerToSubscribedCallbacks.get(layer)) { - // Extract the Item. - const item = this._callbacks.get(callbackId); - - // Perform the Action to Remove the Subscription - layer[_mapping[item.type]](item.item, item.callback); - - // Explicitly delete the Callback - this._callbacks.delete(callbackId); - } - - this._layerToSubscribedCallbacks.delete(layer); - } - if (this._layerDispatcherMapping.has(layer)) { - this._layerDispatcherMapping.delete(layer); - } - } - - /** - * Creates a Bridge - * @param _enableDynamicAdding - * @param _logger - */ - constructor( - protected _enableDynamicAdding = false, - protected _logger?: ILogger - ) { - this._layers = new Map(); - this._connectedLayers = new Map(); - - // New, Please Add me TO PYTHON - this._layerDispatcherMapping = new Map(); - this._dispatcherLayerMapping = new Map(); - this._servicesLayerMapping = new Map(); - this._dispatcherServicesMapping = new Map(); - this._dispatcherSubscribedTopicsMapping = new Map(); + constructor(id = generateId(), level: LoggerLevel = "info") { + this._internalEmitter = new EventEmitter(); this._callbacks = new Map(); - this._layerToSubscribedCallbacks = new Map(); + this._layers = new Map(); + this._logger = getNopeLogger("Communication-Layer", level); + this.id = id; + + const _this = this; + + // Now we use the loops to define the functions, + // required by the interface. + + for (const method of ON_METHODS) { + this[method] = (cb) => { + if (_this._logger.enabledFor(_this._logger.DEBUG)) { + _this._logger.debug("calling ", method); + } + _this._on(method, METHOD_TO_EVENT[method], cb); + }; + } + for (const method of ON_SPECIFIC_METHODS) { + this[method] = (event: string, cb) => { + if (_this._logger.enabledFor(_this._logger.DEBUG)) { + _this._logger.debug("calling ", method, "on", event); + } + _this._on(method, METHOD_TO_EVENT[method] + event, cb); + }; + } + for (const method of OFF_METHODS) { + this[method] = (event: string, cb) => { + if (_this._logger.enabledFor(_this._logger.DEBUG)) { + _this._logger.debug("calling ", method, "on", event); + } + _this._off(method, event, cb); + }; + } + for (const method of EMITTING_METHODS) { + this[method] = (...args) => { + if (_this._logger.enabledFor(_this._logger.DEBUG)) { + _this._logger.debug("emitting", method, "on", event); + } + _this._emit(method, METHOD_TO_EVENT[method], ...args); + }; + } + for (const method of EMITTING_SPECIFIC_METHODS) { + this[method] = (event: string, ...args) => { + if (_this._logger.enabledFor(_this._logger.DEBUG)) { + _this._logger.debug("emitting", method, "on", event); + } + _this._emit(method, METHOD_TO_EVENT[method] + event, ...args); + }; + } this.connected = new NopeObservable(); this.connected.setContent(false); - const _this = this; - // Add a custom handler for the connect flag. // the Flag is defined as true, if every socket // is connected. this.connected.getter = () => { - for (const layer of _this._layers.keys()) { - if (layer.considerConnection && !layer.connected.getContent()) + for (const data of _this._layers.values()) { + if (data.considerConnection && !data.layer.connected.getContent()) return false; } return true; }; - - // Iterate over the Methods of the Element. - // Define for every method the corresponding method. - for (const method of METHODS) { - // If the Subscription should be stored and dynamic Adding is enabled => - // Add the methods. - if (_enableDynamicAdding && STORING.includes(method)) { - this[`_map_${method}`] = new Array(); - // Define a Function which stores the Actions: - this[method as any] = async (cb) => { - // Store the Call - _this[`_map_${method}`].push(cb); - - for (const [_layer, _forward] of _this._connectedLayers) { - _layer[method as any]( - _forward ? _this._wrapMethodWithoutName(cb, _layer, method) : cb - ); - } - }; - } else if ( - // Else if the method store the events individually => - // Creaete a different Method: - _enableDynamicAdding && - SPECIFIC_STORING.includes(method) - ) { - // Determine the Container name. - const _container = `_map_${method}`; - // Define the Container itself - this[_container] = new Map>(); - - // Store the Method. - this[method as any] = async (name, cb) => { - // Call the adapted Function - _this._adaptStore(_container as any, "add", name, cb); - - // Perform the Action on every available Layer. - for (const [_layer, _forward] of _this._connectedLayers) { - // Store the Callback. Because it is a "method" which listens on the - // events => wrap the method, to forward the event to the other layers. - _layer[method as any]( - name, - _forward ? _this._wrapMethod(name, cb, _layer, method) : cb - ); - } - }; - } else if ( - // Else if the method store the events individually => - // Creaete a different Method: - _enableDynamicAdding && - SPECIFIC_REMOVING.includes(method) - ) { - // Determine the Container name. - const _container = method.replace("off", "_on"); - // Define the Container itself - this[_container] = new Map>(); - // Store the Method. - this[method as any] = async (name, cb) => { - // Call the adapted Function - _this._adaptStore(_container as any, "delete", name, cb); - - // Perform the Action on every available Layer. - for (const [_layer, _forward] of _this._connectedLayers) { - // Store the Callback - _layer[method as any](name, _this._wrappedMethods.get(cb)); - } - - // Delete the Wrapped Method: - _this._wrappedMethods.delete(cb); - }; - } else { - // Define a Function which stores the Actions: - this[method as any] = async (...args) => { - if (_this._logger && method !== "emitStatusUpdate") { - _this._logger.info(method, ...args); - } - - for (const _layer of _this._connectedLayers.keys()) { - _layer[method as any](...args); - } - }; - } - } - } - - protected _wrappedMethods = new Map<(data) => any, (data) => any>(); - - protected _wrapMethod( - name: string, - cb: (data) => any, - layer: ICommunicationInterface, - method: keyof ICommunicationInterface - ): (data) => any { - const _this = this; - const _wrapped = (data) => { - // Log the Forwarding - if (_this._logger && method != "onStatusUpdate") { - _this._logger.info("Forwarding", method, "=>", METHOD_MAPPING[method]); - } - - for (const _layer of _this._connectedLayers.keys()) { - if (_layer != layer) { - // Emit the corresponding Event on the different channels. - _layer[METHOD_MAPPING[method] as any](name, data); - } - } - // Call the Callback! - cb(data); - }; - - this._wrappedMethods.set(cb, _wrapped); - return _wrapped; - } - - protected _wrapMethodWithoutName( - cb: (data) => any, - layer: ICommunicationInterface, - method: keyof ICommunicationInterface - ): (data) => any { - const _this = this; - return (data) => { - if (_this._logger && method != "onStatusUpdate") { - _this._logger.info("Forwarding", method, "=>", METHOD_MAPPING[method]); - } - - for (const _layer of _this._connectedLayers.keys()) { - if (_layer != layer) { - // Emit the corresponding Event on the different channels. - _layer[METHOD_MAPPING[method] as any](data); - } - } - // Call the Callback! - cb(data); - }; } /** - * Helper Function that is used to adapt the current active subscriptions - * @param name name of the data container - * @param mode the mode (delete or add) - * @param event the name of the event - * @param cb the callback + * Helper Function, which will forward an Event. + * + * @protected + * @param {ICommunicationInterface} layerToExclude Layer which should not be handled. + * @param {keyof ICommunicationInterface} method The method. + * @param {...any[]} args args, used during forwarding. + * @memberof BridgeV2 */ - protected _adaptStore( - name: "_onRpcResponse" | "_onRpcRequest" | "_onEvent", - mode: "add" | "delete", - event: string, - cb: any + protected _forward( + layerToExclude: ICommunicationInterface, + method: keyof ICommunicationInterface, + ...args: any[] ): void { - if (this._enableDynamicAdding) { - const _set01 = this[name].get(event) || new Set(); - _set01[mode](cb); - this[name].set(event, _set01); + // Iterate over the layers and forward the Data to the other Layers + for (const data of this._layers.values()) { + if (data.layer !== layerToExclude) { + data.layer[method as any](...args); + } } } - async dispose() { - // Disconnect all Layers. - const promises = Array.from(this._layers.keys()).map((layer) => - layer.dispose() - ); - await Promise.all(promises); + /** + * Helper Function, which will internally subscribe to the Events of the Layer. + * + * @protected + * @param {ICommunicationInterface} layer The Layer to consinder, on this layer, we will subscribe to the events + * @param {keyof ICommunicationInterface} method The method used for subscription + * @param {boolean} forwardData Flag, showing whether data will be forwarded or not. + * @memberof BridgeV2 + */ + protected _subscribeToCallback( + layer: ICommunicationInterface, + method: keyof ICommunicationInterface, + forwardData: boolean + ): void { + const _this = this; - // Dispose the Connection Flag. - this.connected.dispose(); + if (METHODS_WITH_NAME.includes(method)) { + layer[method as any](event, (...args) => { + // Define the Method to Forward data. + _this._internalEmitter[MAPPING_METHODS[method]](event, ...args); + + // Now we are able to iterate over the Methods and forward the content + // but only if the Layer forwards the content + if (forwardData) { + _this._forward(layer, method, event, ...args); + } + }); + } else { + // Subscribe to the Event. + layer[method as any]((...args) => { + // Define the Method to Forward data. + _this._internalEmitter[MAPPING_METHODS[method]](...args); + + // Now we are able to iterate over the Methods and forward the content + // but only if the Layer forwards the content + if (forwardData) { + _this._forward(layer, method, ...args); + } + }); + } + } + + protected _on( + method: keyof ICommunicationInterface, + event: string, + cb + ): void { + // Subscribe on the Event + this._internalEmitter.setMaxListeners( + this._internalEmitter.getMaxListeners() + 1 + ); + this._internalEmitter.on(event, cb); + + // Add the callbacks. + if (!this._callbacks.has(method)) { + this._callbacks.set(method, new Map()); + } else if (!this._callbacks.get(method).has(event)) { + this._callbacks.get(method).set(event, [cb]); + } else { + this._callbacks.get(method).get(event).push(cb); + } + // Iterate over the Layers and on the connected Layers, + // subscribe the methods. + for (const data of this._layers.values()) { + if (data.layer.connected.getContent()) { + this._subscribeToCallback(data.layer, method, data.forwardData); + } + } + } + + protected _off( + method: keyof ICommunicationInterface, + event: string, + cb + ): void { + this._internalEmitter.on(event, cb); + + if ( + !this._callbacks.has(method) || + !this._callbacks.get(method).has(event) + ) { + throw Error("That shouldn't happen"); + } else { + // Extract the Array + this._callbacks + .get(method) + .get(event) + .splice(this._callbacks.get(method).get(event).indexOf(cb), 1); + } + + // Theoretically we must remove the subscriptions. + // But we wont do this, because if the connection has + // been lost, or the Layer has been removed, the stored + // callbacks wont be called any more. + } + + protected _emit( + method: keyof ICommunicationInterface, + event: string, + ...args + ): void { + // Emit the Event on the internal Layer. + this._internalEmitter.emit(event, ...args); + + // Iterate over the Layers. + for (const data of this._layers.values()) { + // If the Layer has been conneced + if (data.layer.connected.getContent()) { + data.layer[method as any](event, ...args); + } + } + } + + public async addLayer( + layer: ICommunicationInterface, + forwardData = false, + considerConnection = false + ): Promise { + if (!this._layers.has(layer.id)) { + // Store the Layers: + this._layers.set(layer.id, { + layer, + considerConnection, + forwardData + }); + + // Forward the Events of the Layer + // being connected to our aggregated + // state + const _this = this; + layer.connected.subscribe(() => { + _this.connected.forcePublish(); + }); + + // Wait until the Layer is connected. + await layer.connected.waitFor((value) => value); + + // Register all know methods + for (const [method, map] of this._callbacks.entries()) { + for (const [event, cbs] of map.entries()) { + for (const cb of cbs) { + layer[method](event, cb); + } + } + } + } } }