- Added:
  - helpers.limit: limitedCalls -> Functinality to limit parallel calls.

# 1.0.30
- Added:
  - helpers.limit: getLimitedOptions -> Helper to get the correspondings options
  - helpers.limit.spec: Adding test cases

# 1.0.31
- Modified:
  - helpers.singleton: Prevent using symbols, to make global version work with local version.
This commit is contained in:
Martin Karkowski 2022-04-06 20:50:54 +02:00
parent 44dcfee9a5
commit daaa0fd51a
7 changed files with 340 additions and 10 deletions

View File

@ -28,3 +28,16 @@ Inital commit, which is working with the browser
- Fixes: - Fixes:
- communication.layer.events: -> fixing receivingOwnMessages. - communication.layer.events: -> fixing receivingOwnMessages.
- runNopeBackend -> if io-server is used, no configuration file is loaded - runNopeBackend -> if io-server is used, no configuration file is loaded
# 1.0.29
- Added:
- helpers.limit: limitedCalls -> Functinality to limit parallel calls.
# 1.0.30
- Added:
- helpers.limit: getLimitedOptions -> Helper to get the correspondings options
- helpers.limit.spec: Adding test cases
# 1.0.31
- Modified:
- helpers.singleton: Prevent using symbols, to make global version work with local version.

View File

@ -1 +1 @@
1.0.28 1.0.31

View File

@ -11,6 +11,7 @@ import * as ids from "./idMethods";
import * as json from "./jsonMethods"; import * as json from "./jsonMethods";
import * as schema from "./jsonSchemaMethods"; import * as schema from "./jsonSchemaMethods";
import * as lazy from "./lazyMethods"; import * as lazy from "./lazyMethods";
import * as limit from "./limit";
import * as objects from "./objectMethods"; import * as objects from "./objectMethods";
import * as pathes from "./pathMatchingMethods"; import * as pathes from "./pathMatchingMethods";
import * as runtime from "./runtimeMethods"; import * as runtime from "./runtimeMethods";
@ -26,6 +27,7 @@ export * from "./idMethods";
export * from "./jsonMethods"; export * from "./jsonMethods";
export * from "./jsonSchemaMethods"; export * from "./jsonSchemaMethods";
export * from "./lazyMethods"; export * from "./lazyMethods";
export * from "./limit";
export * from "./objectMethods"; export * from "./objectMethods";
export * from "./pathMatchingMethods"; export * from "./pathMatchingMethods";
export * from "./runtimeMethods"; export * from "./runtimeMethods";
@ -47,4 +49,5 @@ export {
runtime, runtime,
subject, subject,
descriptors, descriptors,
limit as lock,
}; };

66
lib/helpers/limit.spec.ts Normal file
View File

@ -0,0 +1,66 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @desc [description]
*/
import { describe, it } from "mocha";
import { sleep } from "./async";
import { limitedCalls } from "./limit";
describe("limit", function () {
// Describe the required Test:
describe("limitedCalls", function () {
it("single-call - sync", async () => {
const f = limitedCalls(sleep, {
maxParallel: 0,
});
const start = Date.now();
const promises = [f(100), f(100)];
await Promise.all(promises);
const end = Date.now();
if (end - start < 120) {
throw Error("Failed to call sync");
}
});
it("single-call - parallel", async () => {
const f = limitedCalls(sleep, {
maxParallel: 2,
});
const start = Date.now();
const promises = [f(100), f(100)];
await Promise.all(promises);
const end = Date.now();
if (end - start > 120) {
throw Error("Failed to call parallel");
}
});
it("single-call - between (sync)", async () => {
const f = limitedCalls<void>(async (...args) => {}, {
maxParallel: 0,
callbackBetween: () => sleep(50),
});
const start = Date.now();
const promises = [f(100), f(100)];
await Promise.all(promises);
const end = Date.now();
if (end - start < 50) {
throw Error("Failed to call callbackBetween");
}
});
it("single-call - between (parallel)", async () => {
const f = limitedCalls<void>(async (...args) => {}, {
maxParallel: 10,
callbackBetween: () => sleep(50),
});
const start = Date.now();
const promises = [f(100), f(100)];
await Promise.all(promises);
const end = Date.now();
if (end - start > 50) {
throw Error("Failed to call callbackBetween");
}
});
});
});

