Fixing PubSub

This commit is contained in:
Martin Karkowski 2021-11-16 10:35:06 +01:00
parent b47d0cb772
commit 65dbe90870
4 changed files with 352 additions and 10 deletions

View File

@ -421,6 +421,8 @@ describe("DataPubSubSystemBase", function () {
} catch (e) {
done(e);
}
}, {
skipCurrent: true
});
pubSubSystem.pushData("this/is", "a test");
});

View File

@ -10,7 +10,7 @@ import { memoize } from "lodash";
import { generateId } from "../helpers/idMethods";
import { deepClone, flattenObject, rgetattr, rsetattr } from "../helpers/objectMethods";
import { comparePatternAndPath as _comparePatternAndPath, containsWildcards, TcomparePatternAndPathFunc } from "../helpers/pathMatchingMethods";
import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObserver, INopeTopic, IPropertyOptions, IPubSubSystem } from "../types/nope/index";
import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObserver, INopeTopic, IPropertyOptions, IPubSubSystem, TPubSubOptions } from "../types/nope/index";
type TMatchting<O extends INopeTopic = INopeTopic> = {
// Contains subscriptions, that can get there data
@ -23,8 +23,19 @@ type TMatchting<O extends INopeTopic = INopeTopic> = {
export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O extends INopeTopic = INopeTopic> implements IPubSubSystem<I, O> {
public options = {
extractPatternForSubscriptions: true
public _options: TPubSubOptions = {
mqttBasedPatternSubscriptions: true,
forwardChildData: true,
forwardParentData: true
}
public set options(options: Partial<TPubSubOptions>) {
this._options = Object.assign(this._options, options);
this.updateMatchting();
}
public get options(): TPubSubOptions {
return deepClone(this._options);
}
protected _comparePatternAndPath: TcomparePatternAndPathFunc;
@ -286,6 +297,13 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
if (result.affected) {
if (
(result.affectedByChild && !this._options.forwardChildData) ||
(result.affectedByParent && !this._options.forwardParentData)
) {
continue;
}
// We now have match the topic as following described:
// 1) subscription contains a pattern
// - dircet change (same-level) => content
@ -296,7 +314,7 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
// - direct change (topic = path) => content
// - parent based change => a super change
if (result.containsWildcards) {
if (this.options.extractPatternForSubscriptions) {
if (this._options.mqttBasedPatternSubscriptions) {
if (result.patternToExtractData) {
this.__addToMatchingStructure("dataQuery", topicOfChange, result.patternToExtractData, emitter);
} else if (result.pathToExtractData) {
@ -329,7 +347,7 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
* @param {*} content
* @memberof PubSubSystemBase
*/
protected _notify(topic: string, content, options: IEventAdditionalData, _exclusiveEmitter: O = null): void {
protected _notify(topic: string, options: IEventAdditionalData, _exclusiveEmitter: O = null): void {
// Check whether a Matching exists for this
// Topic, if not add it.
if (!this._matched.has(topic)) {
@ -362,7 +380,6 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
// Performe the direct Matches
for (const [_pattern, _emitters] of referenceToMatch.dataQuery.entries()) {
for (const _emitter of _emitters) {
// Get a new copy for every element.
@ -386,11 +403,12 @@ export class PubSubSystemBase<I extends INopeEventEmitter = INopeEventEmitter, O
protected _pushData<T = unknown>(path: string, content: T, options: IEventAdditionalData = {}): void {
if (containsWildcards(path)) {
throw ("The Path contains wildcards. Please use the method \"patternbasedPullData\" instead");
} else if (path === "") {
this._data = deepClone(content);
this._notify(path, options);
} else {
const _data = deepClone(content);
rsetattr(this._data, path, _data);
this._notify(path, _data, options);
rsetattr(this._data, path, deepClone(content));
this._notify(path, options);
}
}

View File

@ -0,0 +1,156 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:31:24
* @modify date 2021-10-19 16:23:53
* @desc [description]
*/
import {
Subscription
} from "rxjs";
export type IWaitForCallback<T> = (
content?: T | null,
sender?: string,
timeStamp?: number | null,
...data
) => boolean | Promise<boolean>;
export interface INopeWaitForEventOptions {
subscriptionMode?: "immediate" | "sync";
triggerMode?: Array<"sub" | "super" | "direct">;
}
export interface INopeObserver extends Subscription {
options: INopeSubscriptionOptions;
pause(): void;
unpause(): void;
}
export interface INopeSubscriptionOptions {
type?: "immediate" | "sync";
mode?: Array<"sub" | "super" | "direct">;
skipCurrent?: boolean
}
export type IEventCallback<T = unknown, CO = IEventAdditionalData> = (
content: T | null,
options: CO
) => void;
export interface IEventAdditionalData {
sender?: string,
timestamp?: number,
forced?: boolean,
args?: any[]
}
/**
* RsJX based EventEmitter.
*
* Contains additional Functionalities like:
* - property with the current value
* - function to publish values. (wrapper for next)
* - enables performing a subscription with synced call or a immediate call.
*/
export interface INopeEventEmitter<T = unknown, S = T, G = T, CO extends IEventAdditionalData = IEventAdditionalData> {
/**
* An id of the Observable.
*/
readonly id: string;
/**
* options.
*/
options: any;
/**
* Function to specify a Setter
*/
setter:
| ((
value: S | null,
sender: string | null,
timeStamp: number | null,
...data
) => {
data: T | null;
valid: boolean;
})
| null;
/**
* Function to update the Content
* @param value The content
* @param sender A sender, how has updated the Content (Optional)
* @param timeStamp Timestampt of the Update (optional)
* @param data
*/
emit(
value: S | null,
options?: CO
): boolean;
/**
* Flag to Disable Publishing
*/
disablePublishing: boolean;
/**
* Function, used to dispose the observable.
* Every item will be unsubscribed.
*/
dispose(): void;
/**
* A Function to subscribe to updates of the Observable.
* @param listener The Listener, which will receive an update.
* @param mode The Mode of the Subscription
* @param options Additional Options.
*/
subscribe(
listener: IEventCallback<G, CO>,
options?: INopeSubscriptionOptions
): INopeObserver;
/**
* Creates a Subscription for the value of the Observable. After one Update the Value will be deleted
* @param func Function which is called when new Datas are pushed
* @param mode Mode of the Subscription
* @param options Additional Options
*/
once(
func: IEventCallback<G, CO>,
options?: INopeSubscriptionOptions
): INopeObserver;
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
waitFor(
testCallback?: IWaitForCallback<G>,
options?: INopeWaitForEventOptions
): Promise<G>;
/**
* Async Function to Wait for an Update
* @param mode Mode of the Subscription
* @param options Additional Options for the Wait Function.
*/
waitForUpdate(
options?: INopeSubscriptionOptions
): Promise<G>;
/**
* Flag, showing if there exists any subscription this particular observer.
*/
readonly hasSubscriptions: boolean;
readonly observerLength: number;
}

View File

@ -0,0 +1,166 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-11-12 17:33:12
* @modify date 2021-11-12 17:33:12
* @desc [description]
*/
import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObserver } from "./nopeEventEmitter.interface";
import { IPropertyOptions } from "./nopeModule.interface";
import { INopeObservable } from "./nopeObservable.interface";
export interface ITopicSetContentOptions extends IEventAdditionalData {
mode: "sub" | "super" | "direct",
topic: string
}
export type INopeTopic<T = any, S = T, G = T> = INopeEventEmitter<T, S, G, ITopicSetContentOptions>
export type INopeTopicWithDirectAccess<T = any, S = T, G = T> = INopeObservable<T, S, G, ITopicSetContentOptions>
export interface IPubSubSystemConstructor {
new(_generateObservable: <T>() => INopeEventEmitter<T>): IPubSubSystem
}
export interface TPubSubOptions {
/**
* Handles pattern like subscriptions like on mqtt.
*
*
* @author M.Karkowski
* @type {boolean}
* @memberof TPubSubOptions
*/
mqttBasedPatternSubscriptions?: boolean,
forwardChildData?: boolean,
forwardParentData?: boolean
}
/**
*
*
* @author M.Karkowski
* @export
* @class PubSubSystem
*/
export interface IPubSubSystem<I extends INopeEventEmitter = INopeEventEmitter, O extends INopeTopic = INopeTopic> {
options: TPubSubOptions
/**
* Function to register an Observable. Please define the Options, to decide
* whether the data of the observable should be published or subscribed.
*
* @author M.Karkowski
* @param {I} emitter The Emitter to consider
* @param {IPropertyOptions} options
* @return {*} {O}
* @memberof IPubSubSystem
*/
register(emitter: I, options: IPropertyOptions): O;
/**
* Function to update the options and there by the topics of an observable.
*
* @author M.Karkowski
* @param {I} emitter The Emitter to consider
* @param {IPropertyOptions} options The modified options
* @memberof IPubSubSystem
*/
updateOptions(emitter: I, options: IPropertyOptions): void;
/**
* Removes an observable of the Pub-Sub-System.
*
* @author M.Karkowski
* @param {I} observable
* @return {*} {boolean}
* @memberof IPubSubSystem
*/
unregister(emitter: I): boolean;
/**
*
*
* @author M.Karkowski
* @template T
* @param {string} topic
* @param {IEventCallback<T>} subscription
* @return {*} {INopeObserver}
* @memberof IPubSubSystem
*/
registerSubscription<T = unknown>(topic: string, subscription: IEventCallback<T>): INopeObserver;
/**
* Helper to manually Trigger an update of the Matching
*
* @author M.Karkowski
* @memberof IPubSubSystem
*/
updateMatchting(): void;
}
/**
*
*
* @author M.Karkowski
* @export
* @class PubSubSystem
*/
export interface IDataPubSubSystem extends IPubSubSystem<INopeObservable, INopeTopicWithDirectAccess> {
/**
* A Getter to return a COPY of the item. Outside of the system,
* you'll never receive the original object.
*
* @author M.Karkowski
* @readonly
* @type {unknown}
* @memberof IPubSubSystem
*/
data: unknown;
/**
*
*
* @author M.Karkowski
* @template T
* @param {string} path
* @param {T} content
* @param {IEventAdditionalData} options
* @memberof IPubSubSystem
*/
pushData<T = unknown>(path: string, content: T, options: IEventAdditionalData): void;
/**
* Pull some Data of System. You will allways receive a just a copy. This method prevents you
* to use a pattern like path. If you want to use patterns please use the "patternbasedPullData"
*
* @author M.Karkowski
* @template T Expected Type of the return. Defaults to unkown
* @template D Default Value.
* @param {string} topic
* @param {D} _default If no data is found => return the default data.
* @return {*} {T}
* @memberof IPubSubSystem
*/
pullData<T = unknown, D = null>(topic: string, _default: D): T;
/**
* A Pattern based Pull. You cann provide data with the
*
* @author M.Karkowski
* @template T
* @template D
* @param {string} pattern
* @param {D} _default
* @return {*} {{ path: string, data: T }[]}
* @memberof IPubSubSystem
*/
patternbasedPullData<T = unknown, D = null>(pattern: string, _default: D): { path: string, data: T }[];
/** Patternbased */
patternBasedPush<T = unknown>(pattern: string, content: T, options: IEventAdditionalData): void;
}