nope/lib/helpers/limit.ts
Martin Karkowski daaa0fd51a # 1.0.29
- Added:
  - helpers.limit: limitedCalls -> Functionality to limit parallel calls.

# 1.0.30
- Added:
  - helpers.limit: getLimitedOptions -> Helper to get the corresponding options
  - helpers.limit.spec: Adding test cases

# 1.0.31
- Modified:
  - helpers.singleton: Prevent using symbols, to make global version work with local version.
2022-04-06 20:50:54 +02:00

237 lines
6.7 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
*/
import { EventEmitter } from "events";
import { ILogger } from "js-logger";
import { DEBUG } from "../index.browser";
import { generateId } from "./idMethods";
/**
* Options controlling how `limitedCalls` queues and starts the calls of a
* wrapped function. Several wrapped functions can be handed the same `queue`,
* `emitter` and `activeTasks` instances (see `getLimitedOptions`).
*/
export type TLimitedOptions = {
/**
* The id of the wrapped function. It is used as the key into `queue` and as the
* payload of the "data" event. If not provided, `limitedCalls` generates one.
*/
functionId: string;
/**
* The queue to use: maps a function id to its pending `[taskId, args]` entries.
* If not provided, a new, empty queue object is created.
*/
queue: { [index: string]: any[] };
/**
* The emitter to use. It carries the "data" event (a task may be started) and
* one event per task id (the task has finished).
*/
emitter: EventEmitter;
/**
* Helper function to request a lock. It receives the id of the task that is
* about to be started and returns `true` if that task may start now.
*/
getLock: (newTaskId: string) => boolean;
/**
* An optional async function which is called after a call has finished and
* whose completion is awaited before the next queued call is announced,
* e.g. a sleep.
*/
callbackBetween?: () => Promise<void>;
/**
* Number of elements which may be called in parallel. 0 = sequential.
* NOTE(review): the default `getLock` of `limitedCalls` grants a lock while
* `activeTasks.size <= maxParallel`, i.e. up to `maxParallel + 1` tasks run at
* the same time - confirm that this is the intended meaning.
*/
maxParallel: number;
/**
* A logger to use for debug output, or `false` to disable logging.
*/
logger: ILogger | false;
/**
* The ids of the currently running tasks. Sharing this set is relevant when
* multiple functions should be limited together.
*/
activeTasks: Set<string>;
};
/**
 * Builds an options object that can be handed to several `limitedCalls`
 * wrappers so that they operate on the same queue, emitter and set of
 * active tasks.
 *
 * Every call creates fresh default containers; entries of `options` replace
 * the defaults of the same name. The passed `options` object is not modified.
 *
 * @param options The values that should override the defaults.
 * @returns A new object containing the defaults overlaid with `options`.
 */
export function getLimitedOptions(
  options: Partial<TLimitedOptions>
): Partial<TLimitedOptions> {
  return {
    // Defaults first, so that the spread below lets the caller's values win.
    queue: {},
    emitter: new EventEmitter(),
    maxParallel: 0,
    logger: false,
    activeTasks: new Set(),
    ...options,
  };
}
/**
 * Wraps an async function so that its calls are queued and only started when
 * `options.getLock` grants a slot.
 *
 * Each call of the returned wrapper is appended to the queue of this function
 * and answered with a promise which settles with the outcome of `func`.
 * Whenever a call is added or a task has finished, a "data" event is emitted
 * on `options.emitter`; every wrapper listening on that emitter then tries to
 * start the head of its own queue. Because of that, several functions sharing
 * `emitter` and `activeTasks` wake each other up instead of only themselves.
 *
 * The default `getLock` grants a slot while the number of active tasks is
 * `<= maxParallel` (with the default `maxParallel` of 0 exactly one task runs
 * at a time).
 *
 * @param func The function to limit. It should be async; a plain return value
 *   is accepted as well and treated like an already resolved promise.
 * @param options The options; missing entries are replaced by defaults.
 * @returns A function taking the arguments of `func` and returning a promise
 *   for its result.
 */
