nope/lib/communication/layers/mqttLayer.ts

320 lines
8.7 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-08-03 17:32:16
* @modify date 2021-08-03 21:14:12
* @desc [description]
*/
import { ILogger } from "js-logger";
import { connect, MqttClient } from "mqtt";
import { matches } from "mqtt-pattern";
import { hostname } from "os";
import { generateId } from "../../helpers/idMethods";
import { SPLITCHAR } from "../../helpers/objectMethods";
import { replaceAll } from "../../helpers/stringMethods";
import {
defineNopeLogger,
ValidLoggerDefinition,
} from "../../logger/getLogger";
import { DEBUG, INFO } from "../../logger/index.browser";
import { NopeObservable } from "../../observables/nopeObservable";
import {
EventnameToEventType,
ICommunicationInterface,
} from "../../types/nope";
function _mqttMatch(subscription: string, offered: string): boolean {
let _subscription = replaceAll(subscription, SPLITCHAR, "/");
let _offered: string = replaceAll(offered, SPLITCHAR, "/");
// Perform the Match
let res = matches(_subscription, _offered);
if (res) {
// If it is matching => Quit method
return res;
}
// Check if the Topic matches the data, based on a shortend Topic
if (
_offered.split("/").length > _subscription.split("/").length &&
subscription.indexOf("+") === -1
) {
// Shorten the offered Topic
_offered = _offered
.split("/")
.slice(0, _subscription.split("/").length)
.join("/");
// Repreform the Matching
res = matches(_subscription, _offered);
} else if (
_offered.split("/").length < _subscription.split("/").length &&
subscription.indexOf("+") === -1
) {
// Shorten the Subscription
_subscription = _subscription
.split("/")
.slice(0, _offered.split("/").length)
.join("/");
// Repreform the Matching
res = matches(_subscription, _offered);
}
// TODO: Fix
// Return the Result
return res;
}
export class MQTTLayer implements ICommunicationInterface {
protected _client: MqttClient;
protected _cbs: Map<string, Set<(...args: any[]) => void>>;
protected _logger: ILogger;
public connected: NopeObservable<boolean>;
public considerConnection: boolean;
public allowServiceRedundancy: boolean;
public id: string;
/**
* Creates an instance of IoSocketClient.
* @param {string} uri Uri of the Server.
* @param {LoggerLevel} [logger="info"] Logger level
* @memberof IoSocketClient
*/
constructor(
public uri: string,
logger: ValidLoggerDefinition = "info",
public preTopic: string = hostname(),
public qos: 0 | 1 | 2 = 2,
public forwardToCustomTopics = true
) {
// Make shure we use the http before connecting.
this.uri = this.uri.startsWith("mqtt://") ? this.uri : "mqtt://" + this.uri;
this.connected = new NopeObservable<boolean>();
this.connected.setContent(false);
this._cbs = new Map();
this._logger = defineNopeLogger(logger, "core.layer.mqtt");
this.considerConnection = true;
this.allowServiceRedundancy = false;
this.id = generateId();
this.receivesOwnMessages = true;
// Create a Broker and use the provided ID
this._client = connect(this.uri);
const _this = this;
this._client.on("connect", () => {
_this.connected.setContent(true);
});
this._client.on("disconnect", () => {
_this.connected.setContent(false);
});
this._client.on("message", (topic, payload) => {
const data = JSON.parse(payload.toString("utf-8"));
for (const subscription of _this._cbs.keys()) {
// Test if the Topic matches
if (_mqttMatch(subscription, topic)) {
if (
_this._logger?.enabledFor(DEBUG) &&
!topic.includes("nope/StatusUpdate")
) {
_this._logger.debug(
"received",
topic,
data,
_this._cbs.get(subscription).size
);
}
for (const callback of _this._cbs.get(subscription)) {
// Callback
callback(data);
}
return;
}
}
});
}
// See interface description
async on<T extends keyof EventnameToEventType>(
eventname: T,
cb: (data: EventnameToEventType[T]) => void
): Promise<void> {
return await this._on(`+/nope/${eventname}`, cb);
}
// See interface description
async emit<T extends keyof EventnameToEventType>(
eventname: T,
data: EventnameToEventType[T]
): Promise<void> {
await this._emit(`${this.preTopic}/nope/${eventname}`, data);
if (this.forwardToCustomTopics) {
switch (eventname) {
case "DataChanged": {
let topic = (data as EventnameToEventType["DataChanged"]).path;
topic = this._adaptTopic(topic);
await this._emit(
topic,
(data as EventnameToEventType["DataChanged"]).data
);
break;
}
case "Event": {
let topic = (data as EventnameToEventType["Event"]).path;
topic = this._adaptTopic(topic);
await this._emit(topic, (data as EventnameToEventType["Event"]).data);
break;
}
case "RpcRequest": {
let topic = (data as EventnameToEventType["RpcRequest"]).functionId;
topic = this._adaptTopic(topic);
await this._emit(
topic,
(data as EventnameToEventType["RpcRequest"]).params
);
break;
}
}
}
}
allowsServiceRedundancy: boolean;
readonly receivesOwnMessages: boolean;
protected _adaptTopic(topic: string): string {
return replaceAll(topic, ".", "/");
}
/**
* Internal Function to subscribe to a specific callback
* @param topic the topic, which should be subscribed
* @param callback the callback to call
* @returns
*/
protected _on(topic: string, callback: (...args) => void): Promise<void> {
const _this = this;
const _topic = `${this._adaptTopic(topic)}`;
return new Promise<void>((resolve, reject) => {
if (!_this._cbs.has(_topic)) {
// No subscription is present:
// create the subscription.
_this._cbs.set(_topic, new Set());
if (_this._logger?.enabledFor(DEBUG)) {
_this._logger.debug("subscribing :", _topic);
}
// Call the Subscription on MQTT
_this._client.subscribe(_topic, { qos: _this.qos }, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
// Store the callback
_this._cbs.get(_topic).add(callback);
} else {
// A susbcription is allready present:
// Store the callback
_this._cbs.get(_topic).add(callback);
resolve();
}
});
}
/**
* Internalf function to remove a susbcription
* @param topic the topic, which should be unsubscribed
* @param callback the callback to unsubscribe
* @returns
*/
protected _off(topic: string, callback: (...args) => void): Promise<void> {
const _this = this;
const _topic = this._adaptTopic(topic);
return new Promise((resolve, reject) => {
if (_this._cbs.has(_topic)) {
_this._cbs.get(_topic).delete(callback);
if (_this._cbs.get(_topic).size === 0) {
_this._cbs.delete(_topic);
if (_this._logger?.enabledFor(INFO)) {
_this._logger.info("unsubscribing :", _topic);
}
_this._client.unsubscribe(_topic, {}, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
return;
}
}
resolve();
});
}
/**
* Internal function to publish data on the given topic
* @param topic The topic to publish the data on
* @param data The data to publish
* @returns
*/
protected _emit(topic: string, data: any): Promise<void> {
const _this = this;
const _topic = this._adaptTopic(topic);
return new Promise<void>((resolve, reject) => {
// Publish the event
try {
if (
_this._logger?.enabledFor(DEBUG) &&
!_topic.startsWith(_this.preTopic + "/nope/StatusUpdate")
) {
_this._logger.debug("emitting: ", _topic);
}
_this._client.publish(
_topic,
JSON.stringify(data),
{ qos: _this.qos },
(err) => {
if (err) {
reject(err);
} else {
resolve();
}
}
);
} catch (e) {
reject(e);
}
});
}
/**
* Function to dispose the Interface.
* @returns nothing
*/
public dispose(): Promise<void> {
const _this = this;
return new Promise<void>((resolve, reject) => {
this._client.end(true, {}, (err) => {
if (err) reject(err);
else resolve();
});
});
}
}