/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2021-08-03 17:32:16
 * @modify date 2021-08-03 21:14:12
 * @desc Communication layer which exchanges the messages of the
 *       communication interface via an MQTT broker.
 */

import * as Logger from "js-logger";
import { ILogger } from "js-logger";
import { connect, MqttClient } from "mqtt";
import { matches } from "mqtt-pattern";
import { hostname } from "os";
import { generateId } from "../../helpers/idMethods";
import { SPLITCHAR } from "../../helpers/objectMethods";
import { replaceAll } from "../../helpers/stringMethods";
import { getNopeLogger } from "../../logger/getLogger";
import { LoggerLevel } from "../../logger/nopeLogger";
import { NopeObservable } from "../../observables/nopeObservable";
import {
  IAvailableInstanceGeneratorsMsg,
  IAvailableInstancesMsg,
  IAvailableServicesMsg,
  IAvailableTopicsMsg,
  ICommunicationInterface,
  IExecutingTaskMsg,
  IExternalPropertyChangedMsg,
  IRpcUnregisterMsg,
  ITaskCancelationMsg
} from "../../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../../types/nope/nopeDispatcher.interface";
import { UNSPECIFIC_TO_METHODS } from "../bridge";

/**
 * Tests whether a received topic belongs to a subscription.
 *
 * Both arguments are normalized by replacing `SPLITCHAR` with "/". If the
 * normalized values do not match directly and the subscription contains no
 * "+", the longer of the two is truncated to the number of levels of the
 * shorter one and the match is attempted once more.
 *
 * NOTE(review): the original carried a "TODO: Fix" on this fallback; the
 * truncation logic is kept exactly as it was.
 *
 * @param subscription the subscribed pattern
 * @param offered the topic of the received message
 * @returns `true` if the topic is considered to match the subscription
 */
function _mqttMatch(subscription: string, offered: string): boolean {
  const subscriptionLevels = replaceAll(subscription, SPLITCHAR, "/").split(
    "/"
  );
  const offeredLevels = replaceAll(offered, SPLITCHAR, "/").split("/");

  // Perform the direct match.
  if (matches(subscriptionLevels.join("/"), offeredLevels.join("/"))) {
    return true;
  }

  // The fallback is only used for subscriptions without a "+" and only if
  // the number of levels differs.
  if (
    subscription.includes("+") ||
    subscriptionLevels.length === offeredLevels.length
  ) {
    return false;
  }

  // Shorten the longer side to the depth of the shorter one and retry.
  const depth = Math.min(subscriptionLevels.length, offeredLevels.length);

  return matches(
    subscriptionLevels.slice(0, depth).join("/"),
    offeredLevels.slice(0, depth).join("/")
  );
}

/**
 * Communication layer using MQTT.
 *
 * Messages are published as JSON below `<preTopic>/nope/...` and received
 * via subscriptions on `+/nope/...`.
 */
export class MQTTLayer implements ICommunicationInterface {
  /** The underlying MQTT client. */
  protected _client: MqttClient;
  /** Registered callbacks, keyed by the (adapted) subscribed topic. */
  protected _cbs: Map<string, Set<(...args) => void>>;
  protected _logger: ILogger;
  /** Holds `true` while the client is connected to the broker. */
  public connected: NopeObservable<boolean>;
  public considerConnection: boolean;
  public allowServiceRedundancy: boolean;
  public id: string;

