Moving Layers

This commit is contained in:
Martin Karkowski 2021-03-22 20:24:15 +01:00
parent 3742062684
commit 612bbcadb8
8 changed files with 128 additions and 285 deletions

View File

@ -1,80 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-03-01 15:05:09
* @modify date 2021-03-01 17:42:12
* @desc [description]
*/
import { EventEmitter } from "events";
import { getNopeLogger } from "../logger/getLogger";
import { ICommunicationInterface, IExternalEventMsg, IRequestTaskMsg, IResponseTaskMsg } from "../types/nope/nopeCommunication.interface";
import { EventLayer, IEmitter } from "./eventLayer";
const METHODS: Array<keyof ICommunicationInterface> = [
"onAurevoir",
"onBonjour",
"onNewInstanceGeneratorsAvailable",
"onNewInstancesAvailable",
"onNewObservablesAvailable",
"onNewServicesAvailable",
"onTaskCancelation",
];
export class DebuggedEventLayer extends EventLayer {
constructor(
_emitter: IEmitter = new EventEmitter(),
_logger = getNopeLogger("debug-layer", "info")
) {
super(_emitter,_logger);
const _this = this;
// We Subscribe to the defined methods, we want to listen.
for (const method of METHODS){
this[method as any]((data) => {
if (_this._logger){
_this._logger.debug(method,data);
}
});
}
}
async onEvent(
event: string,
cb: (data: IExternalEventMsg) => void
): Promise<void> {
this._on("event_" + event, cb);
if (this._logger){
const _this = this;
this._on("event_" + event, (data) => {
_this._logger.debug("received event on", event, data);
});
}
}
async onRpcRequest(
name: string,
cb: (data: IRequestTaskMsg) => void
): Promise<void> {
this._on(name, cb);
if (this._logger){
const _this = this;
this._on(name, (data) => {
_this._logger.debug("received rpc request",name, data);
});
}
}
async onRpcResponse(
name: string,
cb: (result: IResponseTaskMsg) => void
): Promise<void> {
this._on(name, cb);
if (this._logger){
const _this = this;
this._on(name, (data) => {
_this._logger.debug("received rpc response",name, data);
});
}
}
}

View File

