/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2021-08-09 21:44:37
 * @modify date 2021-08-09 21:44:37
 * @desc Test module that starts a local MQTT broker and connects a set of
 *       observable properties to it through an MQTT client module.
 */

import * as aedes from "aedes";
import { injectable } from "inversify";
import { ILogger } from "js-logger";
import * as net from "net";
import { getNopeLogger } from "../../../lib/logger/getLogger";
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
import { IGenericNopeModule } from "../../../lib/types/nope/nopeModule.interface";
import { INopeObservable } from "../../../lib/types/nope/nopeObservable.interface";
import { IMQTTClientModule } from "../types/interfaces";

/**
 * Combination of the generic module interface and the MQTT client
 * module interface.
 */
export interface MQTT extends IGenericNopeModule, IMQTTClientModule { }

/**
 * Test module for the MQTT integration.
 *
 * During `init` it
 *  1. starts an aedes broker behind a TCP server on port 1883,
 *  2. registers five string properties with different publish/subscribe modes,
 *  3. requests an "MQTTClientModule" instance from the dispatcher and adds
 *     topic aliases for some of the properties,
 *  4. starts a 100 ms interval that writes an increasing counter into the
 *     published properties.
 *
 * `dispose` stops the interval, disposes the base module and shuts the
 * broker down again.
 */
@injectable()
export class MqttTestModule extends InjectableNopeBaseModule {
  protected _logger: ILogger;

  /** Registered with mode "subscribe"; changes are logged. */
  public subscribeProp: INopeObservable;
  /** Registered with mode "publish"; written by the interval. */
  public publishProp: INopeObservable;
  /** Registered with mode "subscribe"; aliased to "test/alias/subscribe". */
  public aliasSubProp: INopeObservable;
  /** Registered with mode "publish"; written by the interval. */
  public aliasPubProp: INopeObservable;
  /** Registered for both directions; written by the interval and aliased. */
  public aliasProp: INopeObservable;

  /** Handle of the publishing interval created in `init`. */
  protected _interval: ReturnType<typeof setInterval> | undefined;

  /**
   * Shuts down the broker and the TCP server created in `init`.
   * Stays `undefined` until `init` has created them.
   */
  protected _stopBroker: (() => Promise<void>) | undefined;

  /**
   * Starts the broker, registers the properties, creates the MQTT client
   * instance with its aliases and starts the publishing interval.
   */
  async init() {
    const brokerLogger = getNopeLogger("mqtt-broker");
    const broker = aedes();
    const tcpServer = net.createServer(broker.handle);

    tcpServer.listen(1883, () => {
      brokerLogger.info("MQTT-Broker online");
    });

    // Keep a way to release the port and the broker in `dispose`.
    this._stopBroker = () =>
      new Promise<void>((resolve) => {
        // Stop accepting new connections first; the callback fires once all
        // existing connections are gone (or with an error if the server was
        // never listening).
        tcpServer.close((err) => {
          if (err) {
            brokerLogger.warn("Closing the MQTT-Broker server reported", err);
          }
          resolve();
        });
        // Closing the broker disconnects its clients, which lets the TCP
        // server finish closing.
        broker.close(() => undefined);
      });

    this._logger = getNopeLogger("mqtt-test-module");

    this.subscribeProp = new NopeObservable();
    this.publishProp = new NopeObservable();
    this.aliasSubProp = new NopeObservable();
    this.aliasPubProp = new NopeObservable();
    this.aliasProp = new NopeObservable();

    this.author = {
      forename: "Martin",
      mail: "m.karkowski@zema.de",
      surename: "karkowski"
    };
    this.description = "Test MQTT Module for Nope";
    this.version = {
      // ISO format: "09.08.2021" is parsed implementation-dependently
      // (V8 reads it as September 8th instead of August 9th).
      date: new Date("2021-08-09"),
      version: 1
    };

    const props = [
      { name: "subscribeProp", mode: ["subscribe"] },
      { name: "aliasSubProp", mode: ["subscribe"] },
      { name: "aliasPubProp", mode: ["publish"] },
      // `aliasProp` is written by the interval below and has a publish
      // alias, so it needs both directions.
      { name: "aliasProp", mode: ["subscribe", "publish"] },
      { name: "publishProp", mode: ["publish"] }
    ];

    for (const prop of props) {
      // Register the property; the topic equals the property name.
      await this.registerProperty(prop.name, this[prop.name], {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        mode: prop.mode as any,
        schema: {
          type: "string"
        },
        topic: prop.name
      });

      // Log every change of a property that is subscribed.
      if (prop.mode.includes("subscribe")) {
        this[prop.name].subscribe((value, { sender }) => {
          this._logger.info(
            "Changed",
            prop.name,
            "to",
            value,
            "adapted by",
            sender
          );
        });
      }
    }

    // Parameters handed to the MQTT client module.
    const brokerUri = "mqtt://localhost:1883";
    const mqttParams = [
      brokerUri,
      {
        autoPublish: {
          active: false
        },
        autoSubscribe: "alias"
      },
      // Waiting for the Connection.
      true
    ];

    await super.init();

    // Create the MQTT connector through the dispatcher.
    const mqttClient = await this._dispatcher.generateInstance({
      identifier: "mqtt-client",
      params: mqttParams,
      type: "MQTTClientModule"
    });

    // Map the identifiers of the aliased properties to dedicated topics.
    await mqttClient.addAlias(this.getIdentifierOf(this.aliasSubProp), {
      subscribe: "test/alias/subscribe"
    });

    // NOTE(review): this adds a *subscribe* alias named ".../publish" to
    // `aliasProp`, while `aliasPubProp` never receives an alias. It looks
    // like `aliasPubProp` with `{ publish: "test/alias/publish" }` was
    // intended — confirm before changing; behavior is kept as it was.
    await mqttClient.addAlias(this.getIdentifierOf(this.aliasProp), {
      subscribe: "test/alias/publish"
    });

    await mqttClient.addAlias(this.getIdentifierOf(this.aliasProp), {
      subscribe: "test/alias2/subscribe",
      publish: "test/alias2/publish"
    });

    // Write an increasing counter (starting at "1") every 100 ms.
    let counter = 0;
    this._interval = setInterval(() => {
      counter++;
      const content = counter.toString();

      this.publishProp.setContent(content);
      this.aliasPubProp.setContent(content);
      this.aliasProp.setContent(content);
    }, 100);
  }

  /**
   * Stops the publishing interval, disposes the base module and shuts down
   * the broker started in `init`. Safe to call when `init` never ran.
   */
  async dispose() {
    clearInterval(this._interval);
    this._interval = undefined;

    await super.dispose();

    const stopBroker = this._stopBroker;
    if (stopBroker) {
      this._stopBroker = undefined;
      await stopBroker();
    }
  }
}