/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-06 08:52:36
 * @modify date 2020-12-04 17:44:34
 * @desc Bridge that relays the calls of the communication interface to several communication layers.
 */
|
|
|
|
import { ILogger } from "js-logger";
|
|
import { NopeObservable } from "../observables/nopeObservable";
|
|
import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface";
|
|
import { INopeObservable } from "../types/nope/nopeObservable.interface";
|
|
|
|
/**
 * Names of the interface methods for which the Bridge constructor
 * generates an implementation.
 *
 * The list is intentionally mutable: the name-specific subscribe and
 * unsubscribe methods (events and RPC) are appended to it further below.
 *
 * NOTE(review): "emitNewObersvablesAvailable" is spelled like this on
 * purpose - it has to match the key of ICommunicationInterface. Confirm
 * against the interface before "fixing" the typo.
 */
const METHODS: Array<keyof ICommunicationInterface> = [
  // Emitting side.
  "emitAurevoir",
  "emitBonjour",
  "emitEvent",
  "emitNewInstanceGeneratorsAvailable",
  "emitNewInstancesAvailable",
  "emitNewObersvablesAvailable",
  "emitNewServicesAvailable",
  "emitRpcRequest",
  "emitRpcResponse",
  "emitStatusUpdate",
  "emitTaskCancelation",
  // Subscribing side (callback only).
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onStatusUpdate",
  "onTaskCancelation",
];
|
|
|
|
/**
 * Maps a subscribing method ("on...") to the emitting method ("emit...")
 * that is used to forward received data to the other layers.
 *
 * Only the "on..." methods have an entry, therefore the type is a partial
 * record over the interface keys.
 */
const METHOD_MAPPING: Partial<Record<keyof ICommunicationInterface, keyof ICommunicationInterface>> = {
  onAurevoir: "emitAurevoir",
  onBonjour: "emitBonjour",
  onEvent: "emitEvent",
  onNewInstanceGeneratorsAvailable: "emitNewInstanceGeneratorsAvailable",
  onNewInstancesAvailable: "emitNewInstancesAvailable",
  onNewObservablesAvailable: "emitNewObersvablesAvailable",
  onNewServicesAvailable: "emitNewServicesAvailable",
  onRpcRequest: "emitRpcRequest",
  onRpcResponse: "emitRpcResponse",
  onTaskCancelation: "emitTaskCancelation",
  onStatusUpdate: "emitStatusUpdate",
};
|
|
|
|
/**
 * Subscribing methods that only receive a callback.
 *
 * If dynamic adding is enabled, the Bridge records the callbacks passed
 * to these methods and replays them on layers that are added later.
 */
const STORING: ReadonlyArray<keyof ICommunicationInterface> = [
  "onAurevoir",
  "onBonjour",
  "onNewInstanceGeneratorsAvailable",
  "onNewInstancesAvailable",
  "onNewObservablesAvailable",
  "onNewServicesAvailable",
  "onTaskCancelation",
  "onStatusUpdate",
];
|
|
|
|
/**
 * Subscribing methods that receive a name (event / request / response)
 * in addition to the callback. Their subscriptions are stored per name.
 */
const SPECIFIC_STORING: ReadonlyArray<keyof ICommunicationInterface> = [
  "onRpcResponse",
  "onRpcRequest",
  "onEvent",
];
|
|
|
|
/**
 * Unsubscribing counterparts of the name-specific subscribing methods.
 * Replacing the prefix "off" with "on" yields the subscribing method.
 */
const SPECIFIC_REMOVING: ReadonlyArray<keyof ICommunicationInterface> = [
  "offEvent",
  "offRpcRequest",
  "offRpcResponse",
];
|
|
|
|
// Complete the list of generated methods: first the name-specific
// unsubscribing methods, afterwards the name-specific subscribing ones.
for (const specificMethod of [...SPECIFIC_REMOVING, ...SPECIFIC_STORING]) {
  METHODS.push(specificMethod);
}
|
|
|
|
/**
 * A communication layer for the dispatchers which bridges several other
 * communication layers.
 *
 * The methods of the interface are not written by hand. The constructor
 * generates one function per entry of `METHODS`, which relays the call to
 * every registered layer. If dynamic adding is enabled, subscriptions are
 * additionally recorded, so that they can be replayed on layers which are
 * added later. For layers added with `forward = true` the callbacks are
 * wrapped: data received on such a layer is emitted on all other layers
 * before the callback itself is called.
 *
 * The calls on the layers are not awaited by the generated functions.
 *
 * @export
 * @class Bridge
 * @implements {ICommunicationInterface}
 */
//@ts-ignore Ignore the Interface. Its implemented manually
export class Bridge implements ICommunicationInterface {
  /**
   * Connection flag of the bridge. Its getter reports `true` only if no
   * registered layer reports a falsy `connected` content.
   */
  connected: INopeObservable<boolean>;

  /** The registered layers, mapped to their `forward` flag. */
  protected _layers: Map<ICommunicationInterface, boolean>;