  /**
   * Creates an instance of MQTTLayer and immediately starts connecting.
   *
   * @param uri Uri of the broker. If no scheme (e.g. "mqtt://", "mqtts://",
   *            "ws://") is given, "mqtt://" is prepended.
   * @param preTopic Prefix of every published topic. Defaults to the hostname.
   * @param qos Quality of service used for subscribing and publishing.
   * @param level Logger level
   * @memberof MQTTLayer
   */
  constructor(
    public uri: string,
    public preTopic: string = hostname(),
    public qos: 0 | 1 | 2 = 2,
    level: LoggerLevel = "info"
  ) {
    // Make sure the uri carries a scheme before connecting. An already
    // present scheme is kept, otherwise "mqtt://" is used.
    const hasScheme = /^[a-z][a-z0-9+.-]*:\/\//i.test(this.uri);
    this.uri = hasScheme ? this.uri : "mqtt://" + this.uri;

    this.connected = new NopeObservable<boolean>();
    this.connected.setContent(false);
    this._cbs = new Map<string, Set<(...args) => void>>();
    this._logger = getNopeLogger("mqtt-layer", level);
    this.considerConnection = true;
    this.allowServiceRedundancy = false;
    // Keep the second (differently spelled) flag in sync instead of leaving
    // it undefined.
    this.allowsServiceRedundancy = false;
    this.id = generateId();
    this.receivesOwnMessages = true;

    // Create the client and connect to the broker.
    this._client = connect(this.uri);

    this._client.on("connect", () => {
      this.connected.setContent(true);
    });

    // "disconnect" is emitted when the broker sends a DISCONNECT packet,
    // "close" whenever the connection is lost or closed. Both mean that
    // we are no longer connected.
    const markDisconnected = (): void => {
      this.connected.setContent(false);
    };
    this._client.on("disconnect", markDisconnected);
    this._client.on("close", markDisconnected);

    this._client.on("message", (topic, payload) => {
      let data: unknown;

      try {
        data = JSON.parse(payload.toString("utf-8"));
      } catch (error) {
        // A malformed payload must not throw inside the event handler.
        this._logger.error(
          "Failed to parse the payload received on",
          topic,
          error
        );
        return;
      }

      for (const [subscription, callbacks] of this._cbs) {
        // Test if the topic matches.
        if (!_mqttMatch(subscription, topic)) {
          continue;
        }

        if (
          this._logger.enabledFor((Logger as any).DEBUG) &&
          !topic.includes("nope/StatusUpdate")
        ) {
          this._logger.debug("received", topic, data, callbacks.size);
        }

        for (const callback of callbacks) {
          callback(data);
        }

        // NOTE(review): only the first matching subscription is served;
        // further matching subscriptions are skipped. Kept as in the
        // original - confirm that this is intended.
        return;
      }
    });

    // Replace the generic "on..." / "emit..." methods with implementations
    // bound to their topic.
    for (const eventName in UNSPECIFIC_TO_METHODS) {
      const item = UNSPECIFIC_TO_METHODS[eventName];
      const subscribeTo = "+/nope/" + eventName;
      const publishTo = this.preTopic + "/nope/" + eventName;

      this[item.subscribe as any] = async (callback: any) => {
        await this._on(subscribeTo, callback);
      };

      this[item.emit as any] = async (data: any) => {
        await this._emit(publishTo, data);
      };
    }
  }

  allowsServiceRedundancy: boolean;
  readonly receivesOwnMessages: boolean;

  /**
   * Converts a topic into its MQTT representation by replacing "." with "/".
   *
   * NOTE(review): a `preTopic` containing "." (e.g. a fully qualified
   * hostname) is split into several levels as well, while the subscriptions
   * use the single-level wildcard "+". Confirm that this is intended.
   *
   * @param topic the topic to adapt
   * @returns the adapted topic
   */
  protected _adaptTopic(topic: string): string {
    return replaceAll(topic, ".", "/");
  }

  /**
   * Internal function to register a callback for a topic. The topic is
   * subscribed at the broker only for the first callback.
   *
   * If subscribing fails, the registration is rolled back, so that a later
   * call tries to subscribe again.
   *
   * @param topic the topic, which should be subscribed
   * @param callback the callback to call
   * @returns resolves once the callback is registered (and subscribed)
   */
  protected _on(topic: string, callback: (...args) => void): Promise<void> {
    const _topic = this._adaptTopic(topic);

    return new Promise<void>((resolve, reject) => {
      const present = this._cbs.get(_topic);

      if (present !== undefined) {
        // A subscription is already present: just store the callback.
        present.add(callback);
        resolve();
        return;
      }

      // No subscription is present: store the callback and subscribe.
      const callbacks = new Set<(...args) => void>([callback]);
      this._cbs.set(_topic, callbacks);

      const fail = (err: unknown): void => {
        // Undo the registration, otherwise the topic would be treated as
        // subscribed although the broker never accepted it.
        callbacks.delete(callback);
        if (callbacks.size === 0 && this._cbs.get(_topic) === callbacks) {
          this._cbs.delete(_topic);
        }
        reject(err);
      };

      try {
        this._logger.info("subscribing :", _topic);

        // Call the subscription on MQTT.
        this._client.subscribe(_topic, { qos: this.qos }, (err) => {
          if (err) {
            fail(err);
          } else {
            resolve();
          }
        });
      } catch (error) {
        fail(error);
      }
    });
  }