236
lib/helpers/limit.ts Normal file
View File

@ -0,0 +1,236 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
*/
import { EventEmitter } from "events";
import { ILogger } from "js-logger";
import { DEBUG } from "../index.browser";
import { generateId } from "./idMethods";
/**
* The options for call
*/
export type TLimitedOptions = {
/**
* The Id to use. If not provided, an specific id is generated
*/
functionId: string;
/**
* An queue that should be used. If not provided, a queue is used.
*/
queue: { [index: string]: any[] };
/**
* An emitter to use.
*/
emitter: EventEmitter;
/**
* Helper function to request a lock.
*/
getLock: (newTaskId: string) => boolean;
/**
* An additional function, wich can be used between the next function in is called. e.g. sleep.
*/
callbackBetween?: () => Promise<void>;
/**
* Number of elements, which could be called in parallel. 0 = sequntial
*/
maxParallel: number;
/**
* A logger to use.
*/
logger: ILogger | false;
/**
* An overview with active Tasks. This is relevant for multiple Funtions.
*/
activeTasks: Set<string>;
};
/**
* Helper to get the default options in a shared context
* @param options The options to enhance the defaults.
* @returns The options.
*/
export function getLimitedOptions(
options: Partial<TLimitedOptions>
): Partial<TLimitedOptions> {
const defaultSettings: Partial<TLimitedOptions> = {
queue: {},
emitter: new EventEmitter(),
maxParallel: 0,
logger: false,
activeTasks: new Set(),
};
return Object.assign(defaultSettings, options);
}
/**
* Function to limit the calls based on the settings.
* @param func The function to use. This should be an async function.
* @param options The Options.
* @returns
*/
export function limitedCalls<T>(
func: (...args) => Promise<T>,
options: Partial<TLimitedOptions>
) {
// Define the Default-Settings
const defaultSettins: TLimitedOptions = {
functionId: Date.now().toString(),
queue: {},
emitter: new EventEmitter(),
getLock: (newTaskId: string) => {
return settingsToUse.activeTasks.size <= settingsToUse.maxParallel;
},
maxParallel: 0,
logger: false,
activeTasks: new Set(),
};
const settingsToUse: TLimitedOptions = Object.assign(defaultSettins, options);
settingsToUse.queue[settingsToUse.functionId] = [];
const wrapped = function (...args) {
// Generate the Call-ID
const taskId = generateId();
// Push the Content to the emitter
settingsToUse.queue[settingsToUse.functionId].push([taskId, args]);
// lets have an item, that contains the resolve
let resolve = null;
let reject = null;
// Define a callback, which is called.
const cb = (error, result) => {
settingsToUse.emitter.off(taskId, cb);
if (error) {
reject(error);
} else {
resolve(result);
}
// Delete the Task
settingsToUse.activeTasks.delete(taskId);
if (typeof settingsToUse.callbackBetween === "function") {
if (settingsToUse.logger && settingsToUse.logger.enabledFor(DEBUG)) {
settingsToUse.logger.debug(
`using 'callbackBetween' for taskId="${taskId}". Calling now`
);
}
settingsToUse
.callbackBetween()
.then((_) => {
if (
settingsToUse.logger &&
settingsToUse.logger.enabledFor(DEBUG)
) {
settingsToUse.logger.debug(
`awaited 'callbackBetween' for taskId="${taskId}". Transmitting results now`
);
}
// Emit, that there is a new task available
settingsToUse.emitter.emit("data", settingsToUse.functionId);
})
.catch((_) => {
// Log some stuff
if (
settingsToUse.logger &&
settingsToUse.logger.enabledFor(DEBUG)
) {
settingsToUse.logger.debug(
`something went wrong with 'callbackBetween' for taskId="${taskId}". Transmitting results now!`
);
}
// Emit, that there is a new task available
settingsToUse.emitter.emit("data", settingsToUse.functionId);
});
} else {
if (settingsToUse.logger && settingsToUse.logger.enabledFor(DEBUG)) {
settingsToUse.logger.debug(
`no 'callbackBetween' for taskId="${taskId}". Transmitting results now`
);
}
// Emit, that there is a new task available
settingsToUse.emitter.emit("data", settingsToUse.functionId);
}
};
settingsToUse.emitter.on(taskId, cb);
const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
settingsToUse.emitter.emit("data", settingsToUse.functionId);
return promise;
};
settingsToUse.emitter.on("data", (_functionId) => {
// If the function ids matches
if (settingsToUse.functionId == _functionId) {
// Check if there is
if (settingsToUse.queue[settingsToUse.functionId].length > 0) {
// Get the Id and the Args.
const [taskId, args] = settingsToUse.queue[settingsToUse.functionId][0];
if (settingsToUse.getLock(taskId)) {
// Add the Task as active.
settingsToUse.activeTasks.add(taskId);
// Remove the items:
settingsToUse.queue[settingsToUse.functionId].splice(0, 1);
// Try to perform the call.
try {
if (
settingsToUse.logger &&
settingsToUse.logger.enabledFor(DEBUG)
) {
settingsToUse.logger.debug(
`calling function '${_functionId}' for the task taskId="${taskId}"`
);
}
func(...args)
.then((result) => {
if (
settingsToUse.logger &&
settingsToUse.logger.enabledFor(DEBUG)
) {
settingsToUse.logger.debug(
`called function '${_functionId}' for the task taskId="${taskId}"`
);
}
settingsToUse.emitter.emit(taskId, null, result);
})
.catch((error) => {
if (
settingsToUse.logger &&
settingsToUse.logger.enabledFor(DEBUG)
) {
settingsToUse.logger.debug(
`called function '${_functionId}' for the task taskId="${taskId}", but resulted in an error`
);
}
settingsToUse.emitter.emit(taskId, error, null);
});
} catch (error) {
settingsToUse.emitter.emit(taskId, error, null);
}
}
}
}
});
return wrapped;
}

