diff --git a/CHANGELOG.md b/CHANGELOG.md index 43ea713..b6b53de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,4 +27,17 @@ Inital commit, which is working with the browser # 1.0.28 - Fixes: - communication.layer.events: -> fixing receivingOwnMessages. - - runNopeBackend -> if io-server is used, no configuration file is loaded \ No newline at end of file + - runNopeBackend -> if io-server is used, no configuration file is loaded + +# 1.0.29 +- Added: + - helpers.limit: limitedCalls -> Functinality to limit parallel calls. + +# 1.0.30 +- Added: + - helpers.limit: getLimitedOptions -> Helper to get the correspondings options + - helpers.limit.spec: Adding test cases + +# 1.0.31 +- Modified: + - helpers.singleton: Prevent using symbols, to make global version work with local version. diff --git a/contribute/VERSION b/contribute/VERSION index f8536a4..a8c6b78 100644 --- a/contribute/VERSION +++ b/contribute/VERSION @@ -1 +1 @@ -1.0.28 \ No newline at end of file +1.0.31 \ No newline at end of file diff --git a/lib/helpers/index.browser.ts b/lib/helpers/index.browser.ts index a449106..9addc93 100644 --- a/lib/helpers/index.browser.ts +++ b/lib/helpers/index.browser.ts @@ -11,6 +11,7 @@ import * as ids from "./idMethods"; import * as json from "./jsonMethods"; import * as schema from "./jsonSchemaMethods"; import * as lazy from "./lazyMethods"; +import * as limit from "./limit"; import * as objects from "./objectMethods"; import * as pathes from "./pathMatchingMethods"; import * as runtime from "./runtimeMethods"; @@ -26,6 +27,7 @@ export * from "./idMethods"; export * from "./jsonMethods"; export * from "./jsonSchemaMethods"; export * from "./lazyMethods"; +export * from "./limit"; export * from "./objectMethods"; export * from "./pathMatchingMethods"; export * from "./runtimeMethods"; @@ -47,4 +49,5 @@ export { runtime, subject, descriptors, + limit as lock, }; diff --git a/lib/helpers/limit.spec.ts b/lib/helpers/limit.spec.ts new file mode 100644 index 0000000..5c9dd13 --- /dev/null +++ b/lib/helpers/limit.spec.ts @@ -0,0 +1,66 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + * @desc [description] + */ + +import { describe, it } from "mocha"; +import { sleep } from "./async"; +import { limitedCalls } from "./limit"; + +describe("limit", function () { + // Describe the required Test: + + describe("limitedCalls", function () { + it("single-call - sync", async () => { + const f = limitedCalls(sleep, { + maxParallel: 0, + }); + const start = Date.now(); + const promises = [f(100), f(100)]; + await Promise.all(promises); + const end = Date.now(); + if (end - start < 120) { + throw Error("Failed to call sync"); + } + }); + it("single-call - parallel", async () => { + const f = limitedCalls(sleep, { + maxParallel: 2, + }); + const start = Date.now(); + const promises = [f(100), f(100)]; + await Promise.all(promises); + const end = Date.now(); + if (end - start > 120) { + throw Error("Failed to call parallel"); + } + }); + it("single-call - between (sync)", async () => { + const f = limitedCalls(async (...args) => {}, { + maxParallel: 0, + callbackBetween: () => sleep(50), + }); + const start = Date.now(); + const promises = [f(100), f(100)]; + await Promise.all(promises); + const end = Date.now(); + if (end - start < 50) { + throw Error("Failed to call callbackBetween"); + } + }); + it("single-call - between (parallel)", async () => { + const f = limitedCalls(async (...args) => {}, { + maxParallel: 10, + callbackBetween: () => sleep(50), + }); + const start = Date.now(); + const promises = [f(100), f(100)]; + await Promise.all(promises); + const end = Date.now(); + if (end - start > 50) { + throw Error("Failed to call callbackBetween"); + } + }); + }); +}); diff --git a/lib/helpers/limit.ts b/lib/helpers/limit.ts new file mode 100644 index 0000000..5534109 --- /dev/null +++ b/lib/helpers/limit.ts @@ -0,0 +1,236 @@ +/** + * @author Martin Karkowski + * @email m.karkowski@zema.de + */ + +import { EventEmitter } from "events"; +import { ILogger } from "js-logger"; +import { DEBUG } from "../index.browser"; +import { generateId } from "./idMethods"; + +/** + * The options for call + */ +export type TLimitedOptions = { + /** + * The Id to use. If not provided, an specific id is generated + */ + functionId: string; + /** + * An queue that should be used. If not provided, a queue is used. + */ + queue: { [index: string]: any[] }; + /** + * An emitter to use. + */ + emitter: EventEmitter; + /** + * Helper function to request a lock. + */ + getLock: (newTaskId: string) => boolean; + /** + * An additional function, wich can be used between the next function in is called. e.g. sleep. + */ + callbackBetween?: () => Promise; + /** + * Number of elements, which could be called in parallel. 0 = sequntial + */ + maxParallel: number; + /** + * A logger to use. + */ + logger: ILogger | false; + /** + * An overview with active Tasks. This is relevant for multiple Funtions. + */ + activeTasks: Set; +}; + +/** + * Helper to get the default options in a shared context + * @param options The options to enhance the defaults. + * @returns The options. + */ +export function getLimitedOptions( + options: Partial +): Partial { + const defaultSettings: Partial = { + queue: {}, + emitter: new EventEmitter(), + maxParallel: 0, + logger: false, + activeTasks: new Set(), + }; + + return Object.assign(defaultSettings, options); +} + +/** + * Function to limit the calls based on the settings. + * @param func The function to use. This should be an async function. + * @param options The Options. + * @returns + */ +export function limitedCalls( + func: (...args) => Promise, + options: Partial +) { + // Define the Default-Settings + const defaultSettins: TLimitedOptions = { + functionId: Date.now().toString(), + queue: {}, + emitter: new EventEmitter(), + getLock: (newTaskId: string) => { + return settingsToUse.activeTasks.size <= settingsToUse.maxParallel; + }, + maxParallel: 0, + logger: false, + activeTasks: new Set(), + }; + + const settingsToUse: TLimitedOptions = Object.assign(defaultSettins, options); + + settingsToUse.queue[settingsToUse.functionId] = []; + + const wrapped = function (...args) { + // Generate the Call-ID + const taskId = generateId(); + + // Push the Content to the emitter + settingsToUse.queue[settingsToUse.functionId].push([taskId, args]); + + // lets have an item, that contains the resolve + let resolve = null; + let reject = null; + + // Define a callback, which is called. + const cb = (error, result) => { + settingsToUse.emitter.off(taskId, cb); + + if (error) { + reject(error); + } else { + resolve(result); + } + + // Delete the Task + settingsToUse.activeTasks.delete(taskId); + + if (typeof settingsToUse.callbackBetween === "function") { + if (settingsToUse.logger && settingsToUse.logger.enabledFor(DEBUG)) { + settingsToUse.logger.debug( + `using 'callbackBetween' for taskId="${taskId}". Calling now` + ); + } + + settingsToUse + .callbackBetween() + .then((_) => { + if ( + settingsToUse.logger && + settingsToUse.logger.enabledFor(DEBUG) + ) { + settingsToUse.logger.debug( + `awaited 'callbackBetween' for taskId="${taskId}". Transmitting results now` + ); + } + + // Emit, that there is a new task available + settingsToUse.emitter.emit("data", settingsToUse.functionId); + }) + .catch((_) => { + // Log some stuff + if ( + settingsToUse.logger && + settingsToUse.logger.enabledFor(DEBUG) + ) { + settingsToUse.logger.debug( + `something went wrong with 'callbackBetween' for taskId="${taskId}". Transmitting results now!` + ); + } + + // Emit, that there is a new task available + settingsToUse.emitter.emit("data", settingsToUse.functionId); + }); + } else { + if (settingsToUse.logger && settingsToUse.logger.enabledFor(DEBUG)) { + settingsToUse.logger.debug( + `no 'callbackBetween' for taskId="${taskId}". Transmitting results now` + ); + } + + // Emit, that there is a new task available + settingsToUse.emitter.emit("data", settingsToUse.functionId); + } + }; + + settingsToUse.emitter.on(taskId, cb); + + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; + }); + + settingsToUse.emitter.emit("data", settingsToUse.functionId); + + return promise; + }; + + settingsToUse.emitter.on("data", (_functionId) => { + // If the function ids matches + if (settingsToUse.functionId == _functionId) { + // Check if there is + if (settingsToUse.queue[settingsToUse.functionId].length > 0) { + // Get the Id and the Args. + const [taskId, args] = settingsToUse.queue[settingsToUse.functionId][0]; + + if (settingsToUse.getLock(taskId)) { + // Add the Task as active. + settingsToUse.activeTasks.add(taskId); + + // Remove the items: + settingsToUse.queue[settingsToUse.functionId].splice(0, 1); + // Try to perform the call. + try { + if ( + settingsToUse.logger && + settingsToUse.logger.enabledFor(DEBUG) + ) { + settingsToUse.logger.debug( + `calling function '${_functionId}' for the task taskId="${taskId}"` + ); + } + + func(...args) + .then((result) => { + if ( + settingsToUse.logger && + settingsToUse.logger.enabledFor(DEBUG) + ) { + settingsToUse.logger.debug( + `called function '${_functionId}' for the task taskId="${taskId}"` + ); + } + settingsToUse.emitter.emit(taskId, null, result); + }) + .catch((error) => { + if ( + settingsToUse.logger && + settingsToUse.logger.enabledFor(DEBUG) + ) { + settingsToUse.logger.debug( + `called function '${_functionId}' for the task taskId="${taskId}", but resulted in an error` + ); + } + settingsToUse.emitter.emit(taskId, error, null); + }); + } catch (error) { + settingsToUse.emitter.emit(taskId, error, null); + } + } + } + } + }); + + return wrapped; +} diff --git a/lib/helpers/singletonMethod.ts b/lib/helpers/singletonMethod.ts index 3455a64..06fc03d 100644 --- a/lib/helpers/singletonMethod.ts +++ b/lib/helpers/singletonMethod.ts @@ -22,30 +22,42 @@ export function getSingleton( instance: T; setInstance: (value: T) => void; } { + if (!global["nope"]) { + global["nope"] = { + singletons: {}, + }; + } + + if (!global["nope"]["singletons"]) { + global["nope"]["singletons"] = {}; + } + // Extract all - const globalSymbols = Object.getOwnPropertySymbols(global); + const globalSingletons = Object.getOwnPropertyNames( + global["nope"]["singletons"] + ); // create a unique, global symbol name // ----------------------------------- - const IDENTIFIER_DISPATCHER_CONTAINER = Symbol.for(identifier); + const IDENTIFIER_DISPATCHER_CONTAINER = identifier; // check if the global object has this symbol // add it if it does not have the symbol, yet // ------------------------------------------ const hasContainer = - globalSymbols.indexOf(IDENTIFIER_DISPATCHER_CONTAINER) > -1; + globalSingletons.indexOf(IDENTIFIER_DISPATCHER_CONTAINER) > -1; if (!hasContainer) { - global[IDENTIFIER_DISPATCHER_CONTAINER] = create(); + global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER] = create(); } const ret: { instance: T; setInstance: (value: T) => void; } = { - instance: global[IDENTIFIER_DISPATCHER_CONTAINER], + instance: global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER], setInstance: (value: T) => { - global[IDENTIFIER_DISPATCHER_CONTAINER] = value; + global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER] = value; }, }; @@ -53,7 +65,7 @@ export function getSingleton( // ------------------------ Object.defineProperty(ret, "instance", { get: function () { - return global[IDENTIFIER_DISPATCHER_CONTAINER]; + return global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER]; }, }); diff --git a/package.json b/package.json index 1c96737..aedf612 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nope", - "version": "1.0.28", + "version": "1.0.31", "description": "NoPE Runtime for Nodejs. For Browser-Support please use nope-browser", "files": [ "dist-nodejs/**/*",