/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:36 * @modify date 2021-03-01 17:17:44 * @desc [description] */ import { ILogger } from "js-logger"; import { NopeObservable } from "../observables/nopeObservable"; import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface"; import { INopeObservable } from "../types/nope/nopeObservable.interface"; const METHODS: Array = [ "emitAurevoir", "emitBonjour", "emitEvent", "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObersvablesAvailable", "emitNewServicesAvailable", "emitRpcRequest", "emitRpcResponse", "emitStatusUpdate", "emitTaskCancelation", "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", "onStatusUpdate", "onTaskCancelation" ]; const ALLOWED_TO_INTERNALLY_FORWARD: Array = [ "emitNewInstanceGeneratorsAvailable", "emitNewInstancesAvailable", "emitNewObersvablesAvailable", "emitNewServicesAvailable", "emitRpcRequest", "emitRpcResponse", "emitTaskCancelation" ]; const METHOD_MAPPING: { [P in keyof ICommunicationInterface]: keyof ICommunicationInterface; } = { onAurevoir: "emitAurevoir", onBonjour: "emitBonjour", onEvent: "emitEvent", onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable", onNewInstancesAvailable: "emitNewInstancesAvailable", onNewObservablesAvailable: "emitNewObersvablesAvailable", onNewServicesAvailable: "emitNewServicesAvailable", onRpcRequest: "emitRpcRequest", onRpcResponse: "emitRpcResponse", onTaskCancelation: "emitTaskCancelation", onStatusUpdate: "emitStatusUpdate" }; const STORING: Array = [ "onAurevoir", "onBonjour", "onNewInstanceGeneratorsAvailable", "onNewInstancesAvailable", "onNewObservablesAvailable", "onNewServicesAvailable", "onTaskCancelation", "onStatusUpdate" ]; const SPECIFIC_STORING: Array = [ "onRpcResponse", "onRpcRequest", "onEvent" ]; const SPECIFIC_REMOVING: Array = [ "offEvent", "offRpcRequest", "offRpcResponse" ]; METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING); /** * A Communication Layer for the Dispatchers. * Here, only a Events are used. * * @export * @class EventLayer * @implements {ICommunicationInterface} */ //@ts-ignore Ignore the Interface. Its implemented manually export class Bridge implements ICommunicationInterface { public connected: INopeObservable; public considerConnection = true; protected _layers: Map; protected _connectedLayers: Map; public addLayer(layer: ICommunicationInterface, forward = false, internalLayer = false) { if (!this._layers.has(layer)) { this._layers.set(layer, forward); const _this = this; let connected = false; // Subscribe to the flag layer.connected.subscribe((isConnected) => { if (isConnected) { // Store the Layer and set it as connected. _this._connectedLayers.set(layer, forward); } else { // Remove the Layer. _this._connectedLayers.delete(layer); } if (_this._enableDynamicAdding && connected == false && isConnected) { // Play the history: for (const method of STORING) { if (!internalLayer || (internalLayer && ALLOWED_TO_INTERNALLY_FORWARD.includes(method))) { for (const cb of this[`_map_${method}`]) { layer[method as any]( forward ? this._wrapMethodWithoutName(cb, layer, method) : cb ); } } } // Create a dictionary, which maps the container // to the corresponding Method. const dict: { [index: string]: "onRpcResponse" | "onRpcRequest" | "onEvent"; } = { _map_onRpcResponse: "onRpcResponse", _map_onRpcRequest: "onRpcRequest", _map_onEvent: "onEvent" }; // Now create a Loop that performs the adding of // adding all subscribed responses, request etc. for (const container in dict) { // 1. Extract the element: const map: Map> = this[container]; const method = dict[container]; // 2. Iterate over the Elements. for (const [event, callbacks] of map.entries()) { // 3. Add all callbacks (Use the Map operator.) Array.from(callbacks).map((cb) => layer[method]( event, forward ? _this._wrapMethod(event, cb, layer, method) : cb ) ); } } connected = true; } }); } } public removeLayer(layer: ICommunicationInterface) { if (this._layers.has(layer)) { // Delete the Layer this._layers.delete(layer); } if (this._connectedLayers.has(layer)){ // Delete the Layer this._connectedLayers.delete(layer); } } /** * Creates a Bridge * @param subscriptionMode * @param resultSharing * @param _enableDynamicAdding * @param _logger */ constructor( public readonly subscriptionMode: "individual" | "generic" = "individual", public readonly resultSharing: "individual" | "generic" = "individual", protected _enableDynamicAdding = false, protected _logger?: ILogger ) { this._layers = new Map(); this._connectedLayers = new Map(); this.connected = new NopeObservable(); this.connected.setContent(false); const _this = this; // Add a custom handler for the connect flag. // the Flag is defined as true, if every socket // is connected. this.connected.getter = () => { for (const layer of _this._layers.keys()) { if (layer.considerConnection && !layer.connected.getContent()) return false; } return true; }; // Iterate over the Methods of the Element. // Define for every method the corresponding method. for (const method of METHODS) { // If the Subscription should be stored and dynamic Adding is enabled => // Add the methods. if (_enableDynamicAdding && STORING.includes(method)) { this[`_map_${method}`] = new Array(); // Define a Function which stores the Actions: this[method as any] = async (cb) => { // Store the Call _this[`_map_${method}`].push(cb); for (const [_layer, _forward] of _this._connectedLayers) { _layer[method as any]( _forward ? _this._wrapMethodWithoutName(cb, _layer, method) : cb ); } }; } else if ( // Else if the method store the events individually => // Creaete a different Method: _enableDynamicAdding && SPECIFIC_STORING.includes(method) ) { // Determine the Container name. const _container = `_map_${method}`; // Define the Container itself this[_container] = new Map>(); // Store the Method. this[method as any] = async (name, cb) => { // Call the adapted Function _this._adaptStore(_container as any, "add", name, cb); // Perform the Action on every available Layer. for (const [_layer, _forward] of _this._connectedLayers) { // Store the Callback. Because it is a "method" which listens on the // events => wrap the method, to forward the event to the other layers. _layer[method as any]( name, _forward ? _this._wrapMethod(name, cb, _layer, method) : cb ); } }; } else if ( // Else if the method store the events individually => // Creaete a different Method: _enableDynamicAdding && SPECIFIC_REMOVING.includes(method) ) { // Determine the Container name. const _container = method.replace("off", "_on"); // Define the Container itself this[_container] = new Map>(); // Store the Method. this[method as any] = async (name, cb) => { // Call the adapted Function _this._adaptStore(_container as any, "delete", name, cb); // Perform the Action on every available Layer. for (const [_layer, _forward] of _this._connectedLayers) { // Store the Callback _layer[method as any](name, _this._wrappedMethods.get(cb)); } // Delete the Wrapped Method: _this._wrappedMethods.delete(cb); }; } else { // Define a Function which stores the Actions: this[method as any] = async (...args) => { if (_this._logger) { _this._logger.debug(method, ...args); } for (const _layer of _this._connectedLayers.keys()) { _layer[method as any](...args); } }; } } } protected _wrappedMethods = new Map<(data) => any, (data) => any>(); protected _wrapMethod( name: string, cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface ): (data) => any { const _this = this; const _wrapped = (data) => { // Log the Forwarding if (_this._logger && method != "onStatusUpdate"){ _this._logger.debug("Forwarding",method, "=>", METHOD_MAPPING[method]); } for (const _layer of _this._connectedLayers.keys()) { if (_layer != layer) { // Emit the corresponding Event on the different channels. _layer[METHOD_MAPPING[method] as any](name, data); } } // Call the Callback! cb(data); }; this._wrappedMethods.set(cb, _wrapped); return _wrapped; } protected _wrapMethodWithoutName( cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface ): (data) => any { const _this = this; return (data) => { if (_this._logger && method != "onStatusUpdate"){ _this._logger.debug("Forwarding",method, "=>", METHOD_MAPPING[method]); } for (const _layer of _this._connectedLayers.keys()) { if (_layer != layer) { // Emit the corresponding Event on the different channels. _layer[METHOD_MAPPING[method] as any](data); } } // Call the Callback! cb(data); }; } /** * Helper Function that is used to adapt the current active subscriptions * @param name name of the data container * @param mode the mode (delete or add) * @param event the name of the event * @param cb the callback */ protected _adaptStore( name: "_onRpcResponse" | "_onRpcRequest" | "_onEvent", mode: "add" | "delete", event: string, cb: any ): void { if (this._enableDynamicAdding) { const _set01 = this[name].get(event) || new Set(); _set01[mode](cb); this[name].set(event, _set01); } } async dispose(){ // Disconnect all Layers. const promises = Array.from(this._layers.keys()).map(layer => layer.dispose()); await Promise.all(promises); // Dispose the Connection Flag. this.connected.dispose(); } }