fixing the internal subscriptions

This commit is contained in:
martin 2020-11-15 21:04:31 +01:00
parent 4761734d5f
commit 53ab291d25

View File

@ -7,6 +7,7 @@
*/ */
import { connect, MqttClient } from "mqtt"; import { connect, MqttClient } from "mqtt";
import { Subscription } from "rxjs";
import { Logger } from "winston"; import { Logger } from "winston";
import { exportProperty } from "../../../lib/decorators/moduleDecorators"; import { exportProperty } from "../../../lib/decorators/moduleDecorators";
import { SPLITCHAR } from "../../../lib/helpers/objectMethods"; import { SPLITCHAR } from "../../../lib/helpers/objectMethods";
@ -14,7 +15,7 @@ import { replaceAll } from "../../../lib/helpers/stringMethods";
import { getNopeLogger } from "../../../lib/logger/getLogger"; import { getNopeLogger } from "../../../lib/logger/getLogger";
import { NopeBaseModule } from "../../../lib/module/BaseModule"; import { NopeBaseModule } from "../../../lib/module/BaseModule";
import { NopeObservable } from "../../../lib/observables/nopeObservable"; import { NopeObservable } from "../../../lib/observables/nopeObservable";
import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface"; import { throttleTime } from 'rxjs/operators'
import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces"; import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces";
const MQTT_SUBSCRIPTIONS_TOPIC = 'mqtt/subscriptions'; const MQTT_SUBSCRIPTIONS_TOPIC = 'mqtt/subscriptions';
@ -304,7 +305,7 @@ export class MQTTClientModule extends NopeBaseModule {
} }
protected _firstConnect = true; protected _firstConnect = true;
protected _interallyPublishedTopics: Map<string,INopeObserver> = new Map(); protected _interallyPublishedTopics: Map<string,Subscription> = new Map();
protected _client: MqttClient; protected _client: MqttClient;
protected _id: string; protected _id: string;
@ -397,12 +398,38 @@ export class MQTTClientModule extends NopeBaseModule {
delimitTime: 500 delimitTime: 500
}, _subscribeOptions || {}); }, _subscribeOptions || {});
console.log('Subscribing Hier',topic) // differenciate between internal and external elements
// Therefore iterate over the elements and compare the subscribed and
// offered topic.
for (const prop of _this._registeredProperties.values()){
const _pubTopic = typeof prop.options.topic === 'string' ? prop.options.topic : prop.options.topic.publish || null;
if (_pubTopic === topic) {
// get an observable, holding the content
const observer = prop.observable.enhancedSubscription((data) => {
// Publish the Content.
_publishToMQTT(data, _this.identifier, _pubTopic);
}, _subscribeOptions.delimitTime > 0 ? {
pipe(scope, obs) {
return obs.pipe(scope.throttleTime(_subscribeOptions.delimitTime))
},
scope: {
throttleTime
}
} : {});
// Mark the Topic as subscribed.
_this._interallyPublishedTopics.set(topic, observer);
// End the call
return;
}
}
// get an observable, holding the content // get an observable, holding the content
const observer = await _this._dispatcher.subscribeToEvent(topic, (data, sender, timestamp) => { const observer = await _this._dispatcher.subscribeToEvent(topic, (data, sender, timestamp) => {
// Publish the Content. // Publish the Content.
console.log(topic,data);
_publishToMQTT(data, sender, topic); _publishToMQTT(data, sender, topic);
}, _subscribeOptions.delimitTime > 0 ? { }, _subscribeOptions.delimitTime > 0 ? {
pipe: { pipe: {
@ -410,9 +437,10 @@ export class MQTTClientModule extends NopeBaseModule {
return obs.pipe(scope.throttleTime(_subscribeOptions.delimitTime)) return obs.pipe(scope.throttleTime(_subscribeOptions.delimitTime))
}, },
scope: { scope: {
throttleTime: 0 throttleTime
} }
} },
mode: 'sync'
} : { } : {
mode: 'sync', mode: 'sync',
}) })