From f0032eef631325df7ae71b24a40de8cb5b9d0460 Mon Sep 17 00:00:00 2001 From: Martin Karkowski Date: Sat, 19 Mar 2022 07:39:12 +0100 Subject: [PATCH] - Fixes: - log-to-file: is now storing the last logs as well - nope-package-loader: is transmitting the correct parameters. - Optimazations: - nope-package-loader: now storing elements with stringifyWithFunctions and is capable to read functions. - pub-sub-system: Adding partial changes of the topic structure. This speeds up the entire system. --- 05-link.bat | 2 + CHANGELOG.md | 10 +- contribute/VERSION | 2 +- contribute/toLinkBrowser.js | 3 +- lib/cli/runNopeBackend.ts | 13 +- lib/helpers/async.ts | 7 +- lib/helpers/jsonMethods.ts | 48 ++++--- lib/loader/loadPackages.ts | 6 +- lib/loader/nopePackageLoader.ts | 2 +- lib/logger/fileLogging.ts | 78 +++++++---- lib/logger/nopeLogger.ts | 31 +++++ lib/pubSub/nopePubSubSystem.ts | 234 +++++++++++++++++++++----------- 12 files changed, 292 insertions(+), 144 deletions(-) diff --git a/05-link.bat b/05-link.bat index 8219889..177c622 100644 --- a/05-link.bat +++ b/05-link.bat @@ -8,6 +8,7 @@ if not "%1"=="am_admin" (powershell start -verb runas '%0' am_admin & exit /b) node contribute/toLinkBrowser.js cp ./package.json ./build/package.json + cp -r .\dist-browser\ .\build\dist-browser cd ./build @@ -22,6 +23,7 @@ if not "%1"=="am_admin" (powershell start -verb runas '%0' am_admin & exit /b) node contribute/toLinkBrowser.js cp ./package.json ./build/package.json + cp -r .\dist-browser\ .\build\dist-browser cd ./build diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ddb09..055e094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ # 1.0.12 -Inital commit, which is working with the browser \ No newline at end of file +Inital commit, which is working with the browser + +# 1.0.25 +- Fixes: + - log-to-file: is now storing the last logs as well + - nope-package-loader: is transmitting the correct parameters. +- Optimazations: + - nope-package-loader: now storing elements with stringifyWithFunctions and is capable to read functions. + - pub-sub-system: Adding partial changes of the topic structure. This speeds up the entire system. diff --git a/contribute/VERSION b/contribute/VERSION index 321816a..855f702 100644 --- a/contribute/VERSION +++ b/contribute/VERSION @@ -1 +1 @@ -1.0.24 \ No newline at end of file +1.0.25 \ No newline at end of file diff --git a/contribute/toLinkBrowser.js b/contribute/toLinkBrowser.js index d3df479..463b0ee 100644 --- a/contribute/toLinkBrowser.js +++ b/contribute/toLinkBrowser.js @@ -2,7 +2,7 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2022-01-18 13:42:41 - * @modify date 2022-03-18 08:56:20 + * @modify date 2022-03-18 09:28:46 * @desc [description] */ @@ -23,6 +23,7 @@ package.version = version; package.files = [ "*", + "dist-browser/**/*" ]; writeFileSync("./package.json", JSON.stringify(package, undefined, 2), { encoding: "utf-8" }); \ No newline at end of file diff --git a/lib/cli/runNopeBackend.ts b/lib/cli/runNopeBackend.ts index 37e1fec..28f5711 100644 --- a/lib/cli/runNopeBackend.ts +++ b/lib/cli/runNopeBackend.ts @@ -171,7 +171,7 @@ export async function readInArgs( help: 'Specify the Logger Level. Defaults to "info". Valid values are: ' + LoggerLevels.join(", "), - default: "debug", + default: "info", type: "str", dest: "log", }); @@ -268,11 +268,6 @@ export async function runNopeBackend( opts = {} as any; } - if (args.logToFile) { - const fileName = generateLogfilePath("run"); - closeCallbacks.push(useLogFile(fileName, 10)); - } - if (LoggerLevels.includes(args.log)) { setGlobalLoggerLevel(args.log); } @@ -280,6 +275,12 @@ export async function runNopeBackend( // Define a Logger const logger = getNopeLogger("starter"); + if (args.logToFile) { + const fileName = generateLogfilePath("run"); + logger.warn("Using File Logger. Logging to", fileName); + closeCallbacks.push(useLogFile(fileName, 200)); + } + if (args.profile) { logger.warn("Enabled Profiling."); closeCallbacks.push(recordCPUProfile()); diff --git a/lib/helpers/async.ts b/lib/helpers/async.ts index c6864bc..fc2ece8 100644 --- a/lib/helpers/async.ts +++ b/lib/helpers/async.ts @@ -6,8 +6,13 @@ * @desc [description] */ +/** + * Delays some execution. + * @param delay [ms] + * @returns void + */ export function sleep(delay: number): Promise { - return new Promise((resolve) => setTimeout(resolve, delay)); + return new Promise((resolve) => setTimeout(resolve, delay)); } /** diff --git a/lib/helpers/jsonMethods.ts b/lib/helpers/jsonMethods.ts index f54e712..65aff62 100644 --- a/lib/helpers/jsonMethods.ts +++ b/lib/helpers/jsonMethods.ts @@ -10,31 +10,35 @@ * Function to stringify an Object. This Function will stringify Functions as well. * @param obj The Object. */ -export function stringifyWithFunctions(obj) { - return JSON.stringify(obj, (key, value) => { - if (typeof value === "function") { - let str: string = value.toString(); +export function stringifyWithFunctions(obj, ...args) { + return JSON.stringify( + obj, + (key, value) => { + if (typeof value === "function") { + let str: string = value.toString(); - // Todo Parse Arrow-Functions Correctly! - // Details here: https://zendev.com/2018/10/01/javascript-arrow-functions-how-why-when.html - // Difference Cases For: - // 1) (a, b) => a + b; - // 2) array => array[0]; - // 3) (a, b) => (a + b); - // 4) (name, description) => ({name: name, description: description}) - // .... + // Todo Parse Arrow-Functions Correctly! + // Details here: https://zendev.com/2018/10/01/javascript-arrow-functions-how-why-when.html + // Difference Cases For: + // 1) (a, b) => a + b; + // 2) array => array[0]; + // 3) (a, b) => (a + b); + // 4) (name, description) => ({name: name, description: description}) + // .... - if (!str.startsWith("function") && !str.startsWith("(")) { - const name = str.slice(0, str.indexOf("=>")); - const func = str.slice(str.indexOf("=>(") + 3, str.length - 2); - const adaptedFunc = "function(" + name + "){ return " + func + "; }"; - str = adaptedFunc; + if (!str.startsWith("function") && !str.startsWith("(")) { + const name = str.slice(0, str.indexOf("=>")); + const func = str.slice(str.indexOf("=>(") + 3, str.length - 2); + const adaptedFunc = "function(" + name + "){ return " + func + "; }"; + str = adaptedFunc; + } + + return "/Function(" + str + ")/"; } - - return "/Function(" + str + ")/"; - } - return value; - }); + return value; + }, + ...args + ); } /** diff --git a/lib/loader/loadPackages.ts b/lib/loader/loadPackages.ts index 49358d3..8f6b4ef 100644 --- a/lib/loader/loadPackages.ts +++ b/lib/loader/loadPackages.ts @@ -11,6 +11,7 @@ import { join, resolve } from "path"; import "reflect-metadata"; import { sleep } from "../helpers/async"; import { createFile, listFiles } from "../helpers/fileMethods"; +import { parseWithFunctions, stringifyWithFunctions } from "../index.browser"; import { getNopeLogger } from "../logger/getLogger"; import { IInstanceCreationMsg } from "../types/nope/nopeCommunication.interface"; import { IPackageDescription } from "../types/nope/nopePackage.interface"; @@ -133,12 +134,11 @@ export async function writeDefaultConfig( await createFile( filename, - JSON.stringify( + stringifyWithFunctions( { functions, packages, }, - undefined, 4 ) ); @@ -163,7 +163,7 @@ export async function loadPackages( try { /** Load the File and Parse it. */ - data = JSON.parse(await readFile(filename, { encoding: "utf8" })); + data = parseWithFunctions(await readFile(filename, { encoding: "utf8" })); } catch (e) { // Generate the Default File await writeDefaultConfig(filename); diff --git a/lib/loader/nopePackageLoader.ts b/lib/loader/nopePackageLoader.ts index 94099c7..b07a66a 100644 --- a/lib/loader/nopePackageLoader.ts +++ b/lib/loader/nopePackageLoader.ts @@ -678,7 +678,7 @@ export class NopePackageLoader implements INopePackageLoader { if (task.delay) { await sleep(task.delay); } - await instance[task.service](task.params); + await instance[task.service](...task.params); } } catch (e) { this._logger.error( diff --git a/lib/logger/fileLogging.ts b/lib/logger/fileLogging.ts index 867c913..b7f2985 100644 --- a/lib/logger/fileLogging.ts +++ b/lib/logger/fileLogging.ts @@ -12,6 +12,7 @@ import { createFile } from "../helpers/fileMethods"; import { replaceAll } from "../helpers/stringMethods"; import { sleep } from "../index.browser"; import { getCentralNopeLogger, getNopeLogger } from "./getLogger"; +import { formatMsgForConsole } from "./nopeLogger"; export const CURRENT_DATE = _parsableISOString(); export const DEFAULT_LOG_LOCATION = join(process.cwd(), "logs"); @@ -51,6 +52,7 @@ export function useLogFile( bufferSize = 100 ): () => Promise { const logger = getCentralNopeLogger(); + let bufferSizeToUse = bufferSize; // Define a function, that will write the content of the Buffer to our // file. @@ -104,46 +106,66 @@ export function useLogFile( ); logger.setHandler((msg, context) => { - if (bufferSize < buffer.length && readyToWrite) { - // Now if the Data is ready, lets write the - // buffer to the File. - writeBufferToFile(); - } else { - // Else we extend our buffer: - const logAsString = [ - context.filterLevel.name, - "-", - context.name, - ":", - // Try to store the Elements a String. - Object.values(msg) - .map((item) => { + // Else we extend our buffer: + const logAsString = [ + new Date().toISOString(), + "-", + context.level.name, + "-", + context.name, + ":", + // Try to store the Elements a String. + Object.values(msg) + .map((item) => { + try { + if (typeof item === "object") + return JSON.stringify(item, undefined, 2); + return item; + } catch (e) { try { - if (typeof item === "object") - return JSON.stringify(item, undefined, 2); - return item; + return item.toString(); } catch (e) { return item; } - }) - .join(" "), - ].join(" "); - buffer.push(logAsString); + } + }) + .join(" "), + ].join(" "); + buffer.push(logAsString); + + if (context.level.name === "ERROR") { + // We want errors to be rendered in here as well. + console.log(...formatMsgForConsole(msg, context)); + } + + if (bufferSizeToUse < buffer.length) { + if (readyToWrite) { + // Now if the Data is ready, lets write the + // buffer to the File. + writeBufferToFile(); + } else if (_clearing) { + clearBufferAtEnd(); + } } }); + let _clearing = false; + const clearBufferAtEnd = async () => { consoleLogger.info("Shutdown detected! Trying to Write the Buffer"); + _clearing = true; - while (!readyToWrite) { - await sleep(50); + while (!readyToWrite || buffer.length > 0) { + if (readyToWrite) { + const promise = new Promise((resolve, reject) => { + writeBufferToFile(resolve); + }); + + await promise; + } else await sleep(50); } - const promise = new Promise((resolve, reject) => { - writeBufferToFile(resolve); - }); - - await promise; + bufferSizeToUse = 0; }; return clearBufferAtEnd; diff --git a/lib/logger/nopeLogger.ts b/lib/logger/nopeLogger.ts index eed0284..edbb3da 100644 --- a/lib/logger/nopeLogger.ts +++ b/lib/logger/nopeLogger.ts @@ -55,6 +55,36 @@ const spacer = { TRACE: "", }; +export const formatMsgForConsole = RUNNINGINNODE + ? function (message, context) { + return [ + colors.FgBlue, + new Date().toISOString(), + colors.Reset, + "-", + colorMatching[context.level.name], + context.level.name + spacer[context.level.name], + colors.Reset, + "-", + colors.BgWhite + colors.FgBlack, + context.name, + colors.Reset, + ":", + ...message, + ]; + } + : function (message, context) { + return [ + new Date().toISOString(), + "-", + context.level.name + spacer[context.level.name], + "-", + context.name, + ":", + ...message, + ]; + }; + Logger.useDefaults({ defaultLevel: (Logger as any).DEBUG, formatter: RUNNINGINNODE @@ -66,6 +96,7 @@ Logger.useDefaults({ "-", colorMatching[context.level.name], context.level.name + spacer[context.level.name], + colors.Reset, "-", colors.BgWhite + colors.FgBlack, context.name, diff --git a/lib/pubSub/nopePubSubSystem.ts b/lib/pubSub/nopePubSubSystem.ts index 95c43b0..1bbc0a7 100644 --- a/lib/pubSub/nopePubSubSystem.ts +++ b/lib/pubSub/nopePubSubSystem.ts @@ -35,10 +35,10 @@ import { IMapBasedMergeData } from "../types/nope/nopeHelpers.interface"; type TMatchting = { // Contains subscriptions, that can get there data // by pulling. - dataPull: Map; + dataPull: Map>; // Contains subscriptions, which must be pulled - dataQuery: Map; + dataQuery: Map>; }; export class PubSubSystemBase< @@ -113,6 +113,7 @@ export class PubSubSystemBase< protected _matched = new Map(); protected _generateEmitterType: () => I; + protected _disposing: boolean; public constructor( options: Partial & { @@ -129,6 +130,9 @@ export class PubSubSystemBase< options ); + // Flag to stop forwarding data, if disposing is enabled. + this._disposing = false; + this._generateEmitterType = options.generateEmitterType || (() => { @@ -213,7 +217,8 @@ export class PubSubSystemBase< }); // Update the Matching Rules. - this.updateMatchting(); + // this.updateMatchting(); + this._partialMatchingUpdate("add", emitter, pubTopic, subTopic); if (callback) { // If necessary. Add the Callback. @@ -271,6 +276,13 @@ export class PubSubSystemBase< const data = this._emitters.get(emitter as unknown as O); + this._partialMatchingUpdate( + "remove", + emitter, + data.pubTopic, + data.subTopic + ); + data.options = options; data.subTopic = subTopic; data.pubTopic = pubTopic; @@ -278,7 +290,9 @@ export class PubSubSystemBase< this._emitters.set(emitter as unknown as O, data); // Update the Matching Rules. - this.updateMatchting(); + + this._partialMatchingUpdate("add", emitter, pubTopic, subTopic); + // this.updateMatchting(); } else { throw Error("Already registered Emitter!"); } @@ -287,10 +301,15 @@ export class PubSubSystemBase< // See interface description public unregister(emitter: I): boolean { if (this._emitters.has(emitter as unknown as O)) { + const { pubTopic, subTopic } = this._emitters.get( + emitter as unknown as O + ); + this._emitters.delete(emitter as unknown as O); // Update the Matching Rules. - this.updateMatchting(); + this._partialMatchingUpdate("remove", emitter, pubTopic, subTopic); + // this.updateMatchting(); return true; } @@ -366,6 +385,125 @@ export class PubSubSystemBase< this.subscriptions.update(); } + private __deleteMatchingEntry( + _pubTopic: string, + _subTopic: string, + _emitter: I + ) { + if (this._matched.has(_pubTopic)) { + const data = this._matched.get(_pubTopic); + + if (data.dataPull.has(_subTopic)) { + data.dataPull.get(_subTopic).delete(_emitter); + } + + if (data.dataQuery.has(_subTopic)) { + data.dataQuery.get(_subTopic).delete(_emitter); + } + } + } + + private __addMatchingEntryIfRequired(pubTopic, subTopic, emitter) { + // Now lets determine the Path + const result = this._comparePatternAndPath(subTopic, pubTopic); + + if (result.affected) { + if ( + !result.containsWildcards && + ((result.affectedByChild && !this._options.forwardChildData) || + (result.affectedByParent && !this._options.forwardParentData)) + ) { + return; + } + + // We now have match the topic as following described: + // 1) subscription contains a pattern + // - dircet change (same-level) => content + // - parent based change => content + // - child based change => content + // 2) subscription doesnt contains a pattern: + // We more or less want the data on the path. + // - direct change (topic = path) => content + // - parent based change => a super change + if (result.containsWildcards) { + if (this._options.mqttBasedPatternSubscriptions) { + if (result.patternToExtractData) { + this.__addToMatchingStructure( + "dataQuery", + pubTopic, + result.patternToExtractData, + emitter + ); + } else if (result.pathToExtractData) { + this.__addToMatchingStructure( + "dataPull", + pubTopic, + result.pathToExtractData, + emitter + ); + } else { + throw Error( + "Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided" + ); + } + } else { + this.__addToMatchingStructure( + "dataQuery", + pubTopic, + pubTopic, + emitter + ); + } + } else { + if ( + (result.affectedByChild && !this._options.forwardChildData) || + (result.affectedByParent && !this._options.forwardParentData) + ) { + return; + } + + if (result.pathToExtractData) { + this.__addToMatchingStructure( + "dataPull", + pubTopic, + result.pathToExtractData, + emitter + ); + } else { + throw Error( + "Implementation Error. The 'pathToExtractData' must be provided" + ); + } + } + } + } + + protected _partialMatchingUpdate( + mode: "add" | "remove", + _emitter: I, + _pubTopic: string | false, + _subTopic: string | false + ): void { + // Iterate through all Publishers and + for (const { pubTopic, subTopic } of this._emitters.values()) { + // Now, lets Update the Matching for the specific Topics. + if (mode === "remove" && pubTopic && _subTopic) { + this.__deleteMatchingEntry(pubTopic, _subTopic, _emitter); + } else if (mode === "add" && pubTopic && _subTopic) { + this.__addMatchingEntryIfRequired(pubTopic, _subTopic, _emitter); + } + } + + if (mode === "add" && _pubTopic) { + this._updateMatchingForTopic(_pubTopic); + } else if (mode === "add" && _pubTopic) { + this._matched.delete(_pubTopic); + } + + this.publishers.update(); + this.subscriptions.update(); + } + public emit(eventName: string, data: any, options?: AD) { return this._pushData(eventName, data, options); } @@ -378,6 +516,7 @@ export class PubSubSystemBase< * @memberof PubSubSystemBase */ public dispose(): void { + this._disposing = true; const emitters = Array.from(this._emitters.keys()); emitters.map((emitter) => this.unregister(emitter as unknown as I)); @@ -399,10 +538,11 @@ export class PubSubSystemBase< }); } - const emitters = - this._matched.get(topicOfChange)[entry].get(pathOrPattern) || []; - emitters.push(emitter); - this._matched.get(topicOfChange)[entry].set(pathOrPattern, emitters); + if (!this._matched.get(topicOfChange)[entry].has(pathOrPattern)) { + this._matched.get(topicOfChange)[entry].set(pathOrPattern, new Set()); + } + + this._matched.get(topicOfChange)[entry].get(pathOrPattern).add(emitter); } /** Function to Interanlly add a new Match @@ -422,77 +562,7 @@ export class PubSubSystemBase< for (const [emitter, { subTopic }] of this._emitters.entries()) { if (subTopic) { // Now lets determine the Path - const result = this._comparePatternAndPath(subTopic, topicOfChange); - - if (result.affected) { - if ( - !result.containsWildcards && - ((result.affectedByChild && !this._options.forwardChildData) || - (result.affectedByParent && !this._options.forwardParentData)) - ) { - continue; - } - - // We now have match the topic as following described: - // 1) subscription contains a pattern - // - dircet change (same-level) => content - // - parent based change => content - // - child based change => content - // 2) subscription doesnt contains a pattern: - // We more or less want the data on the path. - // - direct change (topic = path) => content - // - parent based change => a super change - if (result.containsWildcards) { - if (this._options.mqttBasedPatternSubscriptions) { - if (result.patternToExtractData) { - this.__addToMatchingStructure( - "dataQuery", - topicOfChange, - result.patternToExtractData, - emitter - ); - } else if (result.pathToExtractData) { - this.__addToMatchingStructure( - "dataPull", - topicOfChange, - result.pathToExtractData, - emitter - ); - } else { - throw Error( - "Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided" - ); - } - } else { - this.__addToMatchingStructure( - "dataQuery", - topicOfChange, - topicOfChange, - emitter - ); - } - } else { - if ( - (result.affectedByChild && !this._options.forwardChildData) || - (result.affectedByParent && !this._options.forwardParentData) - ) { - continue; - } - - if (result.pathToExtractData) { - this.__addToMatchingStructure( - "dataPull", - topicOfChange, - result.pathToExtractData, - emitter - ); - } else { - throw Error( - "Implementation Error. The 'pathToExtractData' must be provided" - ); - } - } - } + this.__addMatchingEntryIfRequired(topicOfChange, subTopic, emitter); } } } @@ -511,6 +581,10 @@ export class PubSubSystemBase< options: Partial, _emitter: O = null ): void { + if (this._disposing) { + return; + } + // Check whether a Matching exists for this // Topic, if not add it. if (!this._matched.has(topic)) {