nope/lib/communication/layers/mqttLayer.ts
2021-12-04 08:25:26 +01:00

423 lines
12 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-08-03 17:32:16
* @modify date 2021-08-03 21:14:12
* @desc [description]
*/
import * as Logger from "js-logger";
import { ILogger } from "js-logger";
import { connect, MqttClient } from "mqtt";
import { matches } from "mqtt-pattern";
import { hostname } from "os";
import { generateId } from "../../helpers/idMethods";
import { SPLITCHAR } from "../../helpers/objectMethods";
import { replaceAll } from "../../helpers/stringMethods";
import { getNopeLogger } from "../../logger/getLogger";
import { LoggerLevel } from "../../logger/nopeLogger";
import { NopeObservable } from "../../observables/nopeObservable";
import {
IAvailableInstanceGeneratorsMsg,
IAvailableInstancesMsg,
IAvailableServicesMsg,
IAvailableTopicsMsg,
ICommunicationInterface,
IExecutingTaskMsg,
IExternalPropertyChangedMsg,
IRpcUnregisterMsg,
ITaskCancelationMsg,
} from "../../types/nope/nopeCommunication.interface";
import { IDispatcherInfo } from "../../types/nope/nopeDispatcher.interface";
import { UNSPECIFIC_TO_METHODS } from "../bridge";
function _mqttMatch(subscription: string, offered: string): boolean {
let _subscription = replaceAll(subscription, SPLITCHAR, "/");
let _offered: string = replaceAll(offered, SPLITCHAR, "/");
// Perform the Match
let res = matches(_subscription, _offered);
if (res) {
// If it is matching => Quit method
return res;
}
// Check if the Topic matches the data, based on a shortend Topic
if (
_offered.split("/").length > _subscription.split("/").length &&
subscription.indexOf("+") === -1
) {
// Shorten the offered Topic
_offered = _offered
.split("/")
.slice(0, _subscription.split("/").length)
.join("/");
// Repreform the Matching
res = matches(_subscription, _offered);
} else if (
_offered.split("/").length < _subscription.split("/").length &&
subscription.indexOf("+") === -1
) {
// Shorten the Subscription
_subscription = _subscription
.split("/")
.slice(0, _offered.split("/").length)
.join("/");
// Repreform the Matching
res = matches(_subscription, _offered);
}
// TODO: Fix
// Return the Result
return res;
}
export class MQTTLayer implements ICommunicationInterface {
protected _client: MqttClient;
protected _cbs: Map<string, Set<(...args: any[]) => void>>;
protected _logger: ILogger;
public connected: NopeObservable<boolean>;
public considerConnection: boolean;
public allowServiceRedundancy: boolean;
public id: string;
/**
* Creates an instance of IoSocketClient.
* @param {string} uri Uri of the Server.
* @param {LoggerLevel} [level="info"] Logger level
* @memberof IoSocketClient
*/
constructor(
public uri: string,
public preTopic: string = hostname(),
public qos: 0 | 1 | 2 = 2,
level: LoggerLevel = "info"
) {
// Make shure we use the http before connecting.
this.uri = this.uri.startsWith("mqtt://") ? this.uri : "mqtt://" + this.uri;
this.connected = new NopeObservable<boolean>();
this.connected.setContent(false);
this._cbs = new Map();
this._logger = getNopeLogger("mqtt-layer", level);
this.considerConnection = true;
this.allowServiceRedundancy = false;
this.id = generateId();
this.receivesOwnMessages = true;
// Create a Broker and use the provided ID
this._client = connect(this.uri);
const _this = this;
this._client.on("connect", () => {
_this.connected.setContent(true);
});
this._client.on("disconnect", () => {
_this.connected.setContent(false);
});
this._client.on("message", (topic, payload) => {
const data = JSON.parse(payload.toString("utf-8"));
for (const subscription of _this._cbs.keys()) {
// Test if the Topic matches
if (_mqttMatch(subscription, topic)) {
if (
_this._logger.enabledFor((Logger as any).DEBUG) &&
!topic.includes("nope/StatusUpdate")
) {
_this._logger.debug(
"received",
topic,
data,
_this._cbs.get(subscription).size
);
}
for (const callback of _this._cbs.get(subscription)) {
// Callback
callback(data);
}
return;
}
}
});
for (const eventName in UNSPECIFIC_TO_METHODS) {
const item = UNSPECIFIC_TO_METHODS[eventName];
const subscribeTo = "+/nope/" + eventName;
const publishTo = _this.preTopic + "/nope/" + eventName;
this[item.subscribe as any] = async (callback: any) => {
await _this._on(subscribeTo, callback);
};
this[item.emit as any] = async (data: any) => {
await _this._emit(publishTo, data);
};
}
}
allowsServiceRedundancy: boolean;
readonly receivesOwnMessages: boolean;
protected _adaptTopic(topic: string): string {
return replaceAll(topic, ".", "/");
}
/**
* Internal Function to subscribe to a specific callback
* @param topic the topic, which should be subscribed
* @param callback the callback to call
* @returns
*/
protected _on(topic: string, callback: (...args) => void): Promise<void> {
const _this = this;
const _topic = this._adaptTopic(topic);
return new Promise<void>((resolve, reject) => {
if (!_this._cbs.has(_topic)) {
// No subscription is present:
// create the subscription.
_this._cbs.set(_topic, new Set());
_this._logger.info("subscribing :", _topic);
// Call the Subscription on MQTT
_this._client.subscribe(_topic, { qos: _this.qos }, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
// Store the callback
_this._cbs.get(_topic).add(callback);
} else {
// A susbcription is allready present:
// Store the callback
_this._cbs.get(_topic).add(callback);
resolve();
}
});
}
/**
* Internalf function to remove a susbcription
* @param topic the topic, which should be unsubscribed
* @param callback the callback to unsubscribe
* @returns
*/
protected _off(topic: string, callback: (...args) => void): Promise<void> {
const _this = this;
const _topic = this._adaptTopic(topic);
return new Promise((resolve, reject) => {
if (_this._cbs.has(_topic)) {
_this._cbs.get(_topic).delete(callback);
if (_this._cbs.get(_topic).size === 0) {
_this._cbs.delete(_topic);
_this._logger.info("unsubscribing :", _topic);
_this._client.unsubscribe(_topic, {}, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
return;
}
}
resolve();
});
}
/**
* Internal function to publish data on the given topic
* @param topic The topic to publish the data on
* @param data The data to publish
* @returns
*/
protected _emit(topic: string, data: any): Promise<void> {
const _this = this;
const _topic = this._adaptTopic(topic);
return new Promise<void>((resolve, reject) => {
// Publish the event
try {
if (
_this._logger.enabledFor((Logger as any).DEBUG) &&
!_topic.startsWith(_this.preTopic + "/nope/StatusUpdate")
) {
_this._logger.debug("emitting: ", _topic);
}
_this._client.publish(
_topic,
JSON.stringify(data),
{ qos: _this.qos },
(err) => {
if (err) {
reject(err);
} else {
resolve();
}
}
);
} catch (e) {
reject(e);
}
});
}
async onEvent(event: string, cb): Promise<void> {
await this._on("+/nope/" + event, cb);
}
async offEvent(event: string, cb): Promise<void> {
await this._off("+/nope/" + event, cb);
}
async emitEvent(event: string, data): Promise<void> {
await this._emit(this.preTopic + "/nope/" + event, data);
}
async onRpcRequest(name: string, cb): Promise<void> {
await this._on("+/nope/rpc/" + name, cb);
}
async offRpcRequest(name: string, cb): Promise<void> {
await this._off("+/nope/rpc/" + name, cb);
}
async emitRpcRequest(name: string, data): Promise<void> {
await this._emit(this.preTopic + "/nope/rpc/" + name, data);
}
async onRpcResponse(name: string, cb): Promise<void> {
await this._on("+/nope/rpc/" + name, cb);
}
async offRpcResponse(name: string, cb): Promise<void> {
await this._off("+/nope/rpc/" + name, cb);
}
async emitRpcResponse(name: string, data): Promise<void> {
await this._emit(this.preTopic + "/nope/rpc/" + name, data);
}
async onUnregisterRpc(cb: (msg: IRpcUnregisterMsg) => void): Promise<void> {
await this._on("+/nope/unregister_rpc/", cb);
}
async emitUnregisterRpc(data: IRpcUnregisterMsg): Promise<void> {
await this._emit(this.preTopic + "/nope/unregister_rpc/", data);
}
async onExecutingTasks(cb: (msg: IExecutingTaskMsg) => void): Promise<void> {
await this._on("+/nope/tasks/", cb);
}
async emitExecutingTasks(data: IExecutingTaskMsg): Promise<void> {
await this._emit(this.preTopic + "/nope/tasks/", data);
}
/**
* Function to dispose the Interface.
* @returns nothing
*/
public dispose(): Promise<void> {
const _this = this;
return new Promise<void>((resolve, reject) => {
this._client.end(true, {}, (err) => {
if (err) reject(err);
else resolve();
});
});
}
// Implement the Default behavior of a Communication-Layer.
// All the Methods below will be implemented
// during the initalization. So if something
// fails => an Error is thrown.
emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
throw new Error("Method not implemented.");
}
onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
throw new Error("Method not implemented.");
}
emitNewServicesAvailable(services: IAvailableServicesMsg): Promise<void> {
throw new Error("Method not implemented.");
}
onNewServicesAvailable(
cb: (services: IAvailableServicesMsg) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
emitNewObservablesAvailable(topics: IAvailableTopicsMsg): Promise<void> {
throw new Error("Method not implemented.");
}
onNewObservablesAvailable(
cb: (topics: IAvailableTopicsMsg) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> {
throw new Error("Method not implemented.");
}
emitBonjour(msg: IDispatcherInfo): Promise<void> {
throw new Error("Method not implemented.");
}
onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
throw new Error("Method not implemented.");
}
emitAurevoir(dispatcher: string): Promise<void> {
throw new Error("Method not implemented.");
}
onTaskCancelation(cb: (msg: ITaskCancelationMsg) => void): Promise<void> {
throw new Error("Method not implemented.");
}
emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
throw new Error("Method not implemented.");
}
emitNewInstanceGeneratorsAvailable(
creators: IAvailableInstanceGeneratorsMsg
): Promise<void> {
throw new Error("Method not implemented.");
}
onNewInstanceGeneratorsAvailable(
cb: (creators: IAvailableInstanceGeneratorsMsg) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
onNewInstancesAvailable(
cb: (instances: IAvailableInstancesMsg) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
emitNewInstancesAvailable(instances: IAvailableInstancesMsg): Promise<void> {
throw new Error("Method not implemented.");
}
onPropertyChange(
name: string,
cb: (data: IExternalPropertyChangedMsg<unknown>) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
emitPropertyChange(
name: string,
data: IExternalPropertyChangedMsg<unknown>
): Promise<void> {
throw new Error("Method not implemented.");
}
offPropertyChange(
name: string,
cb: (data: IExternalPropertyChangedMsg<unknown>) => void
): Promise<void> {
throw new Error("Method not implemented.");
}
}