diff --git a/modules/mirror/src/mirror-link.module.ts b/modules/mirror/src/mirror-link.module.ts index 3692c37..03f3350 100644 --- a/modules/mirror/src/mirror-link.module.ts +++ b/modules/mirror/src/mirror-link.module.ts @@ -6,8 +6,9 @@ * @desc [description] */ -import { IoSocketClient } from "../../../lib/communication/IoSocketClient"; +import { connect } from "socket.io-client"; import { exportProperty } from "../../../lib/decorators/moduleDecorators"; +import { getNopeLogger } from "../../../lib/logger/getLogger"; import { LoggerLevel } from "../../../lib/logger/nopeLogger"; import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable"; import { NopeObservable } from "../../../lib/observables/nopeObservable"; @@ -31,6 +32,7 @@ export class MirrorLinkModule extends InjectableNopeBaseModule { public connected = new NopeObservable(); protected _layer: ICommunicationInterface; + protected _logger = getNopeLogger("MirrorLink"); /** * Initialize the client @@ -62,32 +64,43 @@ export class MirrorLinkModule extends InjectableNopeBaseModule { await super.init(); - // Define the Layer. - this._layer = (new IoSocketClient( - uri, - "mirror", - level - ) as any) as ICommunicationInterface; - this._layer.considerConnection = considerConnection; + // Make shure we use the http before connecting. + const uriToUse = uri.startsWith("http://") ? uri : "http://" + uri; - // Add the layer - this._dispatcher.communicator.addLayer( - this._layer, - true, - considerConnection - ); + this.connected.setContent(false); - this._layer.connected.subscribe(() => _this.connected.forcePublish()); + this._logger.info("connecting to: " + uri); - this.connected.getter = (value) => { - return _this._layer.connected.getContent(); - }; + // 1. Create the Socket. + const socket = connect(uriToUse); - this.connected.subscribe((connected) => { - // Every-Time we establish a connection, we say hello. - if (connected) { - _this._dispatcher.emitBonjour(); - } + let firstTimeConnectionEstablished = true; + + socket.on("connect", (...args) => { + // Assign new ID. + (_this._logger as any).context.name += "_" + socket.id; + + // Element is connected + _this._logger.info("connected"); + + // Mark the Element as Connected + _this.connected.setContent(true); + + _this._dispatcher.communicator.addMirror( + socket as any, + firstTimeConnectionEstablished + ); + _this._dispatcher.emitBonjour(); + + firstTimeConnectionEstablished = false; + }); + + socket.on("disconnect", () => { + // Connection Lost. + _this._logger.error("Connection lost!"); + _this.connected.setContent(false); + + _this._dispatcher.communicator.removeMirror(socket as any); }); if (waitForConnection) { diff --git a/modules/mirror/src/mirror-link.package.ts b/modules/mirror/src/mirror-link.package.ts index 1a7aaaf..ed9f1a0 100644 --- a/modules/mirror/src/mirror-link.package.ts +++ b/modules/mirror/src/mirror-link.package.ts @@ -8,10 +8,12 @@ import { hostname } from "os"; import { IPackageDescription } from "../../../lib/types/nope/nopePackage.interface"; -import { MirrorModule } from "./bridge.module"; +import { MirrorLinkModule } from "./mirror-link.module"; +import { MirrorServerModule } from "./mirror-server.module"; const TYPES = { - mirrorLink: Symbol.for("mirrorLink") + mirrorLink: Symbol.for("mirrorLink"), + mirrorServer: Symbol.for("mirrorServer") }; export const DESCRIPTION: IPackageDescription = { @@ -20,20 +22,38 @@ export const DESCRIPTION: IPackageDescription = { defaultInstances: [ { options: { - identifier: hostname() + "-bridge-client", - params: ["io-client", "http://localhost:7000"], - type: MirrorModule.prototype.constructor.name.toString() + identifier: hostname() + "-mirror-client", + params: ["http://localhost:7001"], + type: MirrorLinkModule.prototype.constructor.name.toString() }, - selector: MirrorModule.prototype.constructor.name.toString() + selector: MirrorLinkModule.prototype.constructor.name.toString() + }, + { + options: { + identifier: hostname() + "-mirror-server", + params: [7001], + type: MirrorServerModule.prototype.constructor.name.toString() + }, + selector: MirrorServerModule.prototype.constructor.name.toString() } ], - nameOfPackage: "bridgeLayer", + nameOfPackage: "mirrorPackage", providedClasses: [ { description: { - name: MirrorModule.prototype.constructor.name.toString(), + name: MirrorLinkModule.prototype.constructor.name.toString(), selector: TYPES.mirrorLink, - type: MirrorModule + type: MirrorLinkModule + }, + settings: { + allowInstanceGeneration: true + } + }, + { + description: { + name: MirrorServerModule.prototype.constructor.name.toString(), + selector: TYPES.mirrorServer, + type: MirrorServerModule }, settings: { allowInstanceGeneration: true diff --git a/modules/mirror/src/mirror-server.module.ts b/modules/mirror/src/mirror-server.module.ts new file mode 100644 index 0000000..ce01e2c --- /dev/null +++ b/modules/mirror/src/mirror-server.module.ts @@ -0,0 +1,125 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @create date 2021-03-22 18:06:08 + * @modify date 2021-03-22 18:06:08 + * @desc [description] + */ +import * as io from "socket.io"; +import { exportProperty } from "../../../lib/decorators/moduleDecorators"; +import { getNopeLogger } from "../../../lib/logger/getLogger"; +import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable"; +import { NopeObservable } from "../../../lib/observables/nopeObservable"; + +const EVENTS_TO_FORWARD: Set = new Set([ + // Default emitters + "aurevoir", + "bonjour", + "NewInstanceGeneratorsAvailable", + "NewInstancesAvailable", + "NewObersvablesAvailable", + "NewServicesAvailable", + "StatusUpdate", + "TaskCancelation", + "event", + "rpcRequest", + "rpcResponse" +]); + +export class MirrorServerModule extends InjectableNopeBaseModule { + /** + * Attribute holding a Flag, whether the Element is connected or not. + * + * @memberof MirrorServerModule + */ + @exportProperty({ + mode: "publish", + schema: { + description: "An Overview, showing the connected clients", + type: "array", + items: { + description: "The ID of the connection.", + type: "string" + } + }, + topic: "clients" + }) + public clients = new NopeObservable(); + + protected _logger = getNopeLogger("Mirror-Server"); + protected _socket: io.Server; + protected _sockets: Set; + + async init(port = 7001): Promise { + const _this = this; + + // Define the Author. + this.author = { + forename: "Martin", + mail: "m.karkowski@zema.de", + surename: "karkowski" + }; + + this.description = "A Module which will host a Mirror Server."; + this.version = { + date: new Date("22.03.2021"), + version: 1 + }; + + await super.init(); + + this.clients.setContent([]); + + this._logger.info( + "waiting for connection. Listening on port: " + port.toString() + ); + + this._socket = (io as any)(); + this._socket.listen(port); + + this._socket.on("connection", (client) => { + _this._logger.info("New Connection established: " + client.id); + + // Add the Layer to the Dispatcher. + _this._dispatcher.communicator.addMirror(client, true); + + // Now Subscribe the Events and make shure we + // are forwarding the data. + for (const event of EVENTS_TO_FORWARD) { + client.on(event, (data) => { + _this._forward(client, event, data); + }); + } + + // Subscribe to Loosing connection: + client.on("disconnect", () => { + _this._logger.info("Connection of : " + client.id + " lost."); + _this._sockets.delete(client); + + // Manually Remove the Layer + _this._dispatcher.communicator.removeMirror(client); + + _this.clients.forcePublish(); + }); + + _this.clients.forcePublish(); + }); + + this._sockets = new Set(); + } + + protected _id: string; + + protected _forward(socket: io.Socket, event: string, data: any) { + for (const socketToForward of this._sockets) { + if (socket !== socketToForward) { + socketToForward.emit(event, data); + } + } + } + + public async dispose() { + this._socket.close(); + await super.dispose(); + } +}