Fixing custom subscription

This commit is contained in:
Martin Karkowski 2021-01-05 07:38:28 +01:00
parent b1a2ca30a6
commit 6424fd9eae
2 changed files with 31 additions and 25 deletions

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-13 09:53:27
* @modify date 2021-01-04 17:35:12
* @modify date 2021-01-05 07:37:44
* @desc [description]
*/
@ -21,6 +21,7 @@ import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injecta
import { NopeObservable } from "../../../lib/observables/nopeObservable";
import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces";
import { IMQTTClientModule, IMqttSettings } from "../types/interfaces";
import { mqttTopicAffected } from "./mqtt.functions";
const MQTT_SUBSCRIPTIONS_TOPIC = "mqtt/subscriptions";
@ -111,7 +112,7 @@ export class MQTTClientModule
private _subscriptions = new Map<
string,
{
callback: () => Promise<void>;
callback: (topic: string, data: any) => Promise<void>;
format: "buffer" | "json";
}
>();
@ -269,6 +270,15 @@ export class MQTTClientModule
protected _client: MqttClient;
protected _id: string;
protected _checkForCustomSubscription(offered) {
for (const [subscription, data] of this._subscriptions.entries()) {
if (mqttTopicAffected(subscription, offered)) {
return data;
}
}
return false;
}
/**
* Function used to connect the client.
*
@ -595,28 +605,24 @@ export class MQTTClientModule
_subscribeAllRequiredTopics(externallySubscribedByOtherSystems);
} else {
try {
const internalTopic = _this.settings
.getContent()
.MQTTTopicToInternal(mqttTopic);
if (
// Check whether the Topic contains a MQTT Wildcard
// or the client ID => if so skip that message
internalTopic.indexOf("+") === -1 &&
internalTopic.indexOf("#") === -1 &&
!internalTopic.startsWith(_this._id)
) {
// Parse the Data. Therefore it is assumed that the data is present in JSON:
const _data = JSON.parse(payload.toString("utf-8"));
const test = _this._checkForCustomSubscription(mqttTopic);
if (_this._logger.enabledFor(Logger.DEBUG)) {
_this._logger.debug(
"mqtt converted message on \"" + internalTopic + "\"",
_data
if (test) {
// There exists a custom subscription.
// Forward the content.
const data =
test.format === "json"
? JSON.parse(payload.toString("utf-8"))
: JSON.stringify(payload);
test
.callback(mqttTopic, data)
.catch((e) =>
_this._logger.error(
"Failed to content to custom subscription.",
e
)
);
}
// Forward the Message
_this._dispatcher.emit(internalTopic, _data, _this._id);
}
} catch (e) {
// Show a Hint, that parsing failed.
@ -655,7 +661,7 @@ export class MQTTClientModule
})
public subscribe(
topic: string,
callback,
callback: (topic: string, data: any | string) => Promise<void>,
options: {
qos: 0 | 1 | 2;
format: "buffer" | "json";

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-12-29 15:42:27
* @modify date 2021-01-04 14:10:14
* @modify date 2021-01-05 07:37:12
* @desc [description]
*/
@ -139,7 +139,7 @@ export interface IMQTTClientModule {
subscribe(
topic: string,
callback,
callback: (topic: string, data: any | string) => Promise<void>,
options: {
qos?: 0 | 1 | 2;
format: "buffer" | "json";