/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2021-11-12 12:25:30 * @modify date 2021-11-16 20:55:27 * @desc [description] */ import { memoize } from "lodash"; import { NopeEventEmitter } from "../eventEmitter/index"; import { generateId } from "../helpers/idMethods"; import { deepClone, flattenObject, rgetattr, rsetattr } from "../helpers/objectMethods"; import { comparePatternAndPath as _comparePatternAndPath, containsWildcards, TcomparePatternAndPathFunc } from "../helpers/pathMatchingMethods"; import { IEventAdditionalData, IEventCallback, INopeEventEmitter, INopeObserver, INopeTopic, IPropertyOptions, IPubSubSystem, TPubSubOptions } from "../types/nope/index"; type TMatchting = { // Contains subscriptions, that can get there data // by pulling. dataPull: Map; // Contains subscriptions, which must be pulled dataQuery: Map; } export class PubSubSystemBase implements IPubSubSystem { public _options: TPubSubOptions = { mqttBasedPatternSubscriptions: true, forwardChildData: true, forwardParentData: true } public get options(): TPubSubOptions { return deepClone(this._options); } readonly incrementalDataChange: INopeEventEmitter<{ path: string, data: unknown }> = new NopeEventEmitter(); protected _comparePatternAndPath: TcomparePatternAndPathFunc; /** * The internal used object to store the data. * * @author M.Karkowski * @type {unknown} * @memberof PubSubSystemBase */ protected _data: unknown = {}; protected _sendCurrentDataOnSubscription = false; protected _id = generateId() /** * List of all Properties. For every property, we store the * PropertyOptions. Then, we know, what elements should be * subscribed and which not. * * @author M.Karkowski * @protected * @memberof PubSubSystemBase */ protected _emitters = new Map(); protected _matched = new Map(); public constructor( protected _generateEmitterType: () => I, options: Partial = {} ) { this._options = Object.assign({ mqttBasedPatternSubscriptions: true, forwardChildData: true, forwardParentData: true, matchTopicsWithoutWildcards: true }, options); // Create a memoized function for the pattern matching (its way faster) this._comparePatternAndPath = memoize( (pattern: string, path: string) => { return _comparePatternAndPath(pattern, path, { matchTopicsWithoutWildcards: options.matchTopicsWithoutWildcards }); }, (pattern: string, path: string) => { return `${pattern}-${path}`; } ); } public register(emitter: I, options: IPropertyOptions): O { if (!this._emitters.has(emitter as unknown as O)) { let pubTopic: string | false = typeof options.topic === "string" ? options.topic : options.topic.publish || null; let subTopic: string | false = typeof options.topic === "string" ? options.topic : options.topic.subscribe || null; if (!(options.mode == "publish" || (Array.isArray(options.mode) && options.mode.includes("publish")))) { pubTopic = false; } if (!(options.mode == "subscribe" || (Array.isArray(options.mode) && options.mode.includes("subscribe")))) { subTopic = false; } // Define a callback, which will be used to forward // the data into the system: let callback: IEventCallback = undefined; if (pubTopic) { const _this = this; callback = (content, opts) => { // We use this callback to forward the data into the system: _this._pushData(pubTopic as string, content, opts); }; } // Register the emitter. This will be used during topic matching. this._emitters.set(emitter as unknown as O, { options, pubTopic, subTopic, callback }); // Update the Matching Rules. this.updateMatchting(); if (callback) { // If necessary. Add the Callback. emitter.subscribe(callback, { skipCurrent: !this._sendCurrentDataOnSubscription }); } // Now, if required, add the Data to the emitter. if (subTopic && this._sendCurrentDataOnSubscription) { if (containsWildcards(subTopic)) { // This is more Complex. this._patternbasedPullData(subTopic, null).map(item => { // We check if the content is null if (item.data !== null) { (emitter as unknown as O).emit( item.data, { sender: this._id, mode: "direct", topic: item.path } ); } }); } else { const currentContent = this._pullData(subTopic, null); if (currentContent !== null) { (emitter as unknown as O).emit( currentContent, { sender: this._id, mode: "direct", topic: subTopic } ); } } } } else { throw Error("Already registered Emitter!"); } return emitter as unknown as O; } public updateOptions(emitter: I, options: IPropertyOptions): void { if (this._emitters.has(emitter as unknown as O)) { const pubTopic = typeof options.topic === "string" ? options.topic : options.topic.publish || null; const subTopic = typeof options.topic === "string" ? options.topic : options.topic.subscribe || null; const data = this._emitters.get(emitter as unknown as O); data.options = options; data.subTopic = subTopic; data.pubTopic = pubTopic; this._emitters.set(emitter as unknown as O, data); // Update the Matching Rules. this.updateMatchting(); } else { throw Error("Already registered Emitter!"); } } public unregister(emitter: INopeEventEmitter): boolean { if (this._emitters.has(emitter as unknown as O)) { this._emitters.delete(emitter as unknown as O); // Update the Matching Rules. this.updateMatchting(); return true; } return false; } public registerSubscription(topic: string, subscription: IEventCallback): INopeObserver { // Create the Emitter const emitter = this._generateEmitterType(); // Create the Observer const observer = emitter.subscribe(subscription); // Register the Emitter. Thereby the ELement // will be linked to the Pub-Sub-System. this.register(emitter, { mode: "subscribe", schema: {}, topic: topic }); // Return the Emitter return observer; } /** * Internal Match-Making Algorithm. This allowes to Create a predefined * List between Publishers and Subscribers. Thereby the Process is speed * up, by utilizing this Look-Up-Table * * @author M.Karkowski * @memberof PubSubSystemBase */ public updateMatchting(): void { // Clears all defined Matches this._matched.clear(); // Iterate through all Publishers and for (const { pubTopic } of this._emitters.values()) { // Now, lets Update the Matching for the specific Topics. if (pubTopic) this._updateMatchingForTopic(pubTopic); } } protected __addToMatchingStructure(entry: keyof TMatchting, topicOfChange: string, pathOrPattern: string, emitter: O): void { if (!this._matched.has(topicOfChange)) { this._matched.set(topicOfChange, { dataPull: new Map(), dataQuery: new Map() }); } const emitters = this._matched.get(topicOfChange)[entry].get(pathOrPattern) || []; emitters.push(emitter); this._matched.get(topicOfChange)[entry].set(pathOrPattern, emitters); } /** Function to Interanlly add a new Match * * @export * @param {string} topicOfChange */ protected _updateMatchingForTopic(topicOfChange: string): void { if (!this._matched.has(topicOfChange)) { this._matched.set(topicOfChange, { dataPull: new Map(), dataQuery: new Map() }); } // Find all Matches for (const [emitter, { subTopic }] of this._emitters.entries()) { if (subTopic) { // Now lets determine the Path const result = this._comparePatternAndPath(subTopic, topicOfChange); if (result.affected) { if ( (result.affectedByChild && !this._options.forwardChildData) || (result.affectedByParent && !this._options.forwardParentData) ) { continue; } // We now have match the topic as following described: // 1) subscription contains a pattern // - dircet change (same-level) => content // - parent based change => content // - child based change => content // 2) subscription doesnt contains a pattern: // We more or less want the data on the path. // - direct change (topic = path) => content // - parent based change => a super change if (result.containsWildcards) { if (this._options.mqttBasedPatternSubscriptions) { if (result.patternToExtractData) { this.__addToMatchingStructure("dataQuery", topicOfChange, result.patternToExtractData, emitter); } else if (result.pathToExtractData) { this.__addToMatchingStructure("dataPull", topicOfChange, result.pathToExtractData, emitter); } else { throw Error("Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided"); } } else { this.__addToMatchingStructure("dataQuery", topicOfChange, topicOfChange, emitter); } } else { if (result.pathToExtractData) { this.__addToMatchingStructure("dataPull", topicOfChange, result.pathToExtractData, emitter); } else { throw Error("Implementation Error. The 'pathToExtractData' must be provided"); } } } } } } /** *Internal Function to notify Asynchronous all Subscriptions * * @author M.Karkowski * @private * @param {string} topic * @param {*} content * @memberof PubSubSystemBase */ protected _notify(topic: string, options: IEventAdditionalData, _exclusiveEmitter: O = null): void { // Check whether a Matching exists for this // Topic, if not add it. if (!this._matched.has(topic)) { this._updateMatchingForTopic(topic); } const referenceToMatch = this._matched.get(topic); // Performe the direct Matches for (const [_pathToPull, _emitters] of referenceToMatch.dataPull.entries()) { for (const _emitter of _emitters) { // Get a new copy for every element. const data = this._pullData(_pathToPull, null); if (_exclusiveEmitter !== null && _emitter !== _exclusiveEmitter) { continue; } // Iterate through all Subscribers _emitter.emit(data, { ...options, mode: "direct", topic: topic }); } } // Performe the direct Matches for (const [_pattern, _emitters] of referenceToMatch.dataQuery.entries()) { for (const _emitter of _emitters) { // Get a new copy for every element. const data = this._pullData(_pattern, null); if (_exclusiveEmitter !== null && _emitter !== _exclusiveEmitter) { continue; } // Iterate through all Subscribers _emitter.emit(data, { ...options, mode: "direct", topic: topic }); } } } protected _pushData(path: string, data: T, options: IEventAdditionalData = {}): void { if (containsWildcards(path)) { throw ("The Path contains wildcards. Please use the method \"patternbasedPullData\" instead"); } else if (path === "") { this._data = deepClone(data); this._notify(path, options); } else { rsetattr(this._data, path, deepClone(data)); this._notify(path, options); } // Emit the data change. this.incrementalDataChange.emit({ path, data }); } // Function to pull the Last Data of the Topic protected _pullData(topic: string, _default: D = null): T { if (containsWildcards(topic)) { throw ("The Path contains wildcards. Please use the method \"patternbasedPullData\" instead"); } return deepClone(rgetattr(this._data, topic, _default)); } protected _patternbasedPullData(pattern: string, _default: D = null): { path: string, data: T }[] { // To extract the data based on a Pattern, // we firstly, we check if the given pattern // is a pattern. if (!containsWildcards(pattern)) { // Its not a pattern so we will speed up // things. return [{ path: pattern, data: this._pullData(pattern, _default) }]; } // Now we know, we have to work with the query, // for that purpose, we will adapt the data object // to the following form: // {path: value} const flattenData = flattenObject(this._data); const ret: { path: string, data: T }[] = []; // We will use our alternative representation of the // object to compare the pattern with the path item. // only if there is a direct match => we will extract it. // That corresponds to a direct level extraction and // prevents to grap multiple items. for (const [path, data] of flattenData.entries()) { const result = this._comparePatternAndPath(pattern, path); if (result.affectedOnSameLevel || result.affectedByChild) { ret.push({ path, data }); } } // Now we just return our created element. return ret; } }