nope/lib/pubSub/nopePubSubSystem.ts
2022-01-10 07:52:05 +01:00

658 lines
18 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-11-12 12:25:30
* @modify date 2022-01-06 09:54:44
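* @desc Base implementation of the pub/sub system: registers emitters, matches publisher and subscriber topics and forwards pushed data to the matching subscribers.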
*/
import { memoize } from "lodash";
import { NopeEventEmitter } from "../eventEmitter/nopeEventEmitter";
import { generateId } from "../helpers/idMethods";
import { MapBasedMergeData } from "../helpers/mergedData";
import {
deepClone,
flattenObject,
rgetattr,
rsetattr,
} from "../helpers/objectMethods";
import {
comparePatternAndPath as _comparePatternAndPath,
containsWildcards,
TcomparePatternAndPathFunc,
} from "../helpers/pathMatchingMethods";
import {
IEventAdditionalData,
IEventCallback,
IEventOptions,
INopeDescriptor,
INopeEventEmitter,
INopeObserver,
INopeTopic,
IPubSubSystem,
IPubSubOptions,
IIncrementalChange,
} from "../types/nope/index";
import { IMapBasedMergeData } from "../types/nope/nopeHelpers.interface";
/**
 * Look-up table entry that is stored per changed topic.
 *
 * - `dataPull`:  maps a concrete path to the subscriptions, that can get
 *                their data by pulling that path.
 * - `dataQuery`: maps a path or pattern to the subscriptions, whose data
 *                must be queried.
 */
type TMatchting<O extends INopeTopic = INopeTopic> = Record<
  "dataPull" | "dataQuery",
  Map<string, O[]>
>;
/**
 * Base implementation of a publish/subscribe system.
 *
 * Emitters are registered together with their event options. Emitters with a
 * publish topic forward their content into the internal data object
 * (`_data`); emitters with a subscribe topic are informed whenever a pushed
 * topic affects them. To speed up the notification, the relation between a
 * changed topic and the affected subscribers is cached in `_matched`.
 */
export class PubSubSystemBase<
  CO extends IEventAdditionalData = IEventAdditionalData,
  I extends INopeEventEmitter = INopeEventEmitter<any, any, any, CO>,
  O extends INopeTopic = INopeTopic
> implements IPubSubSystem<CO, I, O>
{
  /**
   * The active settings. The constructor replaces this initial value with
   * the defaults merged with the options provided by the user.
   */
  public _options: IPubSubOptions = {
    mqttBasedPatternSubscriptions: true,
    forwardChildData: true,
    forwardParentData: true,
  };

  // See interface description
  public get options(): IPubSubOptions {
    // Hand out a copy, so the internal settings can not be altered.
    return deepClone(this._options) as any;
  }

  // See interface description
  readonly onIncrementalDataChange: INopeEventEmitter<IIncrementalChange>;

  // See interface description
  readonly subscriptions: IMapBasedMergeData<string>;

  // See interface description
  readonly publishers: IMapBasedMergeData<string>;

  /**
   * Memoized function, which compares a subscription topic (path or
   * pattern) with the path of a change. Created in the constructor.
   */
  protected _comparePatternAndPath: TcomparePatternAndPathFunc;

  /**
   * The internal used object to store the data.
   *
   * @author M.Karkowski
   * @type {unknown}
   * @memberof PubSubSystemBase
   */
  protected _data: unknown = {};

  /**
   * If set to `true`, `register` emits the currently stored content to a
   * new subscriber and subscribes the forwarding callback without skipping
   * the current value of the emitter.
   */
  protected _sendCurrentDataOnSubscription = false;

  /** Id of this system. Used as default `sender` of emitted events. */
  protected _id = generateId();

  /**
   * List of all registered emitters. For every emitter, we store the
   * event options, the resolved publish / subscribe topics (`false`, if
   * the emitter does not act in that role) and the callback, that
   * forwards the content of the emitter into the system.
   *
   * @author M.Karkowski
   * @protected
   * @memberof PubSubSystemBase
   */
  protected _emitters = new Map<
    O,
    {
      options: IEventOptions;
      subTopic: string | false;
      pubTopic: string | false;
      callback?: IEventCallback<unknown, CO>;
    }
  >();

  /**
   * Look-up table: changed topic => affected subscribers.
   * Cleared and rebuilt by `updateMatchting`, lazily extended by `_notify`.
   */
  protected _matched = new Map<string, TMatchting>();

  /** Factory used by `registerSubscription` to create a new emitter. */
  protected _generateEmitterType: () => I;

  /**
   * Creates the system.
   *
   * @param options Settings of the system. Missing settings are filled
   * with the defaults (all flags `true`). Additionally a factory
   * `generateEmitterType` can be provided; otherwise a `NopeEventEmitter`
   * is created for subscriptions.
   */
  public constructor(
    options: Partial<IPubSubOptions> & {
      generateEmitterType?: () => I;
    } = {}
  ) {
    this._options = Object.assign(
      {
        mqttBasedPatternSubscriptions: true,
        forwardChildData: true,
        forwardParentData: true,
        matchTopicsWithoutWildcards: true,
      } as IPubSubOptions,
      options
    );

    this._generateEmitterType =
      options.generateEmitterType ||
      (() => {
        return new NopeEventEmitter() as any as I;
      });

    // Create a memoized function for the pattern matching (it is way faster)
    this._comparePatternAndPath = memoize(
      (pattern: string, path: string) => {
        return _comparePatternAndPath(pattern, path, {
          // Use the merged options. The raw `options` object does not
          // contain the default value, if the user omitted the setting.
          matchTopicsWithoutWildcards:
            this._options.matchTopicsWithoutWildcards,
        });
      },
      (pattern: string, path: string) => {
        // The length prefix keeps the cache key unambiguous: a plain
        // "pattern-path" key collides e.g. for ("a-b", "c") and ("a", "b-c").
        return `${pattern.length}:${pattern}${path}`;
      }
    );

    this.subscriptions = new MapBasedMergeData(this._emitters, "subTopic");
    this.publishers = new MapBasedMergeData(this._emitters, "pubTopic");
    this.onIncrementalDataChange = new NopeEventEmitter();
  }

  /**
   * Helper to determine the publish and subscribe topic of an emitter
   * based on its options. A topic is set to `false`, if the `mode` of the
   * options does not contain the corresponding role.
   *
   * @param options The options of the emitter.
   * @returns The resolved topics.
   */
  protected _extractTopics(options: IEventOptions): {
    pubTopic: string | false;
    subTopic: string | false;
  } {
    const { topic } = options;

    const hasMode = (name: "publish" | "subscribe"): boolean => {
      return (
        options.mode === name ||
        (Array.isArray(options.mode) && options.mode.includes(name))
      );
    };

    let pubTopic: string | false =
      typeof topic === "string" ? topic : topic.publish || null;
    let subTopic: string | false =
      typeof topic === "string" ? topic : topic.subscribe || null;

    if (!hasMode("publish")) {
      pubTopic = false;
    }
    if (!hasMode("subscribe")) {
      subTopic = false;
    }

    return { pubTopic, subTopic };
  }

  /**
   * Helper to create the callback, which forwards the content of an
   * emitter into the system. The publish topic is looked up on every call.
   * Thereby a topic changed via `updateOptions` is respected and nothing is
   * forwarded any more, once the emitter has been unregistered.
   *
   * @param emitter The emitter, whose content should be forwarded.
   * @returns The callback to subscribe on the emitter.
   */
  protected _createForwarder(emitter: O): IEventCallback<unknown, CO> {
    return (content, opts) => {
      const registration = this._emitters.get(emitter);
      if (registration && registration.pubTopic) {
        this._pushData(registration.pubTopic, content, opts as CO);
      }
    };
  }

  // See interface description
  public register(emitter: I, options: IEventOptions): O {
    const topicEmitter = emitter as unknown as O;

    if (this._emitters.has(topicEmitter)) {
      throw Error("Already registered Emitter!");
    }

    const { pubTopic, subTopic } = this._extractTopics(options);

    // Define a callback, which will be used to forward
    // the data into the system:
    let callback: IEventCallback<unknown, CO> = undefined;
    if (pubTopic) {
      callback = this._createForwarder(topicEmitter);
    }

    // Register the emitter. This will be used during topic matching.
    this._emitters.set(topicEmitter, {
      options,
      pubTopic,
      subTopic,
      callback,
    });

    // Update the Matching Rules.
    this.updateMatchting();

    if (callback) {
      // If necessary, add the callback.
      // NOTE(review): the observer returned by `subscribe` is not stored,
      // so the callback stays attached to the emitter after `unregister`
      // (it just does not forward anything any more).
      emitter.subscribe(callback, {
        skipCurrent: !this._sendCurrentDataOnSubscription,
      });
    }

    // Now, if required, send the currently stored data to the emitter.
    if (subTopic && this._sendCurrentDataOnSubscription) {
      if (containsWildcards(subTopic)) {
        // Pattern: every matching item is emitted with its own path.
        for (const item of this._patternbasedPullData(subTopic, null)) {
          // Only emit items containing content.
          if (item.data !== null) {
            topicEmitter.emit(item.data, {
              sender: this._id,
              mode: "direct",
              topic: item.path,
            });
          }
        }
      } else {
        const currentContent = this._pullData(subTopic, null);
        if (currentContent !== null) {
          topicEmitter.emit(currentContent, {
            sender: this._id,
            mode: "direct",
            topic: subTopic,
          });
        }
      }
    }

    return topicEmitter;
  }

  // See interface description
  public updateOptions(emitter: I, options: IEventOptions): void {
    const topicEmitter = emitter as unknown as O;
    const registration = this._emitters.get(topicEmitter);

    if (!registration) {
      throw Error("Emitter is not registered!");
    }

    // Resolve the topics the same way as during the registration, so the
    // `mode` of the options is respected.
    const { pubTopic, subTopic } = this._extractTopics(options);

    registration.options = options;
    registration.subTopic = subTopic;
    registration.pubTopic = pubTopic;

    // An emitter, that did not publish before, has no forwarding callback.
    const requiresCallback = !!pubTopic && !registration.callback;
    if (requiresCallback) {
      registration.callback = this._createForwarder(topicEmitter);
    }

    // Update the Matching Rules.
    this.updateMatchting();

    if (requiresCallback) {
      emitter.subscribe(registration.callback, {
        skipCurrent: !this._sendCurrentDataOnSubscription,
      });
    }
  }

  // See interface description
  public unregister(emitter: INopeEventEmitter): boolean {
    const removed = this._emitters.delete(emitter as unknown as O);

    if (removed) {
      // Update the Matching Rules.
      this.updateMatchting();
    }

    return removed;
  }

  // See interface description
  public registerSubscription<T = unknown>(
    topic: string,
    subscription: IEventCallback<T, CO>
  ): INopeObserver {
    // Create the Emitter
    const emitter = this._generateEmitterType();

    // Create the Observer
    const observer = (emitter as INopeEventEmitter<T>).subscribe(subscription);

    // Register the Emitter. Thereby the element
    // will be linked to the Pub-Sub-System.
    this.register(emitter, {
      mode: "subscribe",
      schema: {},
      topic: topic,
    });

    // Return the Observer
    return observer;
  }

  // See interface description
  public get emitters(): {
    publishers: { name: string; schema: INopeDescriptor }[];
    subscribers: { name: string; schema: INopeDescriptor }[];
  } {
    const publishers: { name: string; schema: INopeDescriptor }[] = [];
    const subscribers: { name: string; schema: INopeDescriptor }[] = [];

    for (const item of this._emitters.values()) {
      if (item.pubTopic) {
        publishers.push({
          schema: item.options.schema as INopeDescriptor,
          name: item.pubTopic,
        });
      }
      if (item.subTopic) {
        subscribers.push({
          schema: item.options.schema as INopeDescriptor,
          name: item.subTopic,
        });
      }
    }

    return { publishers, subscribers };
  }

  /**
   * Internal Match-Making Algorithm. This allows to create a predefined
   * list between publishers and subscribers. Thereby the process is sped
   * up, by utilizing this look-up table. Afterwards the `publishers` and
   * `subscriptions` are updated.
   *
   * @author M.Karkowski
   * @memberof PubSubSystemBase
   */
  public updateMatchting(): void {
    // Clears all defined Matches
    this._matched.clear();

    // Iterate through all publishers and update
    // the matching for their specific topics.
    for (const { pubTopic } of this._emitters.values()) {
      if (pubTopic) {
        this._updateMatchingForTopic(pubTopic);
      }
    }

    this.publishers.update();
    this.subscriptions.update();
  }

  /**
   * Pushes the data to the given topic and informs the subscribers.
   *
   * @param eventName The topic (path) to push the data to.
   * @param data The data to push.
   * @param options Additional options of the event.
   */
  public emit(eventName: string, data: any, options?: CO): void {
    return this._pushData(eventName, data, options);
  }

  /**
   * Unregisters all Emitters and removes all subscriptions of the
   * "onIncrementalDataChange", "publishers" and "subscriptions"
   *
   * @author M.Karkowski
   * @memberof PubSubSystemBase
   */
  public dispose(): void {
    // Iterate over a copy, because unregistering alters the map.
    for (const emitter of Array.from(this._emitters.keys())) {
      this.unregister(emitter);
    }

    this.onIncrementalDataChange.dispose();
    this.publishers.dispose();
    this.subscriptions.dispose();
  }

  /**
   * Helper to add an emitter to the look-up table.
   *
   * @param entry The list to use ("dataPull" or "dataQuery").
   * @param topicOfChange The topic, whose change affects the emitter.
   * @param pathOrPattern The path or pattern used to extract the data.
   * @param emitter The affected emitter.
   */
  protected __addToMatchingStructure(
    entry: keyof TMatchting,
    topicOfChange: string,
    pathOrPattern: string,
    emitter: O
  ): void {
    let match = this._matched.get(topicOfChange);

    if (!match) {
      match = {
        dataPull: new Map(),
        dataQuery: new Map(),
      };
      this._matched.set(topicOfChange, match);
    }

    const emitters = match[entry].get(pathOrPattern) || [];
    emitters.push(emitter);
    match[entry].set(pathOrPattern, emitters);
  }

  /**
   * Function to internally add the matches for a topic. Every registered
   * subscriber, that is affected by a change of the topic, is added to the
   * look-up table.
   *
   * @param {string} topicOfChange The topic, which is changed.
   */
  protected _updateMatchingForTopic(topicOfChange: string): void {
    if (!this._matched.has(topicOfChange)) {
      this._matched.set(topicOfChange, {
        dataPull: new Map(),
        dataQuery: new Map(),
      });
    }

    // Find all Matches
    for (const [emitter, { subTopic }] of this._emitters.entries()) {
      if (!subTopic) {
        continue;
      }

      // Now lets determine how the subscription is affected.
      const result = this._comparePatternAndPath(subTopic, topicOfChange);

      if (!result.affected) {
        continue;
      }

      // 1) The subscription contains a pattern:
      if (result.containsWildcards) {
        if (!this._options.mqttBasedPatternSubscriptions) {
          this.__addToMatchingStructure(
            "dataQuery",
            topicOfChange,
            topicOfChange,
            emitter
          );
        } else if (result.patternToExtractData) {
          this.__addToMatchingStructure(
            "dataQuery",
            topicOfChange,
            result.patternToExtractData,
            emitter
          );
        } else if (result.pathToExtractData) {
          this.__addToMatchingStructure(
            "dataPull",
            topicOfChange,
            result.pathToExtractData,
            emitter
          );
        } else {
          throw Error(
            "Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided"
          );
        }
        continue;
      }

      // 2) The subscription does not contain a pattern. Changes of a
      //    child or parent are only forwarded, if this is enabled.
      if (
        (result.affectedByChild && !this._options.forwardChildData) ||
        (result.affectedByParent && !this._options.forwardParentData)
      ) {
        continue;
      }

      if (!result.pathToExtractData) {
        throw Error(
          "Implementation Error. The 'pathToExtractData' must be provided"
        );
      }

      this.__addToMatchingStructure(
        "dataPull",
        topicOfChange,
        result.pathToExtractData,
        emitter
      );
    }
  }

  /**
   * Internal Function to notify all subscriptions affected by the topic.
   *
   * Subscribers listed under "dataPull" receive the content of their path.
   * Subscribers listed under "dataQuery" receive the content of the path,
   * or - if a pattern is stored - one event per item matching the pattern,
   * with the path of the item as topic.
   *
   * @author M.Karkowski
   * @private
   * @param {string} topic The topic, which has been changed.
   * @param options The options, which are forwarded to the subscribers.
   * @param _exclusiveEmitter If provided, only this emitter is informed.
   * @memberof PubSubSystemBase
   */
  protected _notify(
    topic: string,
    options: Partial<CO>,
    _exclusiveEmitter: O = null
  ): void {
    // Check whether a Matching exists for this
    // Topic, if not add it.
    if (!this._matched.has(topic)) {
      this._updateMatchingForTopic(topic);
    }

    const referenceToMatch = this._matched.get(topic);

    // Perform the direct Matches
    for (const [pathToPull, emitters] of referenceToMatch.dataPull.entries()) {
      for (const emitter of emitters) {
        // Skip before pulling, to avoid cloning data nobody receives.
        if (_exclusiveEmitter !== null && emitter !== _exclusiveEmitter) {
          continue;
        }

        // Every subscriber gets its own copy of the data.
        emitter.emit(this._pullData(pathToPull, null), {
          ...options,
          mode: "direct",
          topic: topic,
        });
      }
    }

    // Perform the query based Matches
    for (const [pattern, emitters] of referenceToMatch.dataQuery.entries()) {
      // A pattern can not be pulled directly ("_pullData" throws).
      const isPattern = containsWildcards(pattern);

      for (const emitter of emitters) {
        if (_exclusiveEmitter !== null && emitter !== _exclusiveEmitter) {
          continue;
        }

        if (isPattern) {
          for (const item of this._patternbasedPullData(pattern, null)) {
            emitter.emit(item.data, {
              ...options,
              mode: "direct",
              topic: item.path,
            });
          }
        } else {
          emitter.emit(this._pullData(pattern, null), {
            ...options,
            mode: "direct",
            topic: topic,
          });
        }
      }
    }
  }

  /**
   * Helper to fill missing options with default values: `timestamp` (now),
   * `forced` (false), `args` (empty list) and `sender` (id of the system).
   * The provided object is adapted in place.
   *
   * @param options The options to complete.
   * @returns The provided (and completed) options.
   */
  protected _updateOptions(options: Partial<CO>): CO {
    if (!options.timestamp) {
      options.timestamp = Date.now();
    }
    if (typeof options.forced !== "boolean") {
      options.forced = false;
    }
    if (!Array.isArray(options.args)) {
      options.args = [];
    }
    if (!options.sender) {
      options.sender = this._id;
    }

    return options as CO;
  }

  /**
   * Internal helper to push data to the data property. This
   * results in informing the subscribers. Afterwards the change is
   * emitted on `onIncrementalDataChange`.
   *
   * @param path Path, that is used for pushing the data. An empty path
   * replaces the whole data object.
   * @param data The data to push
   * @param options Options used during pushing
   * @throws If the path contains wildcards.
   */
  protected _pushData<T = unknown>(
    path: string,
    data: T,
    options: Partial<CO> = {}
  ): void {
    const _options = this._updateOptions(options);

    if (containsWildcards(path)) {
      throw Error(
        "The Path contains wildcards. Data can only be pushed to a concrete path."
      );
    }

    // Store a copy, so later changes of "data" do not alter our content.
    if (path === "") {
      this._data = deepClone(data);
    } else {
      rsetattr(this._data, path, deepClone(data));
    }

    this._notify(path, _options);

    // Emit the data change.
    this.onIncrementalDataChange.emit({
      name: path,
      data,
      ..._options,
    });
  }

  /**
   * Function to pull the last data of the topic.
   *
   * @param topic The path of the data. Must not contain wildcards.
   * @param _default The value, that is used if no data is present.
   * @returns A copy of the data.
   * @throws If the topic contains wildcards.
   */
  protected _pullData<T = unknown, D = null>(
    topic: string,
    _default: D = null
  ): T {
    if (containsWildcards(topic)) {
      throw Error(
        'The Path contains wildcards. Please use the method "patternbasedPullData" instead'
      );
    }

    return deepClone(rgetattr<T>(this._data, topic, _default));
  }

  /**
   * Helper, which enables to perform a pattern based pull.
   * The code receives a pattern, matches the existing
   * content (by using their path attributes) and returns the
   * corresponding data.
   *
   * @param pattern The pattern (a plain path is pulled directly).
   * @param _default The default value. Only used, if the pattern is a
   * plain path.
   * @returns The matching items, each with its path and data.
   */
  protected _patternbasedPullData<T = unknown, D = null>(
    pattern: string,
    _default: D = null
  ): { path: string; data: T }[] {
    // If we did not receive a pattern, we can
    // speed things up by a direct pull.
    if (!containsWildcards(pattern)) {
      return [
        {
          path: pattern,
          data: this._pullData(pattern, _default),
        },
      ];
    }

    // Now we know, we have to work with the query. For that
    // purpose, we use the flattened representation of the
    // data object and compare the pattern with every path.
    const flattenData = flattenObject(this._data);
    const ret: { path: string; data: T }[] = [];

    // An item is only extracted, if its path is affected on the
    // same level or by a child according to the comparison.
    for (const [path, data] of flattenData.entries()) {
      const result = this._comparePatternAndPath(pattern, path);
      if (result.affectedOnSameLevel || result.affectedByChild) {
        ret.push({
          path,
          data,
        });
      }
    }

    // Now we just return our created element.
    return ret;
  }
}