From 6424fd9eae0a9c2a1b405fec0f54ed40d89b9648 Mon Sep 17 00:00:00 2001 From: Martin Karkowski Date: Tue, 5 Jan 2021 07:38:28 +0100 Subject: [PATCH] Fixing custom subscription --- .../mqtt-connector/src/mqtt.client.module.ts | 52 +++++++++++-------- modules/mqtt-connector/types/interfaces.ts | 4 +- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/modules/mqtt-connector/src/mqtt.client.module.ts b/modules/mqtt-connector/src/mqtt.client.module.ts index 4f4626f..ce7d364 100644 --- a/modules/mqtt-connector/src/mqtt.client.module.ts +++ b/modules/mqtt-connector/src/mqtt.client.module.ts @@ -2,7 +2,7 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-13 09:53:27 - * @modify date 2021-01-04 17:35:12 + * @modify date 2021-01-05 07:37:44 * @desc [description] */ @@ -21,6 +21,7 @@ import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injecta import { NopeObservable } from "../../../lib/observables/nopeObservable"; import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces"; import { IMQTTClientModule, IMqttSettings } from "../types/interfaces"; +import { mqttTopicAffected } from "./mqtt.functions"; const MQTT_SUBSCRIPTIONS_TOPIC = "mqtt/subscriptions"; @@ -111,7 +112,7 @@ export class MQTTClientModule private _subscriptions = new Map< string, { - callback: () => Promise; + callback: (topic: string, data: any) => Promise; format: "buffer" | "json"; } >(); @@ -269,6 +270,15 @@ export class MQTTClientModule protected _client: MqttClient; protected _id: string; + protected _checkForCustomSubscription(offered) { + for (const [subscription, data] of this._subscriptions.entries()) { + if (mqttTopicAffected(subscription, offered)) { + return data; + } + } + return false; + } + /** * Function used to connect the client. * @@ -595,28 +605,24 @@ export class MQTTClientModule _subscribeAllRequiredTopics(externallySubscribedByOtherSystems); } else { try { - const internalTopic = _this.settings - .getContent() - .MQTTTopicToInternal(mqttTopic); - if ( - // Check whether the Topic contains a MQTT Wildcard - // or the client ID => if so skip that message - internalTopic.indexOf("+") === -1 && - internalTopic.indexOf("#") === -1 && - !internalTopic.startsWith(_this._id) - ) { - // Parse the Data. Therefore it is assumed that the data is present in JSON: - const _data = JSON.parse(payload.toString("utf-8")); + const test = _this._checkForCustomSubscription(mqttTopic); - if (_this._logger.enabledFor(Logger.DEBUG)) { - _this._logger.debug( - "mqtt converted message on \"" + internalTopic + "\"", - _data + if (test) { + // There exists a custom subscription. + // Forward the content. + const data = + test.format === "json" + ? JSON.parse(payload.toString("utf-8")) + : JSON.stringify(payload); + + test + .callback(mqttTopic, data) + .catch((e) => + _this._logger.error( + "Failed to content to custom subscription.", + e + ) ); - } - - // Forward the Message - _this._dispatcher.emit(internalTopic, _data, _this._id); } } catch (e) { // Show a Hint, that parsing failed. @@ -655,7 +661,7 @@ export class MQTTClientModule }) public subscribe( topic: string, - callback, + callback: (topic: string, data: any | string) => Promise, options: { qos: 0 | 1 | 2; format: "buffer" | "json"; diff --git a/modules/mqtt-connector/types/interfaces.ts b/modules/mqtt-connector/types/interfaces.ts index b6e972e..52cbc28 100644 --- a/modules/mqtt-connector/types/interfaces.ts +++ b/modules/mqtt-connector/types/interfaces.ts @@ -2,7 +2,7 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-12-29 15:42:27 - * @modify date 2021-01-04 14:10:14 + * @modify date 2021-01-05 07:37:12 * @desc [description] */ @@ -139,7 +139,7 @@ export interface IMQTTClientModule { subscribe( topic: string, - callback, + callback: (topic: string, data: any | string) => Promise, options: { qos?: 0 | 1 | 2; format: "buffer" | "json";