From 53ab291d25054374a0721af80e66ec583f582444 Mon Sep 17 00:00:00 2001 From: martin Date: Sun, 15 Nov 2020 21:04:31 +0100 Subject: [PATCH] fixing the internal subscriptions --- .../mqtt-connector/src/mqtt.client.module.ts | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/modules/mqtt-connector/src/mqtt.client.module.ts b/modules/mqtt-connector/src/mqtt.client.module.ts index 7704c1f..f68849a 100644 --- a/modules/mqtt-connector/src/mqtt.client.module.ts +++ b/modules/mqtt-connector/src/mqtt.client.module.ts @@ -7,6 +7,7 @@ */ import { connect, MqttClient } from "mqtt"; +import { Subscription } from "rxjs"; import { Logger } from "winston"; import { exportProperty } from "../../../lib/decorators/moduleDecorators"; import { SPLITCHAR } from "../../../lib/helpers/objectMethods"; @@ -14,7 +15,7 @@ import { replaceAll } from "../../../lib/helpers/stringMethods"; import { getNopeLogger } from "../../../lib/logger/getLogger"; import { NopeBaseModule } from "../../../lib/module/BaseModule"; import { NopeObservable } from "../../../lib/observables/nopeObservable"; -import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface"; +import { throttleTime } from 'rxjs/operators' import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces"; const MQTT_SUBSCRIPTIONS_TOPIC = 'mqtt/subscriptions'; @@ -304,7 +305,7 @@ export class MQTTClientModule extends NopeBaseModule { } protected _firstConnect = true; - protected _interallyPublishedTopics: Map = new Map(); + protected _interallyPublishedTopics: Map = new Map(); protected _client: MqttClient; protected _id: string; @@ -397,12 +398,38 @@ export class MQTTClientModule extends NopeBaseModule { delimitTime: 500 }, _subscribeOptions || {}); - console.log('Subscribing Hier',topic) + // differenciate between internal and external elements + // Therefore iterate over the elements and compare the subscribed and + // offered topic. + for (const prop of _this._registeredProperties.values()){ + + const _pubTopic = typeof prop.options.topic === 'string' ? prop.options.topic : prop.options.topic.publish || null; + + if (_pubTopic === topic) { + // get an observable, holding the content + const observer = prop.observable.enhancedSubscription((data) => { + // Publish the Content. + _publishToMQTT(data, _this.identifier, _pubTopic); + }, _subscribeOptions.delimitTime > 0 ? { + pipe(scope, obs) { + return obs.pipe(scope.throttleTime(_subscribeOptions.delimitTime)) + }, + scope: { + throttleTime + } + } : {}); + + // Mark the Topic as subscribed. + _this._interallyPublishedTopics.set(topic, observer); + + // End the call + return; + } + } // get an observable, holding the content const observer = await _this._dispatcher.subscribeToEvent(topic, (data, sender, timestamp) => { // Publish the Content. - console.log(topic,data); _publishToMQTT(data, sender, topic); }, _subscribeOptions.delimitTime > 0 ? { pipe: { @@ -410,9 +437,10 @@ export class MQTTClientModule extends NopeBaseModule { return obs.pipe(scope.throttleTime(_subscribeOptions.delimitTime)) }, scope: { - throttleTime: 0 + throttleTime } - } + }, + mode: 'sync' } : { mode: 'sync', })