Adding onPublishersChanged, and publishers

This commit is contained in:
Martin Karkowski 2021-11-17 07:27:02 +01:00
parent bca4898e28
commit 5abe77f92c
2 changed files with 91 additions and 60 deletions

View File

@ -49,6 +49,13 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
readonly subscriptions: INopeObservable<string[]>; readonly subscriptions: INopeObservable<string[]>;
readonly onPublishersChanged: INopeEventEmitter<{
added: string[],
removed: string[]
}>;
readonly publishers: INopeObservable<string[]>;
protected _comparePatternAndPath: TcomparePatternAndPathFunc; protected _comparePatternAndPath: TcomparePatternAndPathFunc;
/** /**
@ -106,27 +113,72 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
this.subscriptions = new NopeObservable(); this.subscriptions = new NopeObservable();
this.subscriptions.setContent([]); this.subscriptions.setContent([]);
this.publishers = new NopeObservable();
this.publishers.setContent([]);
this.onIncrementalDataChange = new NopeEventEmitter(); this.onIncrementalDataChange = new NopeEventEmitter();
this.onSubscriptionChanged = new NopeEventEmitter(); this.onSubscriptionChanged = new NopeEventEmitter();
this.onPublishersChanged = new NopeEventEmitter();
} }
protected _determineSusbcriptions(): Set<string> { protected _updateEmitters(beforeAdding: {
publishers: Set<string>,
subscriptions: Set<string>
}): void {
const afterAdding = this._determineCurrentPublishersAndSubscribers();
const diffSubscribers = determineDifference(beforeAdding.subscriptions, afterAdding.subscriptions);
const diffPublishers = determineDifference(beforeAdding.subscriptions, afterAdding.subscriptions);
if (diffSubscribers.removed.size > 0 || diffSubscribers.added.size > 0) {
// Update the currently used subscriptions
this.subscriptions.setContent(
Array.from(afterAdding.subscriptions)
);
// Now emit, that there is a new subscription.
this.onSubscriptionChanged.emit({
added: Array.from(diffSubscribers.added),
removed: Array.from(diffSubscribers.removed)
});
}
if (diffPublishers.removed.size > 0 || diffPublishers.added.size > 0) {
// Update the currently used subscriptions
this.publishers.setContent(
Array.from(afterAdding.publishers)
);
// Now emit, that there is a new subscription.
this.onPublishersChanged.emit({
added: Array.from(diffPublishers.added),
removed: Array.from(diffPublishers.removed)
});
}
}
protected _determineCurrentPublishersAndSubscribers(): {
publishers: Set<string>,
subscriptions: Set<string>
} {
const subscriptions = new Set<string>(); const subscriptions = new Set<string>();
const publishers = new Set<string>();
for (const { subTopic, pubTopic } of this._emitters.values()) {
for (const { subTopic } of this._emitters.values()) {
if (subTopic) { if (subTopic) {
subscriptions.add(subTopic); subscriptions.add(subTopic);
} }
if (pubTopic) {
publishers.add(pubTopic);
}
} }
return subscriptions; return {
publishers,
subscriptions
};
} }
public register(emitter: I, options: IPropertyOptions): O { public register(emitter: I, options: IPropertyOptions): O {
if (!this._emitters.has(emitter as unknown as O)) { if (!this._emitters.has(emitter as unknown as O)) {
const subscriptionsBeforeAdding = this._determineSusbcriptions(); const beforeAdding = this._determineCurrentPublishersAndSubscribers();
let pubTopic: string | false = let pubTopic: string | false =
typeof options.topic === "string" typeof options.topic === "string"
@ -175,50 +227,38 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
}); });
} }
this._updateEmitters(beforeAdding);
// Now, if required, add the Data to the emitter. // Now, if required, add the Data to the emitter.
if (subTopic) { if (subTopic && this._sendCurrentDataOnSubscription) {
if (containsWildcards(subTopic)) {
const subscriptionsAfterAdding = this._determineSusbcriptions(); // This is more Complex.
const diff = determineDifference(subscriptionsBeforeAdding, subscriptionsAfterAdding); this._patternbasedPullData(subTopic, null).map(item => {
// We check if the content is null
if (diff.removed.size > 0 || diff.added.size > 0) { if (item.data !== null) {
this.subscriptions.setContent(Array.from(subscriptionsAfterAdding));
this.onSubscriptionChanged.emit({
added: Array.from(diff.added),
removed: Array.from(diff.removed)
});
}
if (this._sendCurrentDataOnSubscription) {
if (containsWildcards(subTopic)) {
// This is more Complex.
this._patternbasedPullData(subTopic, null).map(item => {
// We check if the content is null
if (item.data !== null) {
(emitter as unknown as O).emit(
item.data,
{
sender: this._id,
mode: "direct",
topic: item.path
}
);
}
});
} else {
const currentContent = this._pullData(subTopic, null);
if (currentContent !== null) {
(emitter as unknown as O).emit( (emitter as unknown as O).emit(
currentContent, item.data,
{ {
sender: this._id, sender: this._id,
mode: "direct", mode: "direct",
topic: subTopic topic: item.path
} }
); );
} }
});
} else {
const currentContent = this._pullData(subTopic, null);
if (currentContent !== null) {
(emitter as unknown as O).emit(
currentContent,
{
sender: this._id,
mode: "direct",
topic: subTopic
}
);
} }
} }
} }
@ -231,7 +271,7 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
public updateOptions(emitter: I, options: IPropertyOptions): void { public updateOptions(emitter: I, options: IPropertyOptions): void {
if (this._emitters.has(emitter as unknown as O)) { if (this._emitters.has(emitter as unknown as O)) {
const subscriptionsBeforeAdding = this._determineSusbcriptions(); const beforeAdding = this._determineCurrentPublishersAndSubscribers();
const pubTopic = const pubTopic =
typeof options.topic === "string" typeof options.topic === "string"
@ -249,29 +289,13 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
data.subTopic = subTopic; data.subTopic = subTopic;
data.pubTopic = pubTopic; data.pubTopic = pubTopic;
this._emitters.set(emitter as unknown as O, data); this._emitters.set(emitter as unknown as O, data);
if (subTopic) {
const subscriptionsAfterAdding = this._determineSusbcriptions();
const diff = determineDifference(subscriptionsBeforeAdding, subscriptionsAfterAdding);
if (diff.removed.size > 0 || diff.added.size > 0) {
// Update the currently used subscriptions
this.subscriptions.setContent(
Array.from(subscriptionsAfterAdding)
);
// Now emit, that there is a new subscription.
this.onSubscriptionChanged.emit({
added: Array.from(diff.added),
removed: Array.from(diff.removed)
});
}
}
// Update the Matching Rules. // Update the Matching Rules.
this.updateMatchting(); this.updateMatchting();
this._updateEmitters(beforeAdding);
} else { } else {
throw Error("Already registered Emitter!"); throw Error("Already registered Emitter!");
} }

View File

@ -159,6 +159,13 @@ export interface IPubSubSystem<I extends INopeEventEmitter = INopeEventEmitter,
added: string[], added: string[],
removed: string[], removed: string[],
}> }>
readonly publishers: INopeObservable<string[]>;
readonly onPublishersChanged: INopeEventEmitter<{
added: string[],
removed: string[],
}>
} }
/** /**