Adding the events, to know, which data is present

This commit is contained in:
Martin Karkowski 2021-11-16 22:15:39 +01:00
parent a2054a6a3e
commit bca4898e28
4 changed files with 135 additions and 29 deletions

View File

@ -223,6 +223,36 @@ describe("PubSubSystemBase", function () {
publisher.emit("Hello World!");
});
it("Testing Emitters", (done) => {
pubSubSystem.onSubscriptionChanged.subscribe(data => {
if (data.added.includes("this/#")) {
done();
}
});
pubSubSystem.register(subscriber, {
mode: "subscribe",
schema: {},
topic: "this/#"
});
});
it("Testing Emitters", (done) => {
pubSubSystem.subscriptions.subscribe(data => {
if (data.includes("this/#")) {
done();
}
}, { skipCurrent: true });
pubSubSystem.register(subscriber, {
mode: "subscribe",
schema: {},
topic: "this/#"
});
});
it("throw Error on multi registering", (done) => {
const error = new Error("Error not thrown");
try {

View File

@ -2,16 +2,18 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-11-12 12:25:30
* @modify date 2021-11-16 20:55:27
* @modify date 2021-11-16 22:13:11
* @desc [description]
*/
import { memoize } from "lodash";
import { NopeEventEmitter } from "../eventEmitter/index";
import { NopeEventEmitter } from "../eventEmitter/nopeEventEmitter";
import { generateId } from "../helpers/idMethods";
import { deepClone, flattenObject, rgetattr, rsetattr } from "../helpers/objectMethods";
import { comparePatternAndPath as _comparePatternAndPath, containsWildcards, TcomparePatternAndPathFunc } from "../helpers/pathMatchingMethods";
import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObserver, INopeTopic, IPropertyOptions, IPubSubSystem, TPubSubOptions } from "../types/nope/index";
import { determineDifference } from "../helpers/setMethods";
import { NopeObservable } from "../observables/nopeObservable";
import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObservable, INopeObserver, INopeTopic, IPropertyOptions, IPubSubSystem, TPubSubOptions } from "../types/nope/index";
type TMatchting<O extends INopeTopic = INopeTopic> = {
@ -35,10 +37,17 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
return deepClone(this._options);
}
readonly incrementalDataChange: INopeEventEmitter<{
readonly onIncrementalDataChange: INopeEventEmitter<{
path: string,
data: unknown
}> = new NopeEventEmitter();
}>;
readonly onSubscriptionChanged: INopeEventEmitter<{
added: string[],
removed: string[]
}>;
readonly subscriptions: INopeObservable<string[]>;
protected _comparePatternAndPath: TcomparePatternAndPathFunc;
@ -93,11 +102,32 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
return `${pattern}-${path}`;
}
);
this.subscriptions = new NopeObservable();
this.subscriptions.setContent([]);
this.onIncrementalDataChange = new NopeEventEmitter();
this.onSubscriptionChanged = new NopeEventEmitter();
}
protected _determineSusbcriptions(): Set<string> {
const subscriptions = new Set<string>();
for (const { subTopic } of this._emitters.values()) {
if (subTopic) {
subscriptions.add(subTopic);
}
}
return subscriptions;
}
public register(emitter: I, options: IPropertyOptions): O {
if (!this._emitters.has(emitter as unknown as O)) {
const subscriptionsBeforeAdding = this._determineSusbcriptions();
let pubTopic: string | false =
typeof options.topic === "string"
? options.topic
@ -146,35 +176,49 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
}
// Now, if required, add the Data to the emitter.
if (subTopic && this._sendCurrentDataOnSubscription) {
if (containsWildcards(subTopic)) {
// This is more Complex.
this._patternbasedPullData(subTopic, null).map(item => {
// We check if the content is null
if (item.data !== null) {
if (subTopic) {
const subscriptionsAfterAdding = this._determineSusbcriptions();
const diff = determineDifference(subscriptionsBeforeAdding, subscriptionsAfterAdding);
if (diff.removed.size > 0 || diff.added.size > 0) {
this.subscriptions.setContent(Array.from(subscriptionsAfterAdding));
this.onSubscriptionChanged.emit({
added: Array.from(diff.added),
removed: Array.from(diff.removed)
});
}
if (this._sendCurrentDataOnSubscription) {
if (containsWildcards(subTopic)) {
// This is more Complex.
this._patternbasedPullData(subTopic, null).map(item => {
// We check if the content is null
if (item.data !== null) {
(emitter as unknown as O).emit(
item.data,
{
sender: this._id,
mode: "direct",
topic: item.path
}
);
}
});
} else {
const currentContent = this._pullData(subTopic, null);
if (currentContent !== null) {
(emitter as unknown as O).emit(
item.data,
currentContent,
{
sender: this._id,
mode: "direct",
topic: item.path
topic: subTopic
}
);
}
});
} else {
const currentContent = this._pullData(subTopic, null);
if (currentContent !== null) {
(emitter as unknown as O).emit(
currentContent,
{
sender: this._id,
mode: "direct",
topic: subTopic
}
);
}
}
}
}
@ -187,6 +231,8 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
public updateOptions(emitter: I, options: IPropertyOptions): void {
if (this._emitters.has(emitter as unknown as O)) {
const subscriptionsBeforeAdding = this._determineSusbcriptions();
const pubTopic =
typeof options.topic === "string"
? options.topic
@ -203,8 +249,27 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
data.subTopic = subTopic;
data.pubTopic = pubTopic;
this._emitters.set(emitter as unknown as O, data);
if (subTopic) {
const subscriptionsAfterAdding = this._determineSusbcriptions();
const diff = determineDifference(subscriptionsBeforeAdding, subscriptionsAfterAdding);
if (diff.removed.size > 0 || diff.added.size > 0) {
// Update the currently used subscriptions
this.subscriptions.setContent(
Array.from(subscriptionsAfterAdding)
);
// Now emit, that there is a new subscription.
this.onSubscriptionChanged.emit({
added: Array.from(diff.added),
removed: Array.from(diff.removed)
});
}
}
// Update the Matching Rules.
this.updateMatchting();
} else {
@ -414,7 +479,7 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
}
// Emit the data change.
this.incrementalDataChange.emit({
this.onIncrementalDataChange.emit({
path,
data
});

View File

@ -7,6 +7,7 @@
*/
import { ILogger } from "js-logger";
import { IDataPubSubSystem, IPubSubSystem } from ".";
import {
ICallOptions,
ICommunicationBridge,
@ -275,6 +276,9 @@ export interface INopeDispatcher {
*/
readonly communicator: ICommunicationBridge;
readonly eventDistributor: IPubSubSystem;
readonly propertyDistributor: IDataPubSubSystem;
/**
* ID of the Dispatcher
*

View File

@ -94,7 +94,7 @@ export interface IPubSubSystem<I extends INopeEventEmitter = INopeEventEmitter,
* @return {*} {O}
* @memberof IPubSubSystem
*/
register(emitter: I, options: IPropertyOptions): O;
register(emitter: I, options: IPropertyOptions,): O;
/**
* Function to update the options and there by the topics of an observable.
@ -148,10 +148,17 @@ export interface IPubSubSystem<I extends INopeEventEmitter = INopeEventEmitter,
* }>}
* @memberof IPubSubSystem
*/
readonly incrementalDataChange: INopeEventEmitter<{
readonly onIncrementalDataChange: INopeEventEmitter<{
path: string,
data: unknown
}>
readonly subscriptions: INopeObservable<string[]>;
readonly onSubscriptionChanged: INopeEventEmitter<{
added: string[],
removed: string[],
}>
}
/**