298 lines
9.0 KiB
TypeScript
298 lines
9.0 KiB
TypeScript
/**
|
|
* @author Martin Karkowski
|
|
* @email m.karkowski@zema.de
|
|
* @create date 2021-08-03 17:32:16
|
|
* @modify date 2021-08-03 21:14:12
|
|
* @desc [description]
|
|
*/
|
|
|
|
import * as Logger from "js-logger";
|
|
import { ILogger } from "js-logger";
|
|
import { connect, MqttClient } from "mqtt";
|
|
import { matches } from "mqtt-pattern";
|
|
import { hostname } from "os";
|
|
import { generateId } from "../../helpers/idMethods";
|
|
import { SPLITCHAR } from "../../helpers/objectMethods";
|
|
import { replaceAll } from "../../helpers/stringMethods";
|
|
import { getNopeLogger } from "../../logger/getLogger";
|
|
import { LoggerLevel } from "../../logger/nopeLogger";
|
|
import { NopeObservable } from "../../observables/nopeObservable";
|
|
import { ICommunicationInterface, IExecutingTaskMsg, IRpcUnregisterMsg } from "../../types/nope/nopeCommunication.interface";
|
|
import { UNSPECIFIC_TO_METHODS } from "../bridge";
|
|
|
|
function _mqttMatch(subscription: string, offered: string): boolean {
|
|
let _subscription = replaceAll(subscription, SPLITCHAR, "/");
|
|
let _offered: string = replaceAll(offered, SPLITCHAR, "/");
|
|
|
|
// Perform the Match
|
|
let res = matches(_subscription, _offered);
|
|
|
|
if (res) {
|
|
// If it is matching => Quit method
|
|
return res;
|
|
}
|
|
|
|
// Check if the Topic matches the data, based on a shortend Topic
|
|
if ((_offered.split("/").length > _subscription.split("/").length) && (subscription.indexOf("+") === -1)) {
|
|
// Shorten the offered Topic
|
|
_offered = _offered.split("/").slice(0, _subscription.split("/").length).join("/");
|
|
// Repreform the Matching
|
|
res = matches(_subscription, _offered);
|
|
} else if ((_offered.split("/").length < _subscription.split("/").length) && (subscription.indexOf("+") === -1)) {
|
|
// Shorten the Subscription
|
|
_subscription = _subscription.split("/").slice(0, _offered.split("/").length).join("/");
|
|
// Repreform the Matching
|
|
res = matches(_subscription, _offered);
|
|
}
|
|
|
|
// TODO: Fix
|
|
|
|
// Return the Result
|
|
return res;
|
|
}
|
|
|
|
//@ts-ignore Ignore the Interface. Its implemented manually
|
|
export class MQTTLayer implements ICommunicationInterface {
|
|
|
|
protected _client: MqttClient;
|
|
protected _cbs: Map<string, Set<(...args: any[]) => void>>;
|
|
protected _logger: ILogger
|
|
|
|
public connected: NopeObservable<boolean>;
|
|
public considerConnection: boolean;
|
|
public allowServiceRedundancy: boolean;
|
|
public id: string;
|
|
/**
|
|
* Creates an instance of IoSocketClient.
|
|
* @param {string} uri Uri of the Server.
|
|
* @param {LoggerLevel} [level="info"] Logger level
|
|
* @memberof IoSocketClient
|
|
*/
|
|
constructor(public uri: string, public preTopic: string = hostname(), public qos: 0 | 1 | 2 = 1, level: LoggerLevel = "info") {
|
|
// Make shure we use the http before connecting.
|
|
this.uri = this.uri.startsWith("mqtt://") ? this.uri : "mqtt://" + this.uri;
|
|
this.connected = new NopeObservable<boolean>();
|
|
this.connected.setContent(false);
|
|
|
|
this._cbs = new Map();
|
|
this._logger = getNopeLogger("mqtt-layer", level);
|
|
|
|
this.considerConnection = true;
|
|
this.allowServiceRedundancy = false;
|
|
this.id = generateId();
|
|
|
|
|
|
// Create a Broker and use the provided ID
|
|
this._client = connect(this.uri);
|
|
|
|
const _this = this;
|
|
|
|
this._client.on("connect", () => {
|
|
_this.connected.setContent(true);
|
|
});
|
|
this._client.on("disconnect", () => {
|
|
_this.connected.setContent(false);
|
|
});
|
|
this._client.on("message", (topic, payload) => {
|
|
const data = JSON.parse(payload.toString("utf-8"));
|
|
for (const subscription of _this._cbs.keys()) {
|
|
// Test if the Topic matches
|
|
if (_mqttMatch(subscription, topic)) {
|
|
if (_this._logger.enabledFor(Logger.DEBUG) && !topic.includes("nope/StatusUpdate")) {
|
|
_this._logger.debug("received", topic, data, _this._cbs.get(subscription).size);
|
|
}
|
|
for (const callback of _this._cbs.get(subscription)) {
|
|
// Callback
|
|
callback(data);
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
});
|
|
|
|
for (const eventName in UNSPECIFIC_TO_METHODS) {
|
|
const item = UNSPECIFIC_TO_METHODS[eventName];
|
|
const subscribeTo = "+/nope/" + eventName;
|
|
const publishTo = _this.preTopic + "/nope/" + eventName;
|
|
|
|
this[item.subscribe as any] = async (callback: any) => {
|
|
await _this._on(subscribeTo, callback);
|
|
};
|
|
|
|
this[item.emit as any] = async (data: any) => {
|
|
await _this._emit(publishTo, data);
|
|
};
|
|
}
|
|
}
|
|
|
|
protected _adaptTopic(topic: string): string {
|
|
return replaceAll(topic, ".", "/");
|
|
}
|
|
|
|
/**
|
|
* Internal Function to subscribe to a specific callback
|
|
* @param topic the topic, which should be subscribed
|
|
* @param callback the callback to call
|
|
* @returns
|
|
*/
|
|
protected _on(topic: string, callback: (...args) => void): Promise<void> {
|
|
const _this = this;
|
|
const _topic = this._adaptTopic(topic);
|
|
return new Promise<void>((resolve, reject) => {
|
|
if (!_this._cbs.has(_topic)) {
|
|
// No subscription is present:
|
|
// create the subscription.
|
|
_this._cbs.set(_topic, new Set());
|
|
|
|
_this._logger.info("subscribing :", _topic);
|
|
|
|
// Call the Subscription on MQTT
|
|
_this._client.subscribe(_topic, { qos: _this.qos }, (err) => {
|
|
if (err) {
|
|
reject(err);
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
|
|
// Store the callback
|
|
_this._cbs.get(_topic).add(callback);
|
|
} else {
|
|
// A susbcription is allready present:
|
|
// Store the callback
|
|
_this._cbs.get(_topic).add(callback);
|
|
resolve();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Internalf function to remove a susbcription
|
|
* @param topic the topic, which should be unsubscribed
|
|
* @param callback the callback to unsubscribe
|
|
* @returns
|
|
*/
|
|
protected _off(topic: string, callback: (...args) => void): Promise<void> {
|
|
const _this = this;
|
|
const _topic = this._adaptTopic(topic);
|
|
|
|
return new Promise((resolve, reject) => {
|
|
if (_this._cbs.has(_topic)) {
|
|
_this._cbs.get(_topic).delete(callback);
|
|
|
|
if (_this._cbs.get(_topic).size === 0) {
|
|
_this._cbs.delete(_topic);
|
|
_this._logger.info("unsubscribing :", _topic);
|
|
_this._client.unsubscribe(_topic, {}, (err) => {
|
|
if (err) {
|
|
reject(err);
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
resolve();
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Internal function to publish data on the given topic
|
|
* @param topic The topic to publish the data on
|
|
* @param data The data to publish
|
|
* @returns
|
|
*/
|
|
protected _emit(topic: string, data: any): Promise<void> {
|
|
const _this = this;
|
|
const _topic = this._adaptTopic(topic);
|
|
return new Promise<void>((resolve, reject) => {
|
|
// Publish the event
|
|
try {
|
|
if (_this._logger.enabledFor(Logger.DEBUG) && !_topic.startsWith(_this.preTopic + "/nope/StatusUpdate")) {
|
|
_this._logger.debug("emitting: ", _topic);
|
|
}
|
|
_this._client.publish(_topic, JSON.stringify(data), { qos: _this.qos }, (err) => {
|
|
if (err) {
|
|
reject(err);
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
} catch (e) {
|
|
reject(e);
|
|
}
|
|
});
|
|
}
|
|
|
|
async onEvent(event: string, cb): Promise<void> {
|
|
await this._on("+/nope/" + event, cb);
|
|
}
|
|
|
|
async offEvent(event: string, cb): Promise<void> {
|
|
await this._off("+/nope/" + event, cb);
|
|
}
|
|
|
|
async emitEvent(event: string, data): Promise<void> {
|
|
await this._emit(this.preTopic + "/nope/" + event, data);
|
|
}
|
|
|
|
async onRpcRequest(name: string, cb): Promise<void> {
|
|
await this._on("+/nope/rpc/" + name, cb);
|
|
}
|
|
|
|
async offRpcRequest(name: string, cb): Promise<void> {
|
|
await this._off("+/nope/rpc/" + name, cb);
|
|
}
|
|
|
|
async emitRpcRequest(name: string, data): Promise<void> {
|
|
await this._emit(this.preTopic + "/nope/rpc/" + name, data);
|
|
}
|
|
|
|
async onRpcResponse(name: string, cb): Promise<void> {
|
|
await this._on("+/nope/rpc/" + name, cb);
|
|
}
|
|
|
|
async offRpcResponse(name: string, cb): Promise<void> {
|
|
await this._off("+/nope/rpc/" + name, cb);
|
|
}
|
|
|
|
async emitRpcResponse(name: string, data): Promise<void> {
|
|
await this._emit(this.preTopic + "/nope/rpc/" + name, data);
|
|
}
|
|
|
|
async onUnregisterRpc(cb: (msg: IRpcUnregisterMsg) => void): Promise<void> {
|
|
await this._on("+/nope/unregister_rpc/", cb);
|
|
}
|
|
|
|
async emitUnregisterRpc(data: IRpcUnregisterMsg): Promise<void> {
|
|
await this._emit(this.preTopic + "/nope/unregister_rpc/", data);
|
|
}
|
|
|
|
async onExecutingTasks(cb: (msg: IExecutingTaskMsg) => void): Promise<void> {
|
|
await this._on("+/nope/tasks/", cb);
|
|
}
|
|
|
|
async emitExecutingTasks(data: IExecutingTaskMsg): Promise<void> {
|
|
await this._emit(this.preTopic + "/nope/tasks/", data);
|
|
}
|
|
|
|
/**
|
|
* Function to dispose the Interface.
|
|
* @returns nothing
|
|
*/
|
|
public dispose(): Promise<void> {
|
|
const _this = this;
|
|
return new Promise<void>((resolve, reject) => {
|
|
|
|
this._client.end(true, {}, err => {
|
|
if (err) reject(err);
|
|
else resolve();
|
|
});
|
|
});
|
|
}
|
|
} |