diff --git a/lib/communication/mirrorLayer.ts b/lib/communication/mirrorLayer.ts new file mode 100644 index 0000000..95ba4b7 --- /dev/null +++ b/lib/communication/mirrorLayer.ts @@ -0,0 +1,174 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @create date 2021-03-19 08:26:11 + * @modify date 2021-03-19 08:26:11 + * @desc [description] + */ +import { EventEmitter } from "events"; +import { + ICommunicationInterface, + IEmitter, + IExternalEventMsg, + IRequestTaskMsg, + IResponseTaskMsg +} from "../types/nope/nopeCommunication.interface"; +import { INopeObservable } from "../types/nope/nopeObservable.interface"; +import { BaseLayer } from "./baseLayer"; + +/** + * This Layer, handles a little bit differently as the default layers. Whereas every + * default layer provides a seperate channel for requests, events and responses, + * a mirror aggregates theese layers be into 3 different channels. Thereby, we are not + * able to miss specific events. + * + * @export + * @class MirrorLayer + * @extends {BaseLayer} + * @implements {ICommunicationInterface} + */ +export class MirrorLayer extends BaseLayer implements ICommunicationInterface { + connected: INopeObservable; + considerConnection = true; + allowServiceRedundancy = true; + + protected _emitters = new Map(); + protected _callbacks = new Map(); + + /** + * Enhanced RpcRequest. We are emitting every request on "rpcRequest" on mirrors. + * this enusres, that the messages are forwared. + * + * @param {string} name name of the RPC-Request-Channel + * @param {IRequestTaskMsg} request The Data to provide during the request. + * @return {*} {Promise} + * @memberof BaseBridgedLayer + */ + async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise { + await this._emitter.emit("rpcRequest", { name, data: request }); + } + + /** + * Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors. + * this enusres, that the messages are forwared. + * + * @param {string} name name of the RPC-Response-Channel + * @param {IResponseTaskMsg} result The result to share + * @return {*} {Promise} + * @memberof BaseBridgedLayer + */ + async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise { + await this._emitter.emit("rpcResponse", { name, data: result }); + } + + /** + * Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors. + * this enusres, that the messages are forwared. + * + * @param {string} name + * @param {IExternalEventMsg} data + * @return {*} {Promise} + * @memberof BaseBridgedLayer + */ + async emitEvent(name: string, data: IExternalEventMsg): Promise { + await this._emitter.emit("event", { name, data }); + } + + /** + * Wrapper, which is used to subscribe to the Channel. Therefore we + * create an additional event emitter, which will only forward the + * information, of the aggregated channel, if the event name matches + * + * @protected + * @param {string} name + * @param {(...args) => void} cb + * @param {string} type + * @return {*} {Promise} + * @memberof BaseBridgedLayer + */ + protected async _wrappedOn( + name: string, + cb: (...args) => void, + type: string + ): Promise { + // Create an Emitter if required: + if (!this._emitters.has(name)) { + const emitter = new EventEmitter(); + this._emitters.set(name, emitter); + + // Internally Forward the Event. + await this._emitter.on(type, (data) => { + if (data.name === name) emitter.emit(data.name, data.data); + }); + } + + const emitter = this._emitters.get(name); + + // Adapth the max. amount of listeners + emitter.setMaxListeners(emitter.getMaxListeners() + 1); + + // subscribe. + emitter.on(name, cb); + } + + /** + * Helper function to remove the subscriptions. + * + * @protected + * @param {string} name + * @param {(...args) => void} cb + * @memberof BaseBridgedLayer + */ + protected _wrappedOff(name: string, cb: (...args) => void): void { + // Create an Emitter if required: + if (!this._emitters.has(name)) { + throw Error("that should happen"); + } + + const emitter = this._emitters.get(name); + emitter.off(name, cb); + } + + async onRpcResponse( + name: string, + cb: (result: IResponseTaskMsg) => void + ): Promise { + await this._wrappedOn(name, cb, "rpcResponse"); + } + + async offRpcResponse( + name: string, + cb: (result: IResponseTaskMsg) => void + ): Promise { + this._wrappedOff(name, cb); + } + + async onRpcRequest( + name: string, + cb: (data: IRequestTaskMsg) => void + ): Promise { + // Create an Emitter if required: + await this._wrappedOn(name, cb, "rpcRequest"); + } + + async offRpcRequest( + name: string, + cb: (data: IRequestTaskMsg) => void + ): Promise { + this._wrappedOff(name, cb); + } + + async onEvent( + event: string, + cb: (data: IExternalEventMsg) => void + ): Promise { + await this._wrappedOn(event, cb, "rpcResponse"); + } + + async offEvent( + event: string, + cb: (data: IExternalEventMsg) => void + ): Promise { + this._wrappedOff(event, cb); + } +}