From 5abe77f92c8acebb98a0f2ba9b95be24fd93dd54 Mon Sep 17 00:00:00 2001 From: Martin Karkowski Date: Wed, 17 Nov 2021 07:27:02 +0100 Subject: [PATCH] Adding onPublishersChanged, and publishers --- lib/pubSub/nopePubSubSystem.ts | 144 ++++++++++++++----------- lib/types/nope/nopePubSub.interface.ts | 7 ++ 2 files changed, 91 insertions(+), 60 deletions(-) diff --git a/lib/pubSub/nopePubSubSystem.ts b/lib/pubSub/nopePubSubSystem.ts index 39de393..cdd3b06 100644 --- a/lib/pubSub/nopePubSubSystem.ts +++ b/lib/pubSub/nopePubSubSystem.ts @@ -49,6 +49,13 @@ export class PubSubSystemBase; + readonly onPublishersChanged: INopeEventEmitter<{ + added: string[], + removed: string[] + }>; + + readonly publishers: INopeObservable; + protected _comparePatternAndPath: TcomparePatternAndPathFunc; /** @@ -106,27 +113,72 @@ export class PubSubSystemBase { + protected _updateEmitters(beforeAdding: { + publishers: Set, + subscriptions: Set + }): void { + const afterAdding = this._determineCurrentPublishersAndSubscribers(); + const diffSubscribers = determineDifference(beforeAdding.subscriptions, afterAdding.subscriptions); + const diffPublishers = determineDifference(beforeAdding.subscriptions, afterAdding.subscriptions); + + if (diffSubscribers.removed.size > 0 || diffSubscribers.added.size > 0) { + // Update the currently used subscriptions + this.subscriptions.setContent( + Array.from(afterAdding.subscriptions) + ); + // Now emit, that there is a new subscription. + this.onSubscriptionChanged.emit({ + added: Array.from(diffSubscribers.added), + removed: Array.from(diffSubscribers.removed) + }); + } + if (diffPublishers.removed.size > 0 || diffPublishers.added.size > 0) { + // Update the currently used subscriptions + this.publishers.setContent( + Array.from(afterAdding.publishers) + ); + // Now emit, that there is a new subscription. + this.onPublishersChanged.emit({ + added: Array.from(diffPublishers.added), + removed: Array.from(diffPublishers.removed) + }); + } + } + + protected _determineCurrentPublishersAndSubscribers(): { + publishers: Set, + subscriptions: Set + } { const subscriptions = new Set(); + const publishers = new Set(); - - for (const { subTopic } of this._emitters.values()) { + for (const { subTopic, pubTopic } of this._emitters.values()) { if (subTopic) { subscriptions.add(subTopic); } + if (pubTopic) { + publishers.add(pubTopic); + } } - return subscriptions; + return { + publishers, + subscriptions + }; } public register(emitter: I, options: IPropertyOptions): O { if (!this._emitters.has(emitter as unknown as O)) { - const subscriptionsBeforeAdding = this._determineSusbcriptions(); + const beforeAdding = this._determineCurrentPublishersAndSubscribers(); let pubTopic: string | false = typeof options.topic === "string" @@ -175,50 +227,38 @@ export class PubSubSystemBase 0 || diff.added.size > 0) { - this.subscriptions.setContent(Array.from(subscriptionsAfterAdding)); - this.onSubscriptionChanged.emit({ - added: Array.from(diff.added), - removed: Array.from(diff.removed) - }); - } - - if (this._sendCurrentDataOnSubscription) { - if (containsWildcards(subTopic)) { - // This is more Complex. - this._patternbasedPullData(subTopic, null).map(item => { - // We check if the content is null - if (item.data !== null) { - (emitter as unknown as O).emit( - item.data, - { - sender: this._id, - mode: "direct", - topic: item.path - } - ); - } - }); - } else { - const currentContent = this._pullData(subTopic, null); - if (currentContent !== null) { + if (subTopic && this._sendCurrentDataOnSubscription) { + if (containsWildcards(subTopic)) { + // This is more Complex. + this._patternbasedPullData(subTopic, null).map(item => { + // We check if the content is null + if (item.data !== null) { (emitter as unknown as O).emit( - currentContent, + item.data, { sender: this._id, mode: "direct", - topic: subTopic + topic: item.path } ); } - + }); + } else { + const currentContent = this._pullData(subTopic, null); + if (currentContent !== null) { + (emitter as unknown as O).emit( + currentContent, + { + sender: this._id, + mode: "direct", + topic: subTopic + } + ); } + } } @@ -231,7 +271,7 @@ export class PubSubSystemBase 0 || diff.added.size > 0) { - // Update the currently used subscriptions - this.subscriptions.setContent( - Array.from(subscriptionsAfterAdding) - ); - // Now emit, that there is a new subscription. - this.onSubscriptionChanged.emit({ - added: Array.from(diff.added), - removed: Array.from(diff.removed) - }); - } - } - // Update the Matching Rules. this.updateMatchting(); + + this._updateEmitters(beforeAdding); + } else { throw Error("Already registered Emitter!"); } diff --git a/lib/types/nope/nopePubSub.interface.ts b/lib/types/nope/nopePubSub.interface.ts index 5944e6f..deb8dac 100644 --- a/lib/types/nope/nopePubSub.interface.ts +++ b/lib/types/nope/nopePubSub.interface.ts @@ -159,6 +159,13 @@ export interface IPubSubSystem + + readonly publishers: INopeObservable; + + readonly onPublishersChanged: INopeEventEmitter<{ + added: string[], + removed: string[], + }> } /**