@ -8,12 +8,11 @@
import * as io from "socket.io";
import { connect } from "socket.io-client";
import { generateId } from "../helpers/idMethods";
import { LoggerLevel } from "../logger/nopeLogger";
import { ICommunicationInterface } from "../types/nope/nopeCommunication.interface";
import { Bridge } from "./bridge";
import { generateId } from "../../helpers/idMethods";
import { LoggerLevel } from "../../logger/nopeLogger";
import { ICommunicationInterface } from "../../types/nope/nopeCommunication.interface";
import { Bridge } from "../bridge";
import { EventLayer } from "./eventLayer";
import { MirrorLayer } from "./mirrorLayer";
/**
* Communication Layer using IO-Sockets.
@ -39,11 +38,7 @@ export class IoSocketClient extends Bridge implements ICommunicationInterface {
* @param {LoggerLevel} [level="info"] Logger level
* @memberof IoSocketClient
*/
constructor(
public uri: string,
mode: "mirror" | "default" = "default",
level: LoggerLevel = "info"
) {
constructor(public uri: string, level: LoggerLevel = "info") {
super(generateId(), "io-socket-client " + uri, level);
// Make shure we use the http before connecting.
@ -59,10 +54,7 @@ export class IoSocketClient extends Bridge implements ICommunicationInterface {
// 1. Create the Socket.
const socket = connect(uri);
// 2. Create the Layer. Based on the Mode it will be either an Mirror or Default Layer.
const layer =
mode === "default"
? new EventLayer(socket as any, this._logger)
: new MirrorLayer(socket as any, this._logger);
const layer = new EventLayer(socket as any, this._logger);
// 3. Add the Layer
this.addLayer(layer, true, true);

View File

@ -8,12 +8,11 @@
import { Server } from "http";
import * as io from "socket.io";
import { generateId } from "../helpers/idMethods";
import { getNopeLogger } from "../logger/getLogger";
import { LoggerLevel } from "../logger/nopeLogger";
import { Bridge } from "./bridge";
import { generateId } from "../../helpers/idMethods";
import { getNopeLogger } from "../../logger/getLogger";
import { LoggerLevel } from "../../logger/nopeLogger";
import { Bridge } from "../bridge";
import { EventLayer } from "./eventLayer";
import { MirrorLayer } from "./mirrorLayer";
/**
* Communication Layer using IO-Sockets.
@ -36,7 +35,6 @@ export class IoSocketServer extends Bridge {
*/
constructor(
public port: number,
mode: "mirror" | "default" = "default",
level: LoggerLevel = "info",
server?: Server
) {
@ -67,10 +65,7 @@ export class IoSocketServer extends Bridge {
const logger = getNopeLogger("io-client-" + client.id, "info");
// 2. Create the Layer. Based on the Mode it will be either an Mirror or Default Layer.
const layer =
mode === "default"
? new EventLayer(client as any, logger)
: new MirrorLayer(client as any, logger);
const layer = new EventLayer(client as any, logger);
// 3. Add the Layer
this.addLayer(layer, true);

View File

@ -7,8 +7,8 @@
*/
import * as amqp from "amqplib";
import { getNopeLogger } from "../logger/getLogger";
import { NopeObservable } from "../observables/nopeObservable";
import { getNopeLogger } from "../../logger/getLogger";
import { NopeObservable } from "../../observables/nopeObservable";
import {
IAvailableInstanceGeneratorsMsg,
IAvailableInstancesMsg,
@ -19,9 +19,9 @@ import {
IRequestTaskMsg,
IResponseTaskMsg,
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";
} from "../../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../../types/nope/nopeDispatcher.interface";
import { INopeObservable } from "../../types/nope/nopeObservable.interface";
export type QueuePublishOptions = {
subscribeOptions?: amqp.Options.AssertQueue;
@ -396,7 +396,9 @@ export class AmqpInterface {
export class AmqpLayer
extends AmqpInterface
implements ICommunicationInterface {
allowServiceRedundancy: boolean;
id: string;
considerConnection = true;
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
await this.emit("publish", "statusUdpate", status);
@ -580,7 +582,7 @@ export class AmqpLayer
return {};
};
public async dispose(){
public async dispose() {
// Dispose the Connection Flag.
this.connected.dispose();
await this._socket.close();

View File

@ -8,8 +8,8 @@
import { EventEmitter } from "events";
import { ILogger } from "js-logger";
import { generateId } from "../helpers/idMethods";
import { NopeObservable } from "../observables/nopeObservable";
import { generateId } from "../../helpers/idMethods";
import { NopeObservable } from "../../observables/nopeObservable";
import {
IAvailableInstanceGeneratorsMsg,
IAvailableInstancesMsg,
@ -21,9 +21,9 @@ import {
IRequestTaskMsg,
IResponseTaskMsg,
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";
} from "../../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../../types/nope/nopeDispatcher.interface";
import { INopeObservable } from "../../types/nope/nopeObservable.interface";
/**
* A Communication Layer for the Dispatchers.

View File

@ -1,169 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-03-19 08:26:11
* @modify date 2021-03-19 08:26:11
* @desc [description]
*/
import { EventEmitter } from "events";
import {
ICommunicationInterface,
IEmitter,
IExternalEventMsg,
IRequestTaskMsg,
IResponseTaskMsg
} from "../types/nope/nopeCommunication.interface";
import { EventLayer } from "./eventLayer";
/**
* This Layer, handles a little bit differently as the default layers. Whereas every
* default layer provides a seperate channel for requests, events and responses,
* a mirror aggregates theese layers be into 3 different channels. Thereby, we are not
* able to miss specific events.
*
* @export
* @class MirrorLayer
* @extends {BaseLayer}
* @implements {ICommunicationInterface}
*/
export class MirrorLayer extends EventLayer implements ICommunicationInterface {
protected _emitters = new Map<string, IEmitter>();
protected _callbacks = new Map();
/**
* Enhanced RpcRequest. We are emitting every request on "rpcRequest" on mirrors.
* this enusres, that the messages are forwared.
*
* @param {string} name name of the RPC-Request-Channel
* @param {IRequestTaskMsg} request The Data to provide during the request.
* @return {*} {Promise<void>}
* @memberof BaseBridgedLayer
*/
async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
await this._emitter.emit("rpcRequest", { name, data: request });
}
/**
* Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors.
* this enusres, that the messages are forwared.
*
* @param {string} name name of the RPC-Response-Channel
* @param {IResponseTaskMsg} result The result to share
* @return {*} {Promise<void>}
* @memberof BaseBridgedLayer
*/
async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void> {
await this._emitter.emit("rpcResponse", { name, data: result });
}
/**
* Enhanced RpcResponse. We are emitting every response on "rpcResponse" on mirrors.
* this enusres, that the messages are forwared.
*
* @param {string} name
* @param {IExternalEventMsg} data
* @return {*} {Promise<void>}
* @memberof BaseBridgedLayer
*/
async emitEvent(name: string, data: IExternalEventMsg): Promise<void> {
await this._emitter.emit("event", { name, data });
}
/**
* Wrapper, which is used to subscribe to the Channel. Therefore we
* create an additional event emitter, which will only forward the
* information, of the aggregated channel, if the event name matches
*
* @protected
* @param {string} name
* @param {(...args) => void} cb
* @param {string} type
* @return {*} {Promise<void>}
* @memberof BaseBridgedLayer
*/
protected async _wrappedOn(
name: string,
cb: (...args) => void,
type: string
): Promise<void> {
// Create an Emitter if required:
if (!this._emitters.has(name)) {
const emitter = new EventEmitter();
this._emitters.set(name, emitter);
// Internally Forward the Event.
await this._emitter.on(type, (data) => {
if (data.name === name) emitter.emit(data.name, data.data);
});
}
const emitter = this._emitters.get(name);
// Adapth the max. amount of listeners
emitter.setMaxListeners(emitter.getMaxListeners() + 1);
// subscribe.
emitter.on(name, cb);
}
/**
* Helper function to remove the subscriptions.
*
* @protected
* @param {string} name
* @param {(...args) => void} cb
* @memberof BaseBridgedLayer
*/
protected _wrappedOff(name: string, cb: (...args) => void): void {
// Create an Emitter if required:
if (!this._emitters.has(name)) {
throw Error("that should happen");
}
const emitter = this._emitters.get(name);
emitter.off(name, cb);
}
async onRpcResponse(
name: string,
cb: (result: IResponseTaskMsg) => void
): Promise<void> {
await this._wrappedOn(name, cb, "rpcResponse");
}
async offRpcResponse(
name: string,
cb: (result: IResponseTaskMsg) => void
): Promise<void> {
this._wrappedOff(name, cb);
}
async onRpcRequest(
name: string,
cb: (data: IRequestTaskMsg) => void
): Promise<void> {
// Create an Emitter if required:
await this._wrappedOn(name, cb, "rpcRequest");
}
async offRpcRequest(
name: string,
cb: (data: IRequestTaskMsg) => void
): Promise<void> {
this._wrappedOff(name, cb);
}
async onEvent(
event: string,
cb: (data: IExternalEventMsg) => void
): Promise<void> {
await this._wrappedOn(event, cb, "rpcResponse");
}
async offEvent(
event: string,
cb: (data: IExternalEventMsg) => void
): Promise<void> {
this._wrappedOff(event, cb);
}
}

View File

@ -0,0 +1,39 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-03-22 19:03:15
* @modify date 2021-03-22 19:03:15
* @desc [description]
*/
import EventEmitter from "events";
import { ILogger } from "js-logger";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
import {
ICommunicationMirror,
IEmitter,
ValidEventTypesOfMirror
} from "../../types/nope/nopeCommunication.interface";
import { INopeObservable } from "../../types/nope/nopeObservable.interface";
export class EventMirror implements ICommunicationMirror {
on(
event: symbol | ValidEventTypesOfMirror,
listener: (...args: any[]) => void
): void {
this._emitter.on(event, listener);
}
emit(event: ValidEventTypesOfMirror, data: any): void {
this._emitter.emit(event, data);
}
connected: INopeObservable<boolean>;
constructor(
protected _emitter: IEmitter = new EventEmitter() as any,
protected _logger?: ILogger
) {
this.connected = new NopeObservable<boolean>();
}
}

View File

@ -0,0 +1,64 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-03-22 19:03:15
* @modify date 2021-03-22 19:12:16
* @desc [description]
*/
import * as io from "socket.io";
import { connect } from "socket.io-client";
import { getNopeLogger } from "../../logger/getLogger";
import { LoggerLevel } from "../../logger/nopeLogger";
import { EventMirror } from "./eventMirror";
/**
* Communication Layer using IO-Sockets.
*
* @export
* @class IoSocketServer
* @implements {ICommunicationInterface}
*/
export class IoSocketClient extends EventMirror {
protected _socket: io.Server;
protected _sockets: Set<io.Socket>;
/**
* Creates an instance of IoSocketServer.
* @param {number} port The Port, on which the Server should be hosted
* @param {Server} [server] A Server shich should be used. (Otherwise a Server is created)
* @memberof IoSocketServer
*/
/**
* Creates an instance of IoSocketClient.
* @param {string} uri Uri of the Server.
* @param {LoggerLevel} [level="info"] Logger level
* @memberof IoSocketClient
*/
constructor(public uri: string, level: LoggerLevel = "info") {
super(
connect(uri.startsWith("http://") ? uri : "http://" + uri) as any,
getNopeLogger("io-mirror-client", level)
);
// Make shure we use the http before connecting.
this.uri = this.uri.startsWith("http://") ? this.uri : "http://" + this.uri;
this.connected.setContent(false);
this._logger.info("connecting to: " + uri);
const _this = this;
this._emitter.on("connect", (...args) => {
// Element is connected
_this._logger.info("connected");
_this.connected.setContent(true);
});
this._emitter.on("disconnect", () => {
// Connection Lost.
_this._logger.error("Connection lost!");
_this.connected.setContent(false);
});
}
}