/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-06 08:52:36
 * @modify date 2021-01-12 08:54:17
 * @desc [description]
 */

import { ILogger } from "js-logger";
import { NopeObservable } from "../observables/nopeObservable";
import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";

/**
 * Names of all methods the Bridge installs on itself in its constructor.
 * The "specific" subscribe/unsubscribe methods are appended below.
 *
 * NOTE(review): "emitNewObersvablesAvailable" is spelled this way on purpose
 * here; it is a runtime key and presumably mirrors the interface member name.
 * Confirm against ICommunicationInterface before renaming.
 */
const METHODS: Array<keyof ICommunicationInterface> = [
  "emitAurevoir",
  "emitBonjour",
  "emitEvent",
  "emitNewInstanceGeneratorsAvailable",
  "emitNewInstancesAvailable",
  "emitNewObersvablesAvailable",
  "emitNewServicesAvailable",
  "emitRpcRequest",
  "emitRpcResponse",
  "emitStatusUpdate",
  "emitTaskCancelation",
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onStatusUpdate",
  "onTaskCancelation"
];

/**
 * Maps every "on..." subscription method to the "emit..." method that is
 * called on the other layers when a received message is forwarded.
 */
const METHOD_MAPPING: {
  [P in keyof ICommunicationInterface]: keyof ICommunicationInterface;
} = {
  onAurevoir: "emitAurevoir",
  onBonjour: "emitBonjour",
  onEvent: "emitEvent",
  onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable",
  onNewInstancesAvailable: "emitNewInstancesAvailable",
  onNewObservablesAvailable: "emitNewObersvablesAvailable",
  onNewServicesAvailable: "emitNewServicesAvailable",
  onRpcRequest: "emitRpcRequest",
  onRpcResponse: "emitRpcResponse",
  onTaskCancelation: "emitTaskCancelation",
  onStatusUpdate: "emitStatusUpdate"
};

/**
 * Subscription methods taking only a callback. With dynamic adding enabled,
 * their callbacks are recorded and replayed on layers that are added later.
 */
const STORING: Array<keyof ICommunicationInterface> = [
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onTaskCancelation",
  "onStatusUpdate"
];

/**
 * Subscription methods taking an event name and a callback. Their callbacks
 * are recorded per event name.
 */
const SPECIFIC_STORING: Array<keyof ICommunicationInterface> = [
  "onRpcResponse",
  "onRpcRequest",
  "onEvent"
];

/**
 * Unsubscribe counterparts of SPECIFIC_STORING. "offX" operates on the
 * container "_onX".
 */
const SPECIFIC_REMOVING: Array<keyof ICommunicationInterface> = [
  "offEvent",
  "offRpcRequest",
  "offRpcResponse"
];

METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING);

/**
 * A communication layer for the dispatchers that bundles several other
 * communication layers. Every method listed in METHODS is created
 * dynamically in the constructor and fans the call out to all registered
 * layers.
 *
 * @export
 * @class Bridge
 * @implements {ICommunicationInterface}
 */
//@ts-ignore Ignore the Interface. Its implemented manually
export class Bridge implements ICommunicationInterface {
  /**
   * Connection flag. The constructor installs a getter that yields `false`
   * as soon as one registered layer reports not being connected, and `true`
   * otherwise (including the case that no layer is registered).
   */
  connected: INopeObservable<boolean>;

  /** Registered layers, mapped to their "forward" flag. */
  protected _layers: Map<ICommunicationInterface, boolean>;

  /**
   * Maps an original callback to the forwarding wrapper created for it by
   * `_wrapMethod`.
   *
   * NOTE(review): only one wrapper is kept per callback. If the same
   * callback is wrapped for several forwarding layers (or several event
   * names), only the most recent wrapper can be found again when
   * unsubscribing.
   */
  protected _wrappedMethods = new Map<(data) => any, (data) => any>();