View File

@ -22,30 +22,42 @@ export function getSingleton<T>(
instance: T; instance: T;
setInstance: (value: T) => void; setInstance: (value: T) => void;
} { } {
if (!global["nope"]) {
global["nope"] = {
singletons: {},
};
}
if (!global["nope"]["singletons"]) {
global["nope"]["singletons"] = {};
}
// Extract all // Extract all
const globalSymbols = Object.getOwnPropertySymbols(global); const globalSingletons = Object.getOwnPropertyNames(
global["nope"]["singletons"]
);
// create a unique, global symbol name // create a unique, global symbol name
// ----------------------------------- // -----------------------------------
const IDENTIFIER_DISPATCHER_CONTAINER = Symbol.for(identifier); const IDENTIFIER_DISPATCHER_CONTAINER = identifier;
// check if the global object has this symbol // check if the global object has this symbol
// add it if it does not have the symbol, yet // add it if it does not have the symbol, yet
// ------------------------------------------ // ------------------------------------------
const hasContainer = const hasContainer =
globalSymbols.indexOf(IDENTIFIER_DISPATCHER_CONTAINER) > -1; globalSingletons.indexOf(IDENTIFIER_DISPATCHER_CONTAINER) > -1;
if (!hasContainer) { if (!hasContainer) {
global[IDENTIFIER_DISPATCHER_CONTAINER] = create(); global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER] = create();
} }
const ret: { const ret: {
instance: T; instance: T;
setInstance: (value: T) => void; setInstance: (value: T) => void;
} = { } = {
instance: global[IDENTIFIER_DISPATCHER_CONTAINER], instance: global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER],
setInstance: (value: T) => { setInstance: (value: T) => {
global[IDENTIFIER_DISPATCHER_CONTAINER] = value; global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER] = value;
}, },
}; };
@ -53,7 +65,7 @@ export function getSingleton<T>(
// ------------------------ // ------------------------
Object.defineProperty(ret, "instance", { Object.defineProperty(ret, "instance", {
get: function () { get: function () {
return global[IDENTIFIER_DISPATCHER_CONTAINER]; return global["nope"]["singletons"][IDENTIFIER_DISPATCHER_CONTAINER];
}, },
}); });

View File

@ -1,6 +1,6 @@
{ {
"name": "nope", "name": "nope",
"version": "1.0.28", "version": "1.0.31",
"description": "NoPE Runtime for Nodejs. For Browser-Support please use nope-browser", "description": "NoPE Runtime for Nodejs. For Browser-Support please use nope-browser",
"files": [ "files": [
"dist-nodejs/**/*", "dist-nodejs/**/*",