- log-to-file: is now storing the last logs as well
  - nope-package-loader: is transmitting the correct parameters.
- Optimizations:
  - nope-package-loader: now storing elements with stringifyWithFunctions and is capable to read functions.
  - pub-sub-system: Adding partial changes of the topic structure. This speeds up the entire system.
This commit is contained in:
Martin Karkowski 2022-03-19 07:39:12 +01:00
parent bb89d6b3b5
commit f0032eef63
12 changed files with 292 additions and 144 deletions

View File

@ -8,6 +8,7 @@ if not "%1"=="am_admin" (powershell start -verb runas '%0' am_admin & exit /b)
node contribute/toLinkBrowser.js node contribute/toLinkBrowser.js
cp ./package.json ./build/package.json cp ./package.json ./build/package.json
cp -r .\dist-browser\ .\build\dist-browser
cd ./build cd ./build
@ -22,6 +23,7 @@ if not "%1"=="am_admin" (powershell start -verb runas '%0' am_admin & exit /b)
node contribute/toLinkBrowser.js node contribute/toLinkBrowser.js
cp ./package.json ./build/package.json cp ./package.json ./build/package.json
cp -r .\dist-browser\ .\build\dist-browser
cd ./build cd ./build

View File

@ -1,3 +1,11 @@
# 1.0.12 # 1.0.12
Initial commit, which is working with the browser Initial commit, which is working with the browser
# 1.0.25
- Fixes:
- log-to-file: is now storing the last logs as well
- nope-package-loader: is transmitting the correct parameters.
- Optimizations:
- nope-package-loader: now storing elements with stringifyWithFunctions and is capable to read functions.
- pub-sub-system: Adding partial changes of the topic structure. This speeds up the entire system.

View File

@ -1 +1 @@
1.0.24 1.0.25

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski * @author Martin Karkowski
* @email m.karkowski@zema.de * @email m.karkowski@zema.de
* @create date 2022-01-18 13:42:41 * @create date 2022-01-18 13:42:41
* @modify date 2022-03-18 08:56:20 * @modify date 2022-03-18 09:28:46
* @desc [description] * @desc [description]
*/ */
@ -23,6 +23,7 @@ package.version = version;
package.files = [ package.files = [
"*", "*",
"dist-browser/**/*"
]; ];
writeFileSync("./package.json", JSON.stringify(package, undefined, 2), { encoding: "utf-8" }); writeFileSync("./package.json", JSON.stringify(package, undefined, 2), { encoding: "utf-8" });

View File

