Adding Server

This commit is contained in:
Martin Karkowski 2021-03-22 20:24:57 +01:00
parent bdff1158b3
commit 8ca4e39e6b
3 changed files with 190 additions and 32 deletions

View File

@ -6,8 +6,9 @@
* @desc [description]
*/
import { IoSocketClient } from "../../../lib/communication/IoSocketClient";
import { connect } from "socket.io-client";
import { exportProperty } from "../../../lib/decorators/moduleDecorators";
import { getNopeLogger } from "../../../lib/logger/getLogger";
import { LoggerLevel } from "../../../lib/logger/nopeLogger";
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
@ -31,6 +32,7 @@ export class MirrorLinkModule extends InjectableNopeBaseModule {
public connected = new NopeObservable<boolean>();
protected _layer: ICommunicationInterface;
protected _logger = getNopeLogger("MirrorLink");
/**
* Initialize the client
@ -62,32 +64,43 @@ export class MirrorLinkModule extends InjectableNopeBaseModule {
await super.init();
// Define the Layer.
this._layer = (new IoSocketClient(
uri,
"mirror",
level
) as any) as ICommunicationInterface;
this._layer.considerConnection = considerConnection;
// Make shure we use the http before connecting.
const uriToUse = uri.startsWith("http://") ? uri : "http://" + uri;
// Add the layer
this._dispatcher.communicator.addLayer(
this._layer,
true,
considerConnection
);
this.connected.setContent(false);
this._layer.connected.subscribe(() => _this.connected.forcePublish());
this._logger.info("connecting to: " + uri);
this.connected.getter = (value) => {
return _this._layer.connected.getContent();
};
// 1. Create the Socket.
const socket = connect(uriToUse);
this.connected.subscribe((connected) => {
// Every-Time we establish a connection, we say hello.
if (connected) {
_this._dispatcher.emitBonjour();
}
let firstTimeConnectionEstablished = true;
socket.on("connect", (...args) => {
// Assign new ID.
(_this._logger as any).context.name += "_" + socket.id;
// Element is connected
_this._logger.info("connected");
// Mark the Element as Connected
_this.connected.setContent(true);
_this._dispatcher.communicator.addMirror(
socket as any,
firstTimeConnectionEstablished
);
_this._dispatcher.emitBonjour();
firstTimeConnectionEstablished = false;
});
socket.on("disconnect", () => {
// Connection Lost.
_this._logger.error("Connection lost!");
_this.connected.setContent(false);
_this._dispatcher.communicator.removeMirror(socket as any);
});
if (waitForConnection) {

View File

@ -8,10 +8,12 @@
import { hostname } from "os";
import { IPackageDescription } from "../../../lib/types/nope/nopePackage.interface";
import { MirrorModule } from "./bridge.module";
import { MirrorLinkModule } from "./mirror-link.module";
import { MirrorServerModule } from "./mirror-server.module";
const TYPES = {
mirrorLink: Symbol.for("mirrorLink")
mirrorLink: Symbol.for("mirrorLink"),
mirrorServer: Symbol.for("mirrorServer")
};
export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
@ -20,20 +22,38 @@ export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
defaultInstances: [
{
options: {
identifier: hostname() + "-bridge-client",
params: ["io-client", "http://localhost:7000"],
type: MirrorModule.prototype.constructor.name.toString()
identifier: hostname() + "-mirror-client",
params: ["http://localhost:7001"],
type: MirrorLinkModule.prototype.constructor.name.toString()
},
selector: MirrorModule.prototype.constructor.name.toString()
selector: MirrorLinkModule.prototype.constructor.name.toString()
},
{
options: {
identifier: hostname() + "-mirror-server",
params: [7001],
type: MirrorServerModule.prototype.constructor.name.toString()
},
selector: MirrorServerModule.prototype.constructor.name.toString()
}
],
nameOfPackage: "bridgeLayer",
nameOfPackage: "mirrorPackage",
providedClasses: [
{
description: {
name: MirrorModule.prototype.constructor.name.toString(),
name: MirrorLinkModule.prototype.constructor.name.toString(),
selector: TYPES.mirrorLink,
type: MirrorModule
type: MirrorLinkModule
},
settings: {
allowInstanceGeneration: true
}
},
{
description: {
name: MirrorServerModule.prototype.constructor.name.toString(),
selector: TYPES.mirrorServer,
type: MirrorServerModule
},
settings: {
allowInstanceGeneration: true

View File

@ -0,0 +1,125 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-03-22 18:06:08
* @modify date 2021-03-22 18:06:08
* @desc [description]
*/
import * as io from "socket.io";
import { exportProperty } from "../../../lib/decorators/moduleDecorators";
import { getNopeLogger } from "../../../lib/logger/getLogger";
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
const EVENTS_TO_FORWARD: Set<string> = new Set([
// Default emitters
"aurevoir",
"bonjour",
"NewInstanceGeneratorsAvailable",
"NewInstancesAvailable",
"NewObersvablesAvailable",
"NewServicesAvailable",
"StatusUpdate",
"TaskCancelation",
"event",
"rpcRequest",
"rpcResponse"
]);
export class MirrorServerModule extends InjectableNopeBaseModule {
/**
* Attribute holding a Flag, whether the Element is connected or not.
*
* @memberof MirrorServerModule
*/
@exportProperty({
mode: "publish",
schema: {
description: "An Overview, showing the connected clients",
type: "array",
items: {
description: "The ID of the connection.",
type: "string"
}
},
topic: "clients"
})
public clients = new NopeObservable<string[]>();
protected _logger = getNopeLogger("Mirror-Server");
protected _socket: io.Server;
protected _sockets: Set<io.Socket>;
async init(port = 7001): Promise<void> {
const _this = this;
// Define the Author.
this.author = {
forename: "Martin",
mail: "m.karkowski@zema.de",
surename: "karkowski"
};
this.description = "A Module which will host a Mirror Server.";
this.version = {
date: new Date("22.03.2021"),
version: 1
};
await super.init();
this.clients.setContent([]);
this._logger.info(
"waiting for connection. Listening on port: " + port.toString()
);
this._socket = (io as any)();
this._socket.listen(port);
this._socket.on("connection", (client) => {
_this._logger.info("New Connection established: " + client.id);
// Add the Layer to the Dispatcher.
_this._dispatcher.communicator.addMirror(client, true);
// Now Subscribe the Events and make shure we
// are forwarding the data.
for (const event of EVENTS_TO_FORWARD) {
client.on(event, (data) => {
_this._forward(client, event, data);
});
}
// Subscribe to Loosing connection:
client.on("disconnect", () => {
_this._logger.info("Connection of : " + client.id + " lost.");
_this._sockets.delete(client);
// Manually Remove the Layer
_this._dispatcher.communicator.removeMirror(client);
_this.clients.forcePublish();
});
_this.clients.forcePublish();
});
this._sockets = new Set();
}
protected _id: string;
protected _forward(socket: io.Socket, event: string, data: any) {
for (const socketToForward of this._sockets) {
if (socket !== socketToForward) {
socketToForward.emit(event, data);
}
}
}
public async dispose() {
this._socket.close();
await super.dispose();
}
}