Adding A Mirror Layer
This commit is contained in:
parent
255efdaa48
commit
8ba7fd628c
174
lib/communication/mirrorLayer.ts
Normal file
174
lib/communication/mirrorLayer.ts
Normal file
@ -0,0 +1,174 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-03-19 08:26:11
|
||||
* @modify date 2021-03-19 08:26:11
|
||||
* @desc [description]
|
||||
*/
|
||||
import { EventEmitter } from "events";
|
||||
import {
|
||||
ICommunicationInterface,
|
||||
IEmitter,
|
||||
IExternalEventMsg,
|
||||
IRequestTaskMsg,
|
||||
IResponseTaskMsg
|
||||
} from "../types/nope/nopeCommunication.interface";
|
||||
import { INopeObservable } from "../types/nope/nopeObservable.interface";
|
||||
import { BaseLayer } from "./baseLayer";
|
||||
|
||||
/**
|
||||
* This Layer, handles a little bit differently as the default layers. Whereas every
|
||||
* default layer provides a seperate channel for requests, events and responses,
|
||||
* a mirror aggregates theese layers be into 3 different channels. Thereby, we are not
|
||||
* able to miss specific events.
|
||||
*
|
||||
* @export
|
||||
* @class MirrorLayer
|
||||
* @extends {BaseLayer}
|
||||
* @implements {ICommunicationInterface}
|
||||
*/
|
||||
export class MirrorLayer extends BaseLayer implements ICommunicationInterface {
|
||||
connected: INopeObservable<boolean>;
|
||||
considerConnection = true;
|
||||
allowServiceRedundancy = true;
|
||||
|
||||
protected _emitters = new Map<string, IEmitter>();
|
||||
protected _callbacks = new Map();
|
||||
|
||||
/**
|
||||
* Enhanced RpcRequest. We are emitting every request on "rpcRequest" on mirrors.
|
||||
* this enusres, that the messages are forwared.
|
||||
*
|
||||
* @param {string} name name of the RPC-Request-Channel
|
||||
* @param {IRequestTaskMsg} request The Data to provide during the request.
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof BaseBridgedLayer
|
||||
*/
|
||||
async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
|
||||
await this._emitter.emit("rpcRequest", { name, data: request });
|
||||
}
|
||||
|
||||
/**
|
||||
* Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors.
|
||||
* this enusres, that the messages are forwared.
|
||||
*
|
||||
* @param {string} name name of the RPC-Response-Channel
|
||||
* @param {IResponseTaskMsg} result The result to share
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof BaseBridgedLayer
|
||||
*/
|
||||
async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void> {
|
||||
await this._emitter.emit("rpcResponse", { name, data: result });
|
||||
}
|
||||
|
||||
/**
|
||||
* Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors.
|
||||
* this enusres, that the messages are forwared.
|
||||
*
|
||||
* @param {string} name
|
||||
* @param {IExternalEventMsg} data
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof BaseBridgedLayer
|
||||
*/
|
||||
async emitEvent(name: string, data: IExternalEventMsg): Promise<void> {
|
||||
await this._emitter.emit("event", { name, data });
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper, which is used to subscribe to the Channel. Therefore we
|
||||
* create an additional event emitter, which will only forward the
|
||||
* information, of the aggregated channel, if the event name matches
|
||||
*
|
||||
* @protected
|
||||
* @param {string} name
|
||||
* @param {(...args) => void} cb
|
||||
* @param {string} type
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof BaseBridgedLayer
|
||||
*/
|
||||
protected async _wrappedOn(
|
||||
name: string,
|
||||
cb: (...args) => void,
|
||||
type: string
|
||||
): Promise<void> {
|
||||
// Create an Emitter if required:
|
||||
if (!this._emitters.has(name)) {
|
||||
const emitter = new EventEmitter();
|
||||
this._emitters.set(name, emitter);
|
||||
|
||||
// Internally Forward the Event.
|
||||
await this._emitter.on(type, (data) => {
|
||||
if (data.name === name) emitter.emit(data.name, data.data);
|
||||
});
|
||||
}
|
||||
|
||||
const emitter = this._emitters.get(name);
|
||||
|
||||
// Adapth the max. amount of listeners
|
||||
emitter.setMaxListeners(emitter.getMaxListeners() + 1);
|
||||
|
||||
// subscribe.
|
||||
emitter.on(name, cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to remove the subscriptions.
|
||||
*
|
||||
* @protected
|
||||
* @param {string} name
|
||||
* @param {(...args) => void} cb
|
||||
* @memberof BaseBridgedLayer
|
||||
*/
|
||||
protected _wrappedOff(name: string, cb: (...args) => void): void {
|
||||
// Create an Emitter if required:
|
||||
if (!this._emitters.has(name)) {
|
||||
throw Error("that should happen");
|
||||
}
|
||||
|
||||
const emitter = this._emitters.get(name);
|
||||
emitter.off(name, cb);
|
||||
}
|
||||
|
||||
async onRpcResponse(
|
||||
name: string,
|
||||
cb: (result: IResponseTaskMsg) => void
|
||||
): Promise<void> {
|
||||
await this._wrappedOn(name, cb, "rpcResponse");
|
||||
}
|
||||
|
||||
async offRpcResponse(
|
||||
name: string,
|
||||
cb: (result: IResponseTaskMsg) => void
|
||||
): Promise<void> {
|
||||
this._wrappedOff(name, cb);
|
||||
}
|
||||
|
||||
async onRpcRequest(
|
||||
name: string,
|
||||
cb: (data: IRequestTaskMsg) => void
|
||||
): Promise<void> {
|
||||
// Create an Emitter if required:
|
||||
await this._wrappedOn(name, cb, "rpcRequest");
|
||||
}
|
||||
|
||||
async offRpcRequest(
|
||||
name: string,
|
||||
cb: (data: IRequestTaskMsg) => void
|
||||
): Promise<void> {
|
||||
this._wrappedOff(name, cb);
|
||||
}
|
||||
|
||||
async onEvent(
|
||||
event: string,
|
||||
cb: (data: IExternalEventMsg) => void
|
||||
): Promise<void> {
|
||||
await this._wrappedOn(event, cb, "rpcResponse");
|
||||
}
|
||||
|
||||
async offEvent(
|
||||
event: string,
|
||||
cb: (data: IExternalEventMsg) => void
|
||||
): Promise<void> {
|
||||
this._wrappedOff(event, cb);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user