  /**
   * Internal function to remove a callback. The topic is unsubscribed at
   * the broker once its last callback has been removed.
   *
   * @param topic the topic, which should be unsubscribed
   * @param callback the callback to unsubscribe
   * @returns resolves once the callback is removed (and unsubscribed)
   */
  protected _off(topic: string, callback: (...args) => void): Promise<void> {
    const _topic = this._adaptTopic(topic);

    return new Promise<void>((resolve, reject) => {
      const callbacks = this._cbs.get(_topic);

      if (callbacks === undefined) {
        // Nothing has been registered for this topic.
        resolve();
        return;
      }

      callbacks.delete(callback);

      if (callbacks.size !== 0) {
        // Other callbacks still need the subscription.
        resolve();
        return;
      }

      this._cbs.delete(_topic);
      this._logger.info("unsubscribing :", _topic);

      this._client.unsubscribe(_topic, {}, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Internal function to publish data on the given topic. The data is
   * serialized as JSON.
   *
   * @param topic The topic to publish the data on
   * @param data The data to publish
   * @returns resolves once the publish callback reports success; rejects on
   *          a publish error or if the data cannot be serialized
   */
  protected _emit(topic: string, data: any): Promise<void> {
    const _topic = this._adaptTopic(topic);

    return new Promise<void>((resolve, reject) => {
      try {
        if (
          this._logger.enabledFor((Logger as any).DEBUG) &&
          !_topic.startsWith(this.preTopic + "/nope/StatusUpdate")
        ) {
          this._logger.debug("emitting: ", _topic);
        }

        // Publish the event.
        this._client.publish(
          _topic,
          JSON.stringify(data),
          { qos: this.qos },
          (err) => {
            if (err) {
              reject(err);
            } else {
              resolve();
            }
          }
        );
      } catch (e) {
        reject(e);
      }
    });
  }

  /** Registers `cb` for the event `event` of any publisher. */
  async onEvent(event: string, cb): Promise<void> {
    await this._on("+/nope/" + event, cb);
  }

  /** Removes `cb` from the event `event`. */
  async offEvent(event: string, cb): Promise<void> {
    await this._off("+/nope/" + event, cb);
  }

  /** Publishes `data` as event `event` below the own `preTopic`. */
  async emitEvent(event: string, data): Promise<void> {
    await this._emit(this.preTopic + "/nope/" + event, data);
  }

  /** Registers `cb` for rpc requests named `name`. */
  async onRpcRequest(name: string, cb): Promise<void> {
    await this._on("+/nope/rpc/" + name, cb);
  }

  /** Removes `cb` from the rpc requests named `name`. */
  async offRpcRequest(name: string, cb): Promise<void> {
    await this._off("+/nope/rpc/" + name, cb);
  }

  /** Publishes an rpc request named `name`. */
  async emitRpcRequest(name: string, data): Promise<void> {
    await this._emit(this.preTopic + "/nope/rpc/" + name, data);
  }

  /** Registers `cb` for rpc responses named `name`. */
  async onRpcResponse(name: string, cb): Promise<void> {
    await this._on("+/nope/rpc/" + name, cb);
  }

  /** Removes `cb` from the rpc responses named `name`. */
  async offRpcResponse(name: string, cb): Promise<void> {
    await this._off("+/nope/rpc/" + name, cb);
  }

  /** Publishes an rpc response named `name`. */
  async emitRpcResponse(name: string, data): Promise<void> {
    await this._emit(this.preTopic + "/nope/rpc/" + name, data);
  }

  /** Registers `cb` for messages announcing unregistered rpcs. */
  async onUnregisterRpc(cb: (msg: IRpcUnregisterMsg) => void): Promise<void> {
    await this._on("+/nope/unregister_rpc/", cb);
  }

  /** Publishes a message announcing unregistered rpcs. */
  async emitUnregisterRpc(data: IRpcUnregisterMsg): Promise<void> {
    await this._emit(this.preTopic + "/nope/unregister_rpc/", data);
  }

  /** Registers `cb` for messages about executing tasks. */
  async onExecutingTasks(cb: (msg: IExecutingTaskMsg) => void): Promise<void> {
    await this._on("+/nope/tasks/", cb);
  }

  /** Publishes a message about executing tasks. */
  async emitExecutingTasks(data: IExecutingTaskMsg): Promise<void> {
    await this._emit(this.preTopic + "/nope/tasks/", data);
  }

  /**
   * Function to dispose the interface. Forces the MQTT client to end.
   *
   * @returns resolves once the client has ended
   */
  public dispose(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this._client.end(true, {}, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  // Default behavior of a communication layer.
  //
  // The methods below are placeholders. The constructor assigns an
  // implementation for every method named in `UNSPECIFIC_TO_METHODS`
  // (presumably all of them - verify against the bridge). A method that
  // has not been replaced throws an error.

  emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitNewServicesAvailable(services: IAvailableServicesMsg): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onNewServicesAvailable(
    cb: (services: IAvailableServicesMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitNewObservablesAvailable(topics: IAvailableTopicsMsg): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onNewObservablesAvailable(
    cb: (topics: IAvailableTopicsMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitBonjour(msg: IDispatcherInfo): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitAurevoir(dispatcher: string): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onTaskCancelation(cb: (msg: ITaskCancelationMsg) => void): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitNewInstanceGeneratorsAvailable(
    creators: IAvailableInstanceGeneratorsMsg
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onNewInstanceGeneratorsAvailable(
    cb: (creators: IAvailableInstanceGeneratorsMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onNewInstancesAvailable(
    cb: (instances: IAvailableInstancesMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitNewInstancesAvailable(instances: IAvailableInstancesMsg): Promise<void> {
    throw new Error("Method not implemented.");
  }

  onPropertyChange(
    name: string,
    cb: (data: IExternalPropertyChangedMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  emitPropertyChange(
    name: string,
    data: IExternalPropertyChangedMsg
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }

  offPropertyChange(
    name: string,
    cb: (data: IExternalPropertyChangedMsg) => void
  ): Promise<void> {
    throw new Error("Method not implemented.");
  }
}