nope/lib/helpers/taskQueue.ts

156 lines
3.7 KiB
TypeScript

import { NopePromise } from "../promise";
import { isAsyncFunction } from "./async";
import { PriorityList } from "./lists";
/**
* A Task-Queue. This could be used to make parallel
* Request run sequentially. For Instance during
* Saving and Reading Vars to achive a consistent set
* of Data.
*
* Usage:
* // Create a Queue
* const _queue = new PriorityTaskQueue();
* // Create a Function
* const _func = (_input: string, _cb) => {
* console.log("Hallo ", _input)
* _cb(null, null);
* };
*
* await _queue.execute(_func, 'Welt');
*
* @export
* @class TaskQeue
*/
export class ParallelPriorityTaskQueue {
protected _queue = new PriorityList<{
func: (...args) => void;
cancel: () => void;
args: any;
resolve: (data) => void;
reject: (err) => void;
}>();
protected _runningTasks = 0;
protected _counter = 0;
public maxParallel = 1;
public usePriority = true;
/**
* Executes the given Task. If now Task is running it is executed immediatelly,
* otherwise it is pushed in the queue and call if the other tasks are call.
*
* @param {any} _func The Function which should be called.
* @param {any} _param The Data which should be used for the call.
* @param {any} _callback The Callback, which should be called after
* @memberof TaskQeue
*/
public execute<T>(
func: (...args) => T | Promise<T>,
args: any[],
priority: number = 0,
cancel: () => void = () => {}
): NopePromise<T> {
let resolve, reject;
const promise = new NopePromise<T>((res, rej) => {
resolve = res;
reject = rej;
});
promise.cancel = cancel;
// Check whether the Execution is activ:
if (
this._runningTasks < this.maxParallel &&
this._queue.length < this.maxParallel
) {
this._runningTasks++;
this._execute({
func,
args,
cancel,
resolve,
reject,
});
} else {
// Extend the Queue.
this._queue.push(this.usePriority ? priority : this._counter++, {
func,
args,
cancel,
resolve,
reject,
});
}
return promise;
}
protected _execute(data: {
func: (...args) => any | Promise<any>;
args: any[];
cancel: () => void;
resolve: (data) => void;
reject: (err) => void;
}) {
// Verify whether there is an CancelHandler, if yes.
// Register at the Cancel-Handler. Thereby the next
// function is call if the currently running Task is
// aborted.
if (data.cancel) {
data.args.push(() => {
data.cancel();
data.reject(Error("Canceled"));
this._finish();
});
}
if (isAsyncFunction(data.func)) {
(data.func as (...args: any[]) => Promise<any>)(...data.args)
.then((res) => {
data.resolve(res);
this._finish();
})
.catch((err) => {
data.reject(err);
this._finish();
});
} else {
try {
const res = (data.func as (...args: any[]) => any)(...data.args);
data.resolve(res);
} catch (err) {
data.reject(err);
}
this._finish();
}
}
/**
* Internal Function to Finish all Tasks.
*
* @protected
* @memberof PriorityTaskQueue
*/
protected _finish() {
// Remove one Element.
this._runningTasks--;
if (this._runningTasks < 0) {
this._runningTasks = 0;
}
// Remove the First Task
const task = this._queue.highest();
// Call the Function with the adapted Callback, if there is a Task Left open.
if (task) {
this._execute(task);
}
}
public get length(): number {
return this._queue.length;
}
}