  /**
   * Most recently created wrapper of a callback. Kept for backward
   * compatibility; the lookup used for unsubscribing is `_wrappedByLayer`.
   */
  protected _wrappedMethods = new Map<(data: any) => any, (data: any) => any>();

  /**
   * Wrappers per layer, per "<method>:<name>" and per callback. A callback
   * gets a different wrapper on every forwarding layer, so a lookup by
   * callback alone can not tell which function a layer has received.
   */
  protected _wrappedByLayer = new Map<
    ICommunicationInterface,
    Map<string, Map<(data: any) => any, (data: any) => any>>
  >();

  /**
   * Registers a layer. Already registered layers are ignored.
   *
   * If dynamic adding is enabled, all recorded subscriptions are replayed
   * on the new layer.
   *
   * @param layer the layer to add
   * @param forward if true, data received on this layer is emitted on
   * all other layers as well
   */
  public addLayer(layer: ICommunicationInterface, forward = false) {
    if (this._layers.has(layer)) {
      return;
    }

    this._layers.set(layer, forward);

    if (!this._enableDynamicAdding) {
      return;
    }

    // Play the history of the callback-only subscriptions:
    for (const method of STORING) {
      for (const cb of this[`_map_${method}`]) {
        layer[method as any](forward ? this._wrapMethodWithoutName(cb, layer, method) : cb);
      }
    }

    // Create a dictionary, which maps the container
    // to the corresponding method.
    const dict: { [index: string]: "onRpcResponse" | "onRpcRequest" | "onEvent" } = {
      _onRpcResponse: "onRpcResponse",
      _onRpcRequest: "onRpcRequest",
      _onEvent: "onEvent",
    };

    // Replay the name-specific subscriptions (responses, requests, events).
    for (const container of Object.keys(dict)) {
      const map: Map<string, Set<any>> = this[container];
      const method = dict[container];

      if (!map) {
        // The container is only created for generated methods.
        continue;
      }

      for (const [event, callbacks] of map.entries()) {
        // Iterate over a copy, the set is not touched by the replay.
        for (const cb of Array.from(callbacks)) {
          layer[method](event, forward ? this._wrapMethod(event, cb, layer, method) : cb);
        }
      }
    }
  }

  /**
   * Removes a layer from the bridge. Subscriptions that have been made on
   * the layer are not removed from the layer itself.
   *
   * @param layer the layer to remove
   */
  public removeLayer(layer: ICommunicationInterface) {
    if (this._layers.has(layer)) {
      // Delete the layer and the bookkeeping of its wrappers.
      this._layers.delete(layer);
      this._wrappedByLayer.delete(layer);
    }
  }

  /**
   * Creates a Bridge
   * @param subscriptionMode stored on the instance, not used by the bridge itself
   * @param resultSharing stored on the instance, not used by the bridge itself
   * @param _enableDynamicAdding if true, subscriptions are recorded and
   * replayed on layers which are added later
   * @param _logger optional logger; calls of the plain relay methods are
   * logged with `debug`
   */
  constructor(
    public readonly subscriptionMode: "individual" | "generic" = "individual",
    public readonly resultSharing: "individual" | "generic" = "individual",
    protected _enableDynamicAdding = false,
    protected _logger?: ILogger
  ) {
    this._layers = new Map();
    this.connected = new NopeObservable();
    this.connected.setContent(false);

    // Add a custom handler for the connect flag.
    // The flag is defined as true, if every layer
    // is connected.
    this.connected.getter = () => {
      for (const layer of this._layers.keys()) {
        if (!layer.connected.getContent()) {
          return false;
        }
      }

      return true;
    };

    // Iterate over the methods of the interface and
    // define the corresponding function for each of them.
    for (const method of METHODS) {
      if (_enableDynamicAdding && STORING.includes(method)) {
        // Callback-only subscription: record the callback and
        // subscribe it on every available layer.
        this[`_map_${method}`] = new Array<any[]>();

        this[method as any] = async (cb) => {
          // Store the call
          this[`_map_${method}`].push(cb);

          for (const [_layer, _forward] of this._layers) {
            _layer[method as any](_forward ? this._wrapMethodWithoutName(cb, _layer, method) : cb);
          }
        };
      } else if (_enableDynamicAdding && SPECIFIC_STORING.includes(method)) {
        // Name-specific subscription: the callbacks are stored per name.
        const _container = `_${method}`;

        if (!this[_container]) {
          this[_container] = new Map<string, Set<any>>();
        }

        this[method as any] = async (name, cb) => {
          this._adaptStore(_container as any, "add", name, cb);

          // Subscribe on every available layer. For forwarding layers the
          // callback is wrapped, so that the data is relayed to the others.
          for (const [_layer, _forward] of this._layers) {
            _layer[method as any](name, _forward ? this._wrapMethod(name, cb, _layer, method) : cb);
          }
        };
      } else if (_enableDynamicAdding && SPECIFIC_REMOVING.includes(method)) {
        // Name-specific unsubscription. It works on the container of the
        // corresponding subscribing method ("offEvent" -> "_onEvent").
        const _container = method.replace("off", "_on");
        const _onMethod = method.replace("off", "on");

        if (!this[_container]) {
          this[_container] = new Map<string, Set<any>>();
        }

        this[method as any] = async (name, cb) => {
          this._adaptStore(_container as any, "delete", name, cb);

          for (const _layer of this._layers.keys()) {
            // Hand over exactly the function the layer has received on
            // subscribing: its own wrapper, or the callback itself if the
            // layer does not forward.
            const _registered = this._takeWrapped(_layer, _onMethod, name, cb) || cb;
            _layer[method as any](name, _registered);
          }

          // Delete the wrapped method:
          this._wrappedMethods.delete(cb);
        };
      } else {
        // Plain relay: call the method on every layer.
        this[method as any] = async (...args) => {
          if (this._logger) {
            this._logger.debug(method, ...args);
          }

          for (const _layer of this._layers.keys()) {
            _layer[method as any](...args);
          }
        };
      }
    }
  }

