/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 */

import { EventEmitter } from "events";
import { ILogger } from "js-logger";
import { DEBUG } from "../index.browser";
import { generateId } from "./idMethods";

/**
 * The options for a limited call.
 */
export type TLimitedOptions = {
  /**
   * The id to use. If not provided, a unique id is generated.
   */
  functionId: string;

  /**
   * The queue to use. It is indexed by `functionId`; every entry holds the
   * pending `[taskId, args]` tuples of that function. If not provided, a new
   * queue is created.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  queue: { [index: string]: any[] };

  /**
   * The emitter to use. It transports the internal "data" event (a task might
   * be startable) and one event per task id (the task has finished).
   */
  emitter: EventEmitter;

  /**
   * Helper function to request a lock. The queued task is only started if
   * this function returns `true`.
   */
  getLock: (newTaskId: string) => boolean;

  /**
   * An additional function, which is awaited after a task has finished and
   * before the next task is triggered, e.g. a sleep. Its result is ignored.
   */
  callbackBetween?: () => Promise<unknown>;

  /**
   * Number of elements, which could be called in parallel. 0 = sequential.
   * The default `getLock` starts a task while
   * `activeTasks.size <= maxParallel`.
   */
  maxParallel: number;

  /**
   * A logger to use, or `false` to disable logging.
   */
  logger: ILogger | false;

  /**
   * An overview of the active tasks (task ids). This is relevant if multiple
   * functions share the same options.
   */
  activeTasks: Set<string>;
};

/**
 * Helper to get the default options in a shared context.
 *
 * @param options The options to enhance the defaults.
 * @returns The defaults (queue, emitter, maxParallel, logger, activeTasks)
 * overwritten by the provided options.
 */
export function getLimitedOptions(
  options: Partial<TLimitedOptions>
): Partial<TLimitedOptions> {
  const defaultSettings: Partial<TLimitedOptions> = {
    queue: {},
    emitter: new EventEmitter(),
    maxParallel: 0,
    logger: false,
    activeTasks: new Set<string>(),
  };

  return Object.assign(defaultSettings, options);
}

/**
 * Function to limit the calls based on the settings.
 *
 * Every call of the returned function is queued. A queued task is started as
 * soon as `getLock` grants it; the returned promise settles with the outcome
 * of `func`.
 *
 * NOTE: a "data" listener is registered on the emitter and never removed.
 *
 * @param func The function to use. This should be an async function; a plain
 * return value is treated like a resolved promise.
 * @param options The options. Missing entries are filled with defaults.
 * @returns The wrapped function. It takes the arguments of `func` and returns
 * a promise for its result.
 */