export function limitedCalls<T>(
  func: (...args) => Promise<T>,
  options: Partial<TLimitedOptions>
) {
  const defaultSettings: TLimitedOptions = {
    // A timestamp is not unique: two wrappers created within the same
    // millisecond on a shared queue/emitter would use the same queue slot.
    // NOTE(review): assumes generateId() yields unique strings, as the task
    // ids below already rely on - confirm.
    functionId: generateId(),
    queue: {},
    emitter: new EventEmitter(),
    getLock: () =>
      settingsToUse.activeTasks.size <= settingsToUse.maxParallel,
    maxParallel: 0,
    logger: false,
    activeTasks: new Set(),
  };
  const settingsToUse: TLimitedOptions = Object.assign(
    defaultSettings,
    options
  );
  const functionId = settingsToUse.functionId;

  // Start with an empty queue for this function.
  settingsToUse.queue[functionId] = [];

  // Writes a debug message if a logger is present and enabled for DEBUG.
  const debug = (message: string): void => {
    const logger = settingsToUse.logger;
    if (logger && logger.enabledFor(DEBUG)) {
      logger.debug(message);
    }
  };

  // Announces that a queued task might be startable now.
  const announce = (): void => {
    settingsToUse.emitter.emit("data", functionId);
  };

  const wrapped = function (...args) {
    // Generate the Call-ID and enqueue the call.
    const taskId = generateId();
    settingsToUse.queue[functionId].push([taskId, args]);

    // The executor runs synchronously, so both handles are set before use.
    let resolve: (value: T) => void;
    let reject: (reason?: unknown) => void;
    const promise = new Promise<T>((_resolve, _reject) => {
      resolve = _resolve;
      reject = _reject;
    });

    // Called once the task has finished. `failed` is passed explicitly so
    // that a rejection with a falsy reason is still reported as a rejection;
    // for events carrying only (error, result) it is derived from `error`.
    const cb = (error, result, failed = Boolean(error)) => {
      settingsToUse.emitter.off(taskId, cb);
      if (failed) {
        reject(error);
      } else {
        resolve(result);
      }
      // The task no longer occupies a slot.
      settingsToUse.activeTasks.delete(taskId);

      if (typeof settingsToUse.callbackBetween === "function") {
        debug(`using 'callbackBetween' for taskId="${taskId}". Calling now`);
        settingsToUse
          .callbackBetween()
          .then(() => {
            debug(
              `awaited 'callbackBetween' for taskId="${taskId}". Transmitting results now`
            );
            announce();
          })
          .catch(() => {
            // A failing 'callbackBetween' must not stall the queue.
            debug(
              `something went wrong with 'callbackBetween' for taskId="${taskId}". Transmitting results now!`
            );
            announce();
          });
      } else {
        debug(
          `no 'callbackBetween' for taskId="${taskId}". Transmitting results now`
        );
        announce();
      }
    };

    settingsToUse.emitter.on(taskId, cb);
    announce();
    return promise;
  };

  // React to every "data" event, not only to the ones emitted for this
  // function: with a shared `activeTasks` set, a task of this function may be
  // waiting for a slot that is released by a task of another function.
  settingsToUse.emitter.on("data", () => {
    const pending = settingsToUse.queue[functionId];
    if (pending.length === 0) {
      return;
    }
    const [taskId, args] = pending[0];
    if (!settingsToUse.getLock(taskId)) {
      return;
    }
    // Mark the task as active and take it out of the queue.
    settingsToUse.activeTasks.add(taskId);
    pending.shift();

    try {
      debug(`calling function '${functionId}' for the task taskId="${taskId}"`);
      // Promise.resolve keeps a native promise as it is and lifts a plain
      // return value, so a non-async `func` does not end in a TypeError.
      Promise.resolve(func(...args))
        .then((result) => {
          debug(
            `called function '${functionId}' for the task taskId="${taskId}"`
          );
          settingsToUse.emitter.emit(taskId, null, result, false);
        })
        .catch((error) => {
          debug(
            `called function '${functionId}' for the task taskId="${taskId}", but resulted in an error`
          );
          settingsToUse.emitter.emit(taskId, error, null, true);
        });
    } catch (error) {
      // `func` threw synchronously.
      settingsToUse.emitter.emit(taskId, error, null, true);
    }
  });

  return wrapped;
}