  /**
   * Wraps a callback of a name-specific subscription. The wrapper emits
   * the received data on all layers except the given one and calls the
   * callback afterwards. The wrapper is remembered per layer, method and
   * name, and reused if the same combination is wrapped again.
   *
   * @param name the name of the event / request / response
   * @param cb the callback to wrap
   * @param layer the layer the wrapper is subscribed on
   * @param method the subscribing method, used to look up the emitting method
   * @returns the wrapper
   */
  protected _wrapMethod(name: string, cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any {
    const key = `${String(method)}:${name}`;

    let perKey = this._wrappedByLayer.get(layer);
    if (!perKey) {
      perKey = new Map();
      this._wrappedByLayer.set(layer, perKey);
    }

    let perCallback = perKey.get(key);
    if (!perCallback) {
      perCallback = new Map();
      perKey.set(key, perCallback);
    }

    const existing = perCallback.get(cb);
    if (existing) {
      this._wrappedMethods.set(cb, existing);
      return existing;
    }

    const _wrapped = (data) => {
      for (const _layer of this._layers.keys()) {
        if (_layer !== layer) {
          // Emit the corresponding event on the different channels.
          _layer[METHOD_MAPPING[method] as any](name, data);
        }
      }
      // Call the callback!
      cb(data);
    };

    perCallback.set(cb, _wrapped);
    this._wrappedMethods.set(cb, _wrapped);
    return _wrapped;
  }

  /**
   * Wraps a callback of a callback-only subscription. The wrapper emits
   * the received data on all layers except the given one and calls the
   * callback afterwards.
   *
   * @param cb the callback to wrap
   * @param layer the layer the wrapper is subscribed on
   * @param method the subscribing method, used to look up the emitting method
   * @returns the wrapper
   */
  protected _wrapMethodWithoutName(cb: (data) => any, layer: ICommunicationInterface, method: keyof ICommunicationInterface): (data) => any {
    return (data) => {
      for (const _layer of this._layers.keys()) {
        if (_layer !== layer) {
          // Emit the corresponding event on the different channels.
          _layer[METHOD_MAPPING[method] as any](data);
        }
      }
      // Call the callback!
      cb(data);
    };
  }

  /**
   * Returns the wrapper which has been created for the given layer, method,
   * name and callback and removes it from the bookkeeping.
   *
   * @param layer the layer the wrapper has been subscribed on
   * @param method the subscribing method
   * @param name the name of the event / request / response
   * @param cb the original callback
   * @returns the wrapper, or undefined if there is none
   */
  protected _takeWrapped(layer: ICommunicationInterface, method: string, name: string, cb: (data) => any): ((data) => any) | undefined {
    const perKey = this._wrappedByLayer.get(layer);
    if (!perKey) {
      return undefined;
    }

    const key = `${method}:${name}`;
    const perCallback = perKey.get(key);
    if (!perCallback) {
      return undefined;
    }

    const wrapped = perCallback.get(cb);
    perCallback.delete(cb);

    // Do not keep empty containers.
    if (perCallback.size === 0) {
      perKey.delete(key);
    }
    if (perKey.size === 0) {
      this._wrappedByLayer.delete(layer);
    }

    return wrapped;
  }

  /**
   * Helper Function that is used to adapt the current active subscriptions.
   * Does nothing if dynamic adding is disabled.
   *
   * @param name name of the data container
   * @param mode the mode (delete or add)
   * @param event the name of the event
   * @param cb the callback
   */
  protected _adaptStore(name: "_onRpcResponse" | "_onRpcRequest" | "_onEvent", mode: "add" | "delete", event: string, cb: any): void {
    if (!this._enableDynamicAdding) {
      return;
    }

    const store: Map<string, Set<any>> = this[name];
    let callbacks = store.get(event);

    if (mode === "add") {
      if (!callbacks) {
        callbacks = new Set();
        store.set(event, callbacks);
      }
      callbacks.add(cb);
    } else if (callbacks) {
      callbacks.delete(cb);
      // Do not keep an empty set for an event without subscribers.
      if (callbacks.size === 0) {
        store.delete(event);
      }
    }
  }
}
|