  /**
   * Registers a layer. Adding an already registered layer is a no-op.
   *
   * If dynamic adding is enabled, every subscription recorded so far is
   * replayed on the new layer.
   *
   * @param layer The layer to add.
   * @param forward If true, callbacks subscribed on this layer are wrapped
   *  so that received data is additionally emitted on all other layers.
   */
  public addLayer(layer: ICommunicationInterface, forward = false): void {
    if (this._layers.has(layer)) {
      return;
    }

    this._layers.set(layer, forward);

    if (!this._enableDynamicAdding) {
      return;
    }

    // Replay the callback-only subscriptions.
    for (const method of STORING) {
      for (const cb of this[`_map_${method}`]) {
        layer[method as any](
          forward ? this._wrapMethodWithoutName(cb, layer, method) : cb
        );
      }
    }

    // Maps each per-event container to the subscription method filling it.
    const dict: {
      [index: string]: "onRpcResponse" | "onRpcRequest" | "onEvent";
    } = {
      _onRpcResponse: "onRpcResponse",
      _onRpcRequest: "onRpcRequest",
      _onEvent: "onEvent"
    };

    // Replay the per-event subscriptions.
    for (const container of Object.keys(dict)) {
      const subscriptions: Map<string, Set<any>> = this[container];
      const method = dict[container];

      for (const [event, callbacks] of subscriptions.entries()) {
        for (const cb of callbacks) {
          layer[method](
            event,
            forward ? this._wrapMethod(event, cb, layer, method) : cb
          );
        }
      }
    }
  }

  /**
   * Unregisters a layer. Unknown layers are ignored. Subscriptions that
   * were already placed on the layer are not removed from it.
   *
   * @param layer The layer to remove.
   */
  public removeLayer(layer: ICommunicationInterface): void {
    this._layers.delete(layer);
  }

  /**
   * Creates a Bridge and installs all methods listed in METHODS on it.
   *
   * @param subscriptionMode Stored as a public readonly property.
   * @param resultSharing Stored as a public readonly property.
   * @param _enableDynamicAdding If true, subscriptions are recorded, so
   *  that they can be replayed on layers which are added later.
   * @param _logger Optional logger; used to log calls of the plain
   *  pass-through methods on debug level.
   */
  constructor(
    public readonly subscriptionMode: "individual" | "generic" = "individual",
    public readonly resultSharing: "individual" | "generic" = "individual",
    protected _enableDynamicAdding = false,
    protected _logger?: ILogger
  ) {
    this._layers = new Map<ICommunicationInterface, boolean>();

    this.connected = new NopeObservable<boolean>();
    this.connected.setContent(false);

    // Custom handler for the connect flag: it is true only if every
    // registered layer is connected.
    this.connected.getter = () => {
      for (const layer of this._layers.keys()) {
        if (!layer.connected.getContent()) return false;
      }
      return true;
    };

    for (const method of METHODS) {
      if (_enableDynamicAdding && STORING.includes(method)) {
        // Callback-only subscription: record the callback, then subscribe
        // it on every layer.
        this[`_map_${method}`] = [];

        this[method as any] = async (cb) => {
          this[`_map_${method}`].push(cb);

          for (const [layer, forward] of this._layers) {
            layer[method as any](
              forward ? this._wrapMethodWithoutName(cb, layer, method) : cb
            );
          }
        };
      } else if (_enableDynamicAdding && SPECIFIC_STORING.includes(method)) {
        // Per-event subscription ("onX" is recorded in "_onX").
        const container = `_${method}`;

        // The matching "off..." method shares this container. Only create
        // it if it does not exist yet, so the result does not depend on
        // the order of METHODS.
        if (!this[container]) {
          this[container] = new Map<string, Set<any>>();
        }

        this[method as any] = async (name, cb) => {
          this._adaptStore(container as any, "add", name, cb);

          for (const [layer, forward] of this._layers) {
            // On forwarding layers the callback is wrapped, so that the
            // received data is emitted on the other layers as well.
            layer[method as any](
              name,
              forward ? this._wrapMethod(name, cb, layer, method) : cb
            );
          }
        };
      } else if (_enableDynamicAdding && SPECIFIC_REMOVING.includes(method)) {
        // Per-event unsubscription ("offX" works on the container "_onX").
        const container = method.replace("off", "_on");

        if (!this[container]) {
          this[container] = new Map<string, Set<any>>();
        }

        this[method as any] = async (name, cb) => {
          this._adaptStore(container as any, "delete", name, cb);

          const wrapped = this._wrappedMethods.get(cb);

          for (const [layer, forward] of this._layers) {
            // Forwarding layers were subscribed with the wrapper, all other
            // layers with the callback itself. Hand each layer the function
            // it was actually subscribed with instead of `undefined`.
            layer[method as any](name, forward && wrapped ? wrapped : cb);
          }

          this._wrappedMethods.delete(cb);
        };
      } else {
        // Plain pass-through: call the method on every layer.
        this[method as any] = async (...args) => {
          if (this._logger) {
            this._logger.debug(method, ...args);
          }

          for (const layer of this._layers.keys()) {
            layer[method as any](...args);
          }
        };
      }
    }
  }

