/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-06 08:52:36
 * @modify date 2020-12-04 17:44:34
 * @desc [description]
 */

import { ILogger } from "js-logger";
import { NopeObservable } from "../observables/nopeObservable";
import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";

/** Name of a member of the communication interface. */
type MethodName = keyof ICommunicationInterface;

/** Callback handed to one of the `on*` / `off*` methods. */
type Callback = (...args: any[]) => any;

/**
 * Methods that are generated on the bridge. The specific subscribe /
 * unsubscribe methods are appended below.
 */
const METHODS: Array<MethodName> = [
  "emitAurevoir",
  "emitBonjour",
  "emitEvent",
  "emitNewInstanceGeneratorsAvailable",
  "emitNewInstancesAvailable",
  "emitNewObersvablesAvailable",
  "emitNewServicesAvailable",
  "emitRpcRequest",
  "emitRpcResponse",
  "emitStatusUpdate",
  "emitTaskCancelation",
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onStatusUpdate",
  "onTaskCancelation",
];

/**
 * Maps a subscribing method (`on*`) to the emitting method (`emit*`) that is
 * used to forward a received message to the other layers. Only the `on*`
 * members have an entry, therefore the mapping is partial.
 */
const METHOD_MAPPING: { [P in MethodName]?: MethodName } = {
  onAurevoir: "emitAurevoir",
  onBonjour: "emitBonjour",
  onEvent: "emitEvent",
  onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable",
  onNewInstancesAvailable: "emitNewInstancesAvailable",
  onNewObservablesAvailable: "emitNewObersvablesAvailable",
  onNewServicesAvailable: "emitNewServicesAvailable",
  onRpcRequest: "emitRpcRequest",
  onRpcResponse: "emitRpcResponse",
  onTaskCancelation: "emitTaskCancelation",
  onStatusUpdate: "emitStatusUpdate",
};

/**
 * Subscribing methods that only receive a callback. Their callbacks are
 * stored in the list `_map_<method>`.
 */
const STORING: Array<MethodName> = [
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onTaskCancelation",
  "onStatusUpdate",
];

/**
 * Subscribing methods that receive a name and a callback. Their callbacks
 * are stored per name in the map `_<method>`.
 */
const SPECIFIC_STORING: Array<MethodName> = [
  "onRpcResponse",
  "onRpcRequest",
  "onEvent",
];

/** Unsubscribing counterparts of {@link SPECIFIC_STORING}. */
const SPECIFIC_REMOVING: Array<MethodName> = [
  "offEvent",
  "offRpcRequest",
  "offRpcResponse",
];

METHODS.push(...SPECIFIC_REMOVING, ...SPECIFIC_STORING);

/**
 * Calls the member `method` of `target` with the given arguments. The return
 * value is discarded, i.e. a returned promise is NOT awaited.
 *
 * @param target The object owning the method.
 * @param method Name of the method to call.
 * @param args Arguments passed to the method.
 */
function callMethod(target: object, method: string, ...args: unknown[]): void {
  (target as Record<string, any>)[method](...args);
}

/**
 * A communication layer that bridges several other communication layers.
 *
 * Every method listed in `METHODS` is generated in the constructor and
 * delegates the call to all registered layers. If dynamic adding is enabled,
 * the subscriptions are additionally stored, so they can be replayed on
 * layers that are added later on. For layers added with `forward = true`
 * the callbacks are wrapped, so a message received on that layer is emitted
 * on all other layers before the callback is called.
 *
 * @export
 * @class Bridge
 * @implements {ICommunicationInterface}
 */
// @ts-ignore Ignore the Interface. Its implemented manually
export class Bridge implements ICommunicationInterface {
  /** Flag showing whether all layers are connected. */
  connected: INopeObservable<boolean>;

  /** The registered layers, mapped to their `forward` flag. */
  protected _layers: Map<ICommunicationInterface, boolean>;

  /**
   * Maps an original callback to the wrapper created by `_wrapMethod`.
   *
   * NOTE(review): only ONE wrapper is kept per callback. If the same callback
   * is wrapped for several forwarding layers (or several names) only the last
   * wrapper is known when unsubscribing - confirm whether this is intended.
   */
  protected _wrappedMethods = new Map<(data: any) => any, (data: any) => any>();