export function limitedCalls<T>(
  // `any[]` is required so that functions with arbitrary parameter lists are accepted.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  func: (...args: any[]) => Promise<T>,
  options: Partial<TLimitedOptions>
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
): (...args: any[]) => Promise<T> {
  // Define the default settings.
  const defaultSettings: TLimitedOptions = {
    // A generated id (instead of a timestamp) keeps two wrappers, which are
    // created within the same millisecond, from sharing one queue entry.
    functionId: generateId(),
    queue: {},
    emitter: new EventEmitter(),
    getLock: (_newTaskId: string) => {
      return settingsToUse.activeTasks.size <= settingsToUse.maxParallel;
    },
    maxParallel: 0,
    logger: false,
    activeTasks: new Set<string>(),
  };

  const settingsToUse: TLimitedOptions = Object.assign(
    defaultSettings,
    options
  );

  const { functionId, queue, emitter, activeTasks } = settingsToUse;

  queue[functionId] = [];

  /**
   * Writes a debug message, if a logger is present and enabled for DEBUG.
   * The message is built lazily, only if it is really logged.
   */
  const debug = (message: () => string): void => {
    const logger = settingsToUse.logger;
    if (logger && logger.enabledFor(DEBUG)) {
      logger.debug(message());
    }
  };

  /**
   * Signals that a task might be startable: first for this function, then for
   * every other function with pending tasks in the (possibly shared) queue.
   * Without the latter, tasks of other functions that were denied the lock
   * would never be triggered again.
   */
  const notifyWaiting = (): void => {
    emitter.emit("data", functionId);

    for (const otherId of Object.keys(queue)) {
      const waiting = queue[otherId];
      if (
        otherId !== functionId &&
        Array.isArray(waiting) &&
        waiting.length > 0
      ) {
        emitter.emit("data", otherId);
      }
    }
  };

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const wrapped = function (...args: any[]): Promise<T> {
    // Generate the call id.
    const taskId = generateId();

    // The executor runs synchronously, so both handles are assigned before
    // they can be used.
    let resolve!: (value: T | PromiseLike<T>) => void;
    let reject!: (reason?: unknown) => void;
    const promise = new Promise<T>((_resolve, _reject) => {
      resolve = _resolve;
      reject = _reject;
    });

    // Callback, which is called once the task has finished. `failed` tells a
    // rejection apart from a result, even if the rejection reason is falsy.
    // It defaults to the truthiness of `error` for two-argument emits.
    const onTaskDone = (
      error: unknown,
      result: T,
      failed: boolean = Boolean(error)
    ): void => {
      if (failed) {
        reject(error);
      } else {
        resolve(result);
      }

      // Delete the task.
      activeTasks.delete(taskId);

      if (typeof settingsToUse.callbackBetween !== "function") {
        debug(
          () =>
            `no 'callbackBetween' for taskId="${taskId}". Transmitting results now`
        );
        // Emit, that there is a new task available.
        notifyWaiting();
        return;
      }

      debug(
        () => `using 'callbackBetween' for taskId="${taskId}". Calling now`
      );

      // A synchronous throw is handled like a rejection; otherwise the queue
      // would never be triggered again.
      let pause: Promise<unknown>;
      try {
        pause = Promise.resolve(settingsToUse.callbackBetween());
      } catch (err) {
        pause = Promise.reject(err);
      }

      pause
        .then(() => {
          debug(
            () =>
              `awaited 'callbackBetween' for taskId="${taskId}". Transmitting results now`
          );
          // Emit, that there is a new task available.
          notifyWaiting();
        })
        .catch(() => {
          debug(
            () =>
              `something went wrong with 'callbackBetween' for taskId="${taskId}". Transmitting results now!`
          );
          // Emit, that there is a new task available.
          notifyWaiting();
        });
    };

    // The listener removes itself before it is invoked.
    emitter.once(taskId, onTaskDone);

    // Push the task to the queue and signal that there is something to do.
    queue[functionId].push([taskId, args]);
    emitter.emit("data", functionId);

    return promise;
  };

  emitter.on("data", (requestedId: string) => {
    // Only react if the function ids match.
    if (requestedId !== functionId) {
      return;
    }

    // Check if there is something to do.
    const pending = queue[functionId];
    if (pending.length === 0) {
      return;
    }

    // Get the id and the args of the oldest task.
    const [taskId, args] = pending[0];

    if (!settingsToUse.getLock(taskId)) {
      return;
    }

    // Add the task as active and remove it from the queue.
    activeTasks.add(taskId);
    pending.shift();

    // Try to perform the call.
    try {
      debug(
        () =>
          `calling function '${requestedId}' for the task taskId="${taskId}"`
      );

      // `Promise.resolve` keeps a native promise as it is and additionally
      // supports functions that return a plain value.
      Promise.resolve(func(...args))
        .then((result) => {
          debug(
            () =>
              `called function '${requestedId}' for the task taskId="${taskId}"`
          );
          emitter.emit(taskId, null, result, false);
        })
        .catch((error) => {
          debug(
            () =>
              `called function '${requestedId}' for the task taskId="${taskId}", but resulted in an error`
          );
          emitter.emit(taskId, error, null, true);
        });
    } catch (error) {
      // The function (or the logger) threw synchronously.
      emitter.emit(taskId, error, null, true);
    }
  });

  return wrapped;
}