  /**
   * Wraps a callback of a per-event subscription. The wrapper first emits
   * the received data on every layer except `layer` (using the "emit..."
   * method that METHOD_MAPPING assigns to `method`) and then calls `cb`.
   * The wrapper is remembered in `_wrappedMethods`, keyed by `cb`.
   *
   * @param name Name of the event / request / response.
   * @param cb The original callback.
   * @param layer The layer the wrapper is subscribed on; it is skipped
   *  while forwarding.
   * @param method The subscription method ("on...").
   * @returns The wrapping function.
   */
  protected _wrapMethod(
    name: string,
    cb: (data) => any,
    layer: ICommunicationInterface,
    method: keyof ICommunicationInterface
  ): (data) => any {
    const wrapped = (data) => {
      for (const other of this._layers.keys()) {
        if (other !== layer) {
          other[METHOD_MAPPING[method] as any](name, data);
        }
      }

      cb(data);
    };

    this._wrappedMethods.set(cb, wrapped);

    return wrapped;
  }

  /**
   * Wraps a callback of a callback-only subscription. The wrapper first
   * emits the received data on every layer except `layer` and then calls
   * `cb`. Unlike `_wrapMethod`, the wrapper is not remembered.
   *
   * @param cb The original callback.
   * @param layer The layer the wrapper is subscribed on; it is skipped
   *  while forwarding.
   * @param method The subscription method ("on...").
   * @returns The wrapping function.
   */
  protected _wrapMethodWithoutName(
    cb: (data) => any,
    layer: ICommunicationInterface,
    method: keyof ICommunicationInterface
  ): (data) => any {
    return (data) => {
      for (const other of this._layers.keys()) {
        if (other !== layer) {
          other[METHOD_MAPPING[method] as any](data);
        }
      }

      cb(data);
    };
  }

  /**
   * Helper function that is used to adapt the currently recorded
   * subscriptions. Does nothing if dynamic adding is disabled.
   *
   * @param name Name of the data container.
   * @param mode The mode ("add" or "delete").
   * @param event The name of the event.
   * @param cb The callback.
   */
  protected _adaptStore(
    name: "_onRpcResponse" | "_onRpcRequest" | "_onEvent",
    mode: "add" | "delete",
    event: string,
    cb: any
  ): void {
    if (!this._enableDynamicAdding) {
      return;
    }

    const store: Map<string, Set<any>> = this[name];
    const callbacks = store.get(event);

    if (mode === "add") {
      if (callbacks) {
        callbacks.add(cb);
      } else {
        store.set(event, new Set<any>([cb]));
      }
      return;
    }

    // mode === "delete": nothing to do for unknown events.
    if (!callbacks) {
      return;
    }

    callbacks.delete(cb);

    // Drop emptied entries, so events that are no longer subscribed do not
    // pile up in the container.
    if (callbacks.size === 0) {
      store.delete(event);
    }
  }
}