  /**
   * Adds a layer to the bridge. Adding an already known layer is a no-op.
   *
   * If dynamic adding is enabled, all subscriptions stored so far are
   * replayed on the new layer.
   *
   * @param layer The layer to add.
   * @param forward If true, messages received on this layer are forwarded to
   * all other layers.
   */
  public addLayer(layer: ICommunicationInterface, forward = false): void {
    if (this._layers.has(layer)) {
      return;
    }

    this._layers.set(layer, forward);

    if (!this._enableDynamicAdding) {
      return;
    }

    const members: Record<string, any> = this;

    // Play the history of the subscriptions without a name:
    for (const method of STORING) {
      for (const cb of members[`_map_${method}`] as Callback[]) {
        callMethod(
          layer,
          method,
          forward ? this._wrapMethodWithoutName(cb, layer, method) : cb
        );
      }
    }

    // Play the history of the named subscriptions (responses, requests
    // and events). The container of a method is named `_<method>`.
    for (const method of SPECIFIC_STORING) {
      const subscriptions: Map<string, Set<Callback>> = members[`_${method}`];

      for (const [event, callbacks] of subscriptions.entries()) {
        // Iterate over a snapshot, so the set may be adapted meanwhile.
        for (const cb of Array.from(callbacks)) {
          callMethod(
            layer,
            method,
            event,
            forward ? this._wrapMethod(event, cb, layer, method) : cb
          );
        }
      }
    }
  }

  /**
   * Removes a layer from the bridge. Subscriptions that have been made on
   * the layer are not removed from it.
   *
   * @param layer The layer to remove.
   */
  public removeLayer(layer: ICommunicationInterface): void {
    this._layers.delete(layer);
  }

  /**
   * Creates a Bridge
   * @param subscriptionMode
   * @param resultSharing
   * @param _enableDynamicAdding If true, subscriptions are stored and
   * replayed on layers that are added afterwards.
   * @param _logger Optional logger, used to log the delegated calls.
   */
  constructor(
    public readonly subscriptionMode: "individual" | "generic" = "individual",
    public readonly resultSharing: "individual" | "generic" = "individual",
    protected _enableDynamicAdding = false,
    protected _logger?: ILogger
  ) {
    this._layers = new Map<ICommunicationInterface, boolean>();

    this.connected = new NopeObservable<boolean>();
    this.connected.setContent(false);

    // Add a custom handler for the connect flag. The flag is true, if every
    // layer is connected (and therefore also if there is no layer at all).
    this.connected.getter = () => {
      for (const layer of this._layers.keys()) {
        if (!layer.connected.getContent()) {
          return false;
        }
      }
      return true;
    };

    const members: Record<string, any> = this;

    // Define for every method of the interface the corresponding member.
    for (const method of METHODS) {
      if (_enableDynamicAdding && STORING.includes(method)) {
        members[method] = this._createStoringSubscriber(method);
      } else if (_enableDynamicAdding && SPECIFIC_STORING.includes(method)) {
        members[method] = this._createSpecificSubscriber(method);
      } else if (_enableDynamicAdding && SPECIFIC_REMOVING.includes(method)) {
        members[method] = this._createSpecificRemover(method);
      } else {
        members[method] = this._createDelegate(method);
      }
    }
  }

  /**
   * Creates the implementation of a subscribing method that only receives a
   * callback. The callback is stored in `_map_<method>` and registered on
   * every layer (wrapped, if the layer forwards its messages).
   */
  private _createStoringSubscriber(
    method: MethodName
  ): (cb: Callback) => Promise<void> {
    const stored: Callback[] = [];
    (this as Record<string, any>)[`_map_${method}`] = stored;

    return async (cb) => {
      // Store the call, to be able to replay it.
      stored.push(cb);

      for (const [layer, forward] of this._layers) {
        callMethod(
          layer,
          method,
          forward ? this._wrapMethodWithoutName(cb, layer, method) : cb
        );
      }
    };
  }

  /**
   * Creates the implementation of a subscribing method that receives a name
   * and a callback. The callback is stored per name in `_<method>` and
   * registered on every layer (wrapped, if the layer forwards its messages).
   */
  private _createSpecificSubscriber(
    method: MethodName
  ): (name: string, cb: Callback) => Promise<void> {
    const container = `_${method}`;
    this._ensureContainer(container);

    return async (name, cb) => {
      this._adaptStore(container as any, "add", name, cb);

      for (const [layer, forward] of this._layers) {
        callMethod(
          layer,
          method,
          name,
          forward ? this._wrapMethod(name, cb, layer, method) : cb
        );
      }
    };
  }