@ -171,7 +171,7 @@ export async function readInArgs(
help: help:
'Specify the Logger Level. Defaults to "info". Valid values are: ' + 'Specify the Logger Level. Defaults to "info". Valid values are: ' +
LoggerLevels.join(", "), LoggerLevels.join(", "),
default: "debug", default: "info",
type: "str", type: "str",
dest: "log", dest: "log",
}); });
@ -268,11 +268,6 @@ export async function runNopeBackend(
opts = {} as any; opts = {} as any;
} }
if (args.logToFile) {
const fileName = generateLogfilePath("run");
closeCallbacks.push(useLogFile(fileName, 10));
}
if (LoggerLevels.includes(args.log)) { if (LoggerLevels.includes(args.log)) {
setGlobalLoggerLevel(args.log); setGlobalLoggerLevel(args.log);
} }
@ -280,6 +275,12 @@ export async function runNopeBackend(
// Define a Logger // Define a Logger
const logger = getNopeLogger("starter"); const logger = getNopeLogger("starter");
if (args.logToFile) {
const fileName = generateLogfilePath("run");
logger.warn("Using File Logger. Logging to", fileName);
closeCallbacks.push(useLogFile(fileName, 200));
}
if (args.profile) { if (args.profile) {
logger.warn("Enabled Profiling."); logger.warn("Enabled Profiling.");
closeCallbacks.push(recordCPUProfile()); closeCallbacks.push(recordCPUProfile());

View File

@ -6,8 +6,13 @@
* @desc [description] * @desc [description]
*/ */
/**
* Delays some execution.
* @param delay [ms]
* @returns void
*/
export function sleep(delay: number): Promise<void> { export function sleep(delay: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, delay)); return new Promise<void>((resolve) => setTimeout(resolve, delay));
} }
/** /**

View File

@ -10,31 +10,35 @@
* Function to stringify an Object. This Function will stringify Functions as well. * Function to stringify an Object. This Function will stringify Functions as well.
* @param obj The Object. * @param obj The Object.
*/ */
export function stringifyWithFunctions(obj) { export function stringifyWithFunctions(obj, ...args) {
return JSON.stringify(obj, (key, value) => { return JSON.stringify(
if (typeof value === "function") { obj,
let str: string = value.toString(); (key, value) => {
if (typeof value === "function") {
let str: string = value.toString();
// Todo Parse Arrow-Functions Correctly! // Todo Parse Arrow-Functions Correctly!
// Details here: https://zendev.com/2018/10/01/javascript-arrow-functions-how-why-when.html // Details here: https://zendev.com/2018/10/01/javascript-arrow-functions-how-why-when.html
// Difference Cases For: // Difference Cases For:
// 1) (a, b) => a + b; // 1) (a, b) => a + b;
// 2) array => array[0]; // 2) array => array[0];
// 3) (a, b) => (a + b); // 3) (a, b) => (a + b);
// 4) (name, description) => ({name: name, description: description}) // 4) (name, description) => ({name: name, description: description})
// .... // ....
if (!str.startsWith("function") && !str.startsWith("(")) { if (!str.startsWith("function") && !str.startsWith("(")) {
const name = str.slice(0, str.indexOf("=>")); const name = str.slice(0, str.indexOf("=>"));
const func = str.slice(str.indexOf("=>(") + 3, str.length - 2); const func = str.slice(str.indexOf("=>(") + 3, str.length - 2);
const adaptedFunc = "function(" + name + "){ return " + func + "; }"; const adaptedFunc = "function(" + name + "){ return " + func + "; }";
str = adaptedFunc; str = adaptedFunc;
}
return "/Function(" + str + ")/";
} }
return value;
return "/Function(" + str + ")/"; },
} ...args
return value; );
});
} }
/** /**

View File

@ -11,6 +11,7 @@ import { join, resolve } from "path";
import "reflect-metadata"; import "reflect-metadata";
import { sleep } from "../helpers/async"; import { sleep } from "../helpers/async";
import { createFile, listFiles } from "../helpers/fileMethods"; import { createFile, listFiles } from "../helpers/fileMethods";
import { parseWithFunctions, stringifyWithFunctions } from "../index.browser";
import { getNopeLogger } from "../logger/getLogger"; import { getNopeLogger } from "../logger/getLogger";
import { IInstanceCreationMsg } from "../types/nope/nopeCommunication.interface"; import { IInstanceCreationMsg } from "../types/nope/nopeCommunication.interface";
import { IPackageDescription } from "../types/nope/nopePackage.interface"; import { IPackageDescription } from "../types/nope/nopePackage.interface";
@ -133,12 +134,11 @@ export async function writeDefaultConfig(
await createFile( await createFile(
filename, filename,
JSON.stringify( stringifyWithFunctions(
{ {
functions, functions,
packages, packages,
}, },
undefined,
4 4
) )
); );
@ -163,7 +163,7 @@ export async function loadPackages(
try { try {
/** Load the File and Parse it. */ /** Load the File and Parse it. */
data = JSON.parse(await readFile(filename, { encoding: "utf8" })); data = parseWithFunctions(await readFile(filename, { encoding: "utf8" }));
} catch (e) { } catch (e) {
// Generate the Default File // Generate the Default File
await writeDefaultConfig(filename); await writeDefaultConfig(filename);

View File

@ -678,7 +678,7 @@ export class NopePackageLoader implements INopePackageLoader {
if (task.delay) { if (task.delay) {
await sleep(task.delay); await sleep(task.delay);
} }
await instance[task.service](task.params); await instance[task.service](...task.params);
} }
} catch (e) { } catch (e) {
this._logger.error( this._logger.error(

View File

@ -12,6 +12,7 @@ import { createFile } from "../helpers/fileMethods";
import { replaceAll } from "../helpers/stringMethods"; import { replaceAll } from "../helpers/stringMethods";
import { sleep } from "../index.browser"; import { sleep } from "../index.browser";
import { getCentralNopeLogger, getNopeLogger } from "./getLogger"; import { getCentralNopeLogger, getNopeLogger } from "./getLogger";
import { formatMsgForConsole } from "./nopeLogger";
export const CURRENT_DATE = _parsableISOString(); export const CURRENT_DATE = _parsableISOString();
export const DEFAULT_LOG_LOCATION = join(process.cwd(), "logs"); export const DEFAULT_LOG_LOCATION = join(process.cwd(), "logs");
@ -51,6 +52,7 @@ export function useLogFile(
bufferSize = 100 bufferSize = 100
): () => Promise<void> { ): () => Promise<void> {
const logger = getCentralNopeLogger(); const logger = getCentralNopeLogger();
let bufferSizeToUse = bufferSize;
// Define a function, that will write the content of the Buffer to our // Define a function, that will write the content of the Buffer to our
// file. // file.
@ -104,46 +106,66 @@ export function useLogFile(
); );
logger.setHandler((msg, context) => { logger.setHandler((msg, context) => {
if (bufferSize < buffer.length && readyToWrite) { // Else we extend our buffer:
// Now if the Data is ready, lets write the const logAsString = [
// buffer to the File. new Date().toISOString(),
writeBufferToFile(); "-",
} else { context.level.name,
// Else we extend our buffer: "-",
const logAsString = [ context.name,
context.filterLevel.name, ":",
"-", // Try to store the Elements a String.
context.name, Object.values(msg)
":", .map((item) => {
// Try to store the Elements a String. try {
Object.values(msg) if (typeof item === "object")
.map((item) => { return JSON.stringify(item, undefined, 2);
return item;
} catch (e) {
try { try {
if (typeof item === "object") return item.toString();
return JSON.stringify(item, undefined, 2);
return item;
} catch (e) { } catch (e) {
return item; return item;
} }
}) }
.join(" "), })
].join(" "); .join(" "),
buffer.push(logAsString); ].join(" ");
buffer.push(logAsString);
if (context.level.name === "ERROR") {
// We want errors to be rendered in here as well.
console.log(...formatMsgForConsole(msg, context));
}
if (bufferSizeToUse < buffer.length) {
if (readyToWrite) {
// Now if the Data is ready, lets write the
// buffer to the File.
writeBufferToFile();
} else if (_clearing) {
clearBufferAtEnd();
}
} }
}); });
let _clearing = false;
const clearBufferAtEnd = async () => { const clearBufferAtEnd = async () => {
consoleLogger.info("Shutdown detected! Trying to Write the Buffer"); consoleLogger.info("Shutdown detected! Trying to Write the Buffer");
_clearing = true;
while (!readyToWrite) { while (!readyToWrite || buffer.length > 0) {
await sleep(50); if (readyToWrite) {
const promise = new Promise<void>((resolve, reject) => {
writeBufferToFile(resolve);
});
await promise;
} else await sleep(50);
} }
const promise = new Promise<void>((resolve, reject) => { bufferSizeToUse = 0;
writeBufferToFile(resolve);
});
await promise;
}; };
return clearBufferAtEnd; return clearBufferAtEnd;

View File

@ -55,6 +55,36 @@ const spacer = {
TRACE: "", TRACE: "",
}; };
export const formatMsgForConsole = RUNNINGINNODE
? function (message, context) {
return [
colors.FgBlue,
new Date().toISOString(),
colors.Reset,
"-",
colorMatching[context.level.name],
context.level.name + spacer[context.level.name],
colors.Reset,
"-",
colors.BgWhite + colors.FgBlack,
context.name,
colors.Reset,
":",
...message,
];
}
: function (message, context) {
return [
new Date().toISOString(),
"-",
context.level.name + spacer[context.level.name],
"-",
context.name,
":",
...message,
];
};
Logger.useDefaults({ Logger.useDefaults({
defaultLevel: (Logger as any).DEBUG, defaultLevel: (Logger as any).DEBUG,
formatter: RUNNINGINNODE formatter: RUNNINGINNODE
@ -66,6 +96,7 @@ Logger.useDefaults({
"-", "-",
colorMatching[context.level.name], colorMatching[context.level.name],
context.level.name + spacer[context.level.name], context.level.name + spacer[context.level.name],
colors.Reset,
"-", "-",
colors.BgWhite + colors.FgBlack, colors.BgWhite + colors.FgBlack,
context.name, context.name,

View File

@ -35,10 +35,10 @@ import { IMapBasedMergeData } from "../types/nope/nopeHelpers.interface";
type TMatchting<O extends INopeTopic = INopeTopic> = { type TMatchting<O extends INopeTopic = INopeTopic> = {
// Contains subscriptions, that can get there data // Contains subscriptions, that can get there data
// by pulling. // by pulling.
dataPull: Map<string, O[]>; dataPull: Map<string, Set<O>>;
// Contains subscriptions, which must be pulled // Contains subscriptions, which must be pulled
dataQuery: Map<string, O[]>; dataQuery: Map<string, Set<O>>;
}; };
export class PubSubSystemBase< export class PubSubSystemBase<
@ -113,6 +113,7 @@ export class PubSubSystemBase<
protected _matched = new Map<string, TMatchting>(); protected _matched = new Map<string, TMatchting>();
protected _generateEmitterType: () => I; protected _generateEmitterType: () => I;
protected _disposing: boolean;
public constructor( public constructor(
options: Partial<IPubSubOptions> & { options: Partial<IPubSubOptions> & {
@ -129,6 +130,9 @@ export class PubSubSystemBase<
options options
); );
// Flag to stop forwarding data, if disposing is enabled.
this._disposing = false;
this._generateEmitterType = this._generateEmitterType =
options.generateEmitterType || options.generateEmitterType ||
(() => { (() => {
@ -213,7 +217,8 @@ export class PubSubSystemBase<
}); });
// Update the Matching Rules. // Update the Matching Rules.
this.updateMatchting(); // this.updateMatchting();
this._partialMatchingUpdate("add", emitter, pubTopic, subTopic);
if (callback) { if (callback) {
// If necessary. Add the Callback. // If necessary. Add the Callback.
@ -271,6 +276,13 @@ export class PubSubSystemBase<
const data = this._emitters.get(emitter as unknown as O); const data = this._emitters.get(emitter as unknown as O);
this._partialMatchingUpdate(
"remove",
emitter,
data.pubTopic,
data.subTopic
);
data.options = options; data.options = options;
data.subTopic = subTopic; data.subTopic = subTopic;
data.pubTopic = pubTopic; data.pubTopic = pubTopic;
@ -278,7 +290,9 @@ export class PubSubSystemBase<
this._emitters.set(emitter as unknown as O, data); this._emitters.set(emitter as unknown as O, data);
// Update the Matching Rules. // Update the Matching Rules.
this.updateMatchting();
this._partialMatchingUpdate("add", emitter, pubTopic, subTopic);
// this.updateMatchting();
} else { } else {
throw Error("Already registered Emitter!"); throw Error("Already registered Emitter!");
} }
@ -287,10 +301,15 @@ export class PubSubSystemBase<
// See interface description // See interface description
public unregister(emitter: I): boolean { public unregister(emitter: I): boolean {
if (this._emitters.has(emitter as unknown as O)) { if (this._emitters.has(emitter as unknown as O)) {
const { pubTopic, subTopic } = this._emitters.get(
emitter as unknown as O
);
this._emitters.delete(emitter as unknown as O); this._emitters.delete(emitter as unknown as O);
// Update the Matching Rules. // Update the Matching Rules.
this.updateMatchting(); this._partialMatchingUpdate("remove", emitter, pubTopic, subTopic);
// this.updateMatchting();
return true; return true;
} }
@ -366,6 +385,125 @@ export class PubSubSystemBase<
this.subscriptions.update(); this.subscriptions.update();
} }
private __deleteMatchingEntry(
_pubTopic: string,
_subTopic: string,
_emitter: I
) {
if (this._matched.has(_pubTopic)) {
const data = this._matched.get(_pubTopic);
if (data.dataPull.has(_subTopic)) {
data.dataPull.get(_subTopic).delete(_emitter);
}
if (data.dataQuery.has(_subTopic)) {
data.dataQuery.get(_subTopic).delete(_emitter);
}
}
}
private __addMatchingEntryIfRequired(pubTopic, subTopic, emitter) {
// Now lets determine the Path
const result = this._comparePatternAndPath(subTopic, pubTopic);
if (result.affected) {
if (
!result.containsWildcards &&
((result.affectedByChild && !this._options.forwardChildData) ||
(result.affectedByParent && !this._options.forwardParentData))
) {
return;
}
// We now have match the topic as following described:
// 1) subscription contains a pattern
// - dircet change (same-level) => content
// - parent based change => content
// - child based change => content
// 2) subscription doesnt contains a pattern:
// We more or less want the data on the path.
// - direct change (topic = path) => content
// - parent based change => a super change
if (result.containsWildcards) {
if (this._options.mqttBasedPatternSubscriptions) {
if (result.patternToExtractData) {
this.__addToMatchingStructure(
"dataQuery",
pubTopic,
result.patternToExtractData,
emitter
);
} else if (result.pathToExtractData) {
this.__addToMatchingStructure(
"dataPull",
pubTopic,
result.pathToExtractData,
emitter
);
} else {
throw Error(
"Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided"
);
}
} else {
this.__addToMatchingStructure(
"dataQuery",
pubTopic,
pubTopic,
emitter
);
}
} else {
if (
(result.affectedByChild && !this._options.forwardChildData) ||
(result.affectedByParent && !this._options.forwardParentData)
) {
return;
}
if (result.pathToExtractData) {
this.__addToMatchingStructure(
"dataPull",
pubTopic,
result.pathToExtractData,
emitter
);
} else {
throw Error(
"Implementation Error. The 'pathToExtractData' must be provided"
);
}
}
}
}
protected _partialMatchingUpdate(
mode: "add" | "remove",
_emitter: I,
_pubTopic: string | false,
_subTopic: string | false
): void {
// Iterate through all Publishers and
for (const { pubTopic, subTopic } of this._emitters.values()) {
// Now, lets Update the Matching for the specific Topics.
if (mode === "remove" && pubTopic && _subTopic) {
this.__deleteMatchingEntry(pubTopic, _subTopic, _emitter);
} else if (mode === "add" && pubTopic && _subTopic) {
this.__addMatchingEntryIfRequired(pubTopic, _subTopic, _emitter);
}
}
if (mode === "add" && _pubTopic) {
this._updateMatchingForTopic(_pubTopic);
} else if (mode === "add" && _pubTopic) {
this._matched.delete(_pubTopic);
}
this.publishers.update();
this.subscriptions.update();
}
public emit(eventName: string, data: any, options?: AD) { public emit(eventName: string, data: any, options?: AD) {
return this._pushData(eventName, data, options); return this._pushData(eventName, data, options);
} }
@ -378,6 +516,7 @@ export class PubSubSystemBase<
* @memberof PubSubSystemBase * @memberof PubSubSystemBase
*/ */
public dispose(): void { public dispose(): void {
this._disposing = true;
const emitters = Array.from(this._emitters.keys()); const emitters = Array.from(this._emitters.keys());
emitters.map((emitter) => this.unregister(emitter as unknown as I)); emitters.map((emitter) => this.unregister(emitter as unknown as I));
@ -399,10 +538,11 @@ export class PubSubSystemBase<
}); });
} }
const emitters = if (!this._matched.get(topicOfChange)[entry].has(pathOrPattern)) {
this._matched.get(topicOfChange)[entry].get(pathOrPattern) || []; this._matched.get(topicOfChange)[entry].set(pathOrPattern, new Set());
emitters.push(emitter); }
this._matched.get(topicOfChange)[entry].set(pathOrPattern, emitters);
this._matched.get(topicOfChange)[entry].get(pathOrPattern).add(emitter);
} }
/** Function to Interanlly add a new Match /** Function to Interanlly add a new Match
@ -422,77 +562,7 @@ export class PubSubSystemBase<
for (const [emitter, { subTopic }] of this._emitters.entries()) { for (const [emitter, { subTopic }] of this._emitters.entries()) {
if (subTopic) { if (subTopic) {
// Now lets determine the Path // Now lets determine the Path
const result = this._comparePatternAndPath(subTopic, topicOfChange); this.__addMatchingEntryIfRequired(topicOfChange, subTopic, emitter);
if (result.affected) {
if (
!result.containsWildcards &&
((result.affectedByChild && !this._options.forwardChildData) ||
(result.affectedByParent && !this._options.forwardParentData))
) {
continue;
}
// We now have match the topic as following described:
// 1) subscription contains a pattern
// - dircet change (same-level) => content
// - parent based change => content
// - child based change => content
// 2) subscription doesnt contains a pattern:
// We more or less want the data on the path.
// - direct change (topic = path) => content
// - parent based change => a super change
if (result.containsWildcards) {
if (this._options.mqttBasedPatternSubscriptions) {
if (result.patternToExtractData) {
this.__addToMatchingStructure(
"dataQuery",
topicOfChange,
result.patternToExtractData,
emitter
);
} else if (result.pathToExtractData) {
this.__addToMatchingStructure(
"dataPull",
topicOfChange,
result.pathToExtractData,
emitter
);
} else {
throw Error(
"Implementation Error. Either the patternToExtractData or the pathToExtractData must be provided"
);
}
} else {
this.__addToMatchingStructure(
"dataQuery",
topicOfChange,
topicOfChange,
emitter
);
}
} else {
if (
(result.affectedByChild && !this._options.forwardChildData) ||
(result.affectedByParent && !this._options.forwardParentData)
) {
continue;
}
if (result.pathToExtractData) {
this.__addToMatchingStructure(
"dataPull",
topicOfChange,
result.pathToExtractData,
emitter
);
} else {
throw Error(
"Implementation Error. The 'pathToExtractData' must be provided"
);
}
}
}
} }
} }
} }
@ -511,6 +581,10 @@ export class PubSubSystemBase<
options: Partial<AD>, options: Partial<AD>,
_emitter: O = null _emitter: O = null
): void { ): void {
if (this._disposing) {
return;
}
// Check whether a Matching exists for this // Check whether a Matching exists for this
// Topic, if not add it. // Topic, if not add it.
if (!this._matched.has(topic)) { if (!this._matched.has(topic)) {