  /**
   * Creates the implementation of an unsubscribing method (`off*`). The
   * callback is removed from the container of the matching `on*` method and
   * unsubscribed on every layer.
   */
  private _createSpecificRemover(
    method: MethodName
  ): (name: string, cb: Callback) => Promise<void> {
    // "offEvent" uses the container of "onEvent" => "_onEvent"
    const container = method.replace("off", "_on");
    this._ensureContainer(container);

    return async (name, cb) => {
      this._adaptStore(container as any, "delete", name, cb);

      const wrapped = this._wrappedMethods.get(cb);

      for (const [layer, forward] of this._layers) {
        // Only forwarding layers have been subscribed with the wrapper,
        // all other layers know the original callback.
        callMethod(layer, method, name, forward && wrapped ? wrapped : cb);
      }

      // Delete the wrapped method:
      this._wrappedMethods.delete(cb);
    };
  }

  /**
   * Creates the implementation of a method that is simply delegated to
   * every layer. The call is logged, if a logger has been provided.
   */
  private _createDelegate(
    method: MethodName
  ): (...args: unknown[]) => Promise<void> {
    return async (...args) => {
      if (this._logger) {
        this._logger.debug(method, ...args);
      }

      for (const layer of this._layers.keys()) {
        callMethod(layer, method, ...args);
      }
    };
  }

  /**
   * Creates the container with the given name, if it does not exist yet.
   * A container is shared by an `on*` method and its `off*` counterpart.
   */
  private _ensureContainer(container: string): void {
    const members: Record<string, any> = this;

    if (!(members[container] instanceof Map)) {
      members[container] = new Map<string, Set<Callback>>();
    }
  }

  /**
   * Wraps the callback of a named subscription. The wrapper emits the
   * received data on every layer except `layer` and calls the callback
   * afterwards. The wrapper is remembered in `_wrappedMethods`.
   *
   * @param name Name of the event / request / response.
   * @param cb The callback to wrap.
   * @param layer The layer on which the wrapper is going to be subscribed.
   * @param method The subscribing method (key of `METHOD_MAPPING`).
   * @returns The wrapped callback.
   */
  protected _wrapMethod(
    name: string,
    cb: (data: any) => any,
    layer: ICommunicationInterface,
    method: keyof ICommunicationInterface
  ): (data: any) => any {
    const wrapped = (data: any): void => {
      const emitter = METHOD_MAPPING[method] as string;

      // Emit the corresponding event on the other channels.
      for (const other of this._layers.keys()) {
        if (other !== layer) {
          callMethod(other, emitter, name, data);
        }
      }

      // Call the callback!
      cb(data);
    };

    this._wrappedMethods.set(cb, wrapped);

    return wrapped;
  }

  /**
   * Wraps the callback of a subscription without a name. The wrapper emits
   * the received data on every layer except `layer` and calls the callback
   * afterwards. The wrapper is not remembered.
   *
   * @param cb The callback to wrap.
   * @param layer The layer on which the wrapper is going to be subscribed.
   * @param method The subscribing method (key of `METHOD_MAPPING`).
   * @returns The wrapped callback.
   */
  protected _wrapMethodWithoutName(
    cb: (data: any) => any,
    layer: ICommunicationInterface,
    method: keyof ICommunicationInterface
  ): (data: any) => any {
    return (data: any): void => {
      const emitter = METHOD_MAPPING[method] as string;

      // Emit the corresponding event on the other channels.
      for (const other of this._layers.keys()) {
        if (other !== layer) {
          callMethod(other, emitter, data);
        }
      }

      // Call the callback!
      cb(data);
    };
  }

  /**
   * Helper Function that is used to adapt the current active subscriptions.
   * Does nothing, if dynamic adding is disabled.
   *
   * @param name name of the data container
   * @param mode the mode (delete or add)
   * @param event the name of the event
   * @param cb the callback
   */
  protected _adaptStore(
    name: "_onRpcResponse" | "_onRpcRequest" | "_onEvent",
    mode: "add" | "delete",
    event: string,
    cb: any
  ): void {
    if (!this._enableDynamicAdding) {
      return;
    }

    const store: Map<string, Set<any>> = (this as Record<string, any>)[name];
    const callbacks = store.get(event) || new Set<any>();

    if (mode === "add") {
      callbacks.add(cb);
    } else {
      callbacks.delete(cb);
    }

    // Do not keep empty sets, otherwise the store grows with every event
    // name that has ever been used.
    if (callbacks.size > 0) {
      store.set(event, callbacks);
    } else {
      store.delete(event);
    }
  }
}