writing rpc manager
This commit is contained in:
parent
223b72be79
commit
6980b0117b
55
lib/RpcManager/NopeRpcManager.spec.ts
Normal file
55
lib/RpcManager/NopeRpcManager.spec.ts
Normal file
@ -0,0 +1,55 @@
|
||||
|
||||
import { assert } from "chai";
|
||||
import "chai/register-should";
|
||||
import { beforeEach, describe, it } from "mocha";
|
||||
import { getLayer } from "../communication/getLayer.nodejs";
|
||||
import { NopeObservable } from "../observables/nopeObservable";
|
||||
import {
|
||||
NopeRpcManager
|
||||
} from "./NopeRpcManager";
|
||||
|
||||
describe("NopeRpcManager", function () {
|
||||
// Describe the required Test:
|
||||
let manager = new NopeRpcManager({
|
||||
communicator: getLayer("event", "", "debug"),
|
||||
},
|
||||
() => new NopeObservable()
|
||||
);
|
||||
beforeEach(() => {
|
||||
// Create a new Observer
|
||||
manager = new NopeRpcManager({
|
||||
communicator: getLayer("event", "", "debug"),
|
||||
},
|
||||
() => new NopeObservable()
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
describe("serviceHandeling", function () {
|
||||
const helloWorld = async (greetings: string) => {
|
||||
return "Hello " + greetings + "!"
|
||||
}
|
||||
|
||||
it("registering service", (done) => {
|
||||
const r = manager.registerCallback(helloWorld)
|
||||
done();
|
||||
});
|
||||
|
||||
it("call service", (done) => {
|
||||
const r = manager.registerCallback(helloWorld, {
|
||||
id: "helloworld"
|
||||
})
|
||||
|
||||
manager.performCall("helloworld", ["Mocha"]).then(done).catch(done)
|
||||
});
|
||||
|
||||
it("call service via methodInterface", (done) => {
|
||||
const r = manager.registerCallback(helloWorld, {
|
||||
id: "helloworld"
|
||||
})
|
||||
|
||||
manager.methodInterface.helloworld("Mocha").then(done).catch(done)
|
||||
});
|
||||
|
||||
});
|
||||
});
|
@ -18,21 +18,21 @@ import {
|
||||
IAvailableServicesMsg,
|
||||
ICallOptions,
|
||||
ICommunicationBridge,
|
||||
IExternalEventMsg,
|
||||
IMapBasedMergeData,
|
||||
INopeDispatcher,
|
||||
INopeDispatcherOptions,
|
||||
INopeEventEmitter,
|
||||
INopeObservable,
|
||||
INopePromise,
|
||||
INopeRpcManager,
|
||||
INopeTimeOptions,
|
||||
IRequestTaskMsg,
|
||||
IResponseTaskMsg,
|
||||
ITaskCancelationMsg,
|
||||
ValidDefaultSelectors,
|
||||
ValidSelectorFunction,
|
||||
} from "../types/nope/index";
|
||||
|
||||
import {firstItemSelector} from "./defaultSelectors"
|
||||
|
||||
/**
|
||||
* A Dispatcher to perform a function on a Remote
|
||||
* Dispatcher. Therefore a Task is created and forwarded
|
||||
@ -41,7 +41,7 @@ import {
|
||||
* @export
|
||||
* @class nopeDispatcher
|
||||
*/
|
||||
export class NopeRpcManager implements INopeRpcManager {
|
||||
export class NopeRpcManager<T=any> implements INopeRpcManager {
|
||||
public readonly id: string;
|
||||
|
||||
protected _logger: ILogger;
|
||||
@ -55,9 +55,7 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
protected _registeredCallbacks: Map<
|
||||
string,
|
||||
{
|
||||
options: {
|
||||
preventSendingToRegistery: boolean;
|
||||
};
|
||||
options: T
|
||||
func: (...args) => Promise<any>;
|
||||
}
|
||||
>;
|
||||
@ -118,6 +116,8 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
|
||||
/**
|
||||
* Element showing the available services.
|
||||
* Its more or less a map, that maps the
|
||||
* services with their dispatchers.
|
||||
*
|
||||
* T = services name.
|
||||
* K = dispatcher - ids
|
||||
@ -126,7 +126,7 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
* @type {IMapBasedMergeData<string>}
|
||||
* @memberof INopeRpcManager
|
||||
*/
|
||||
public readonly services: IMapBasedMergeData<string, string>;
|
||||
public readonly services: IMapBasedMergeData<string, string, IAvailableServicesMsg>;
|
||||
|
||||
/**
|
||||
* An event Emitter, which will be called when a task is getting
|
||||
@ -152,13 +152,18 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
clear: () => void;
|
||||
serviceName: string;
|
||||
timeout?: any;
|
||||
target: string
|
||||
}
|
||||
>;
|
||||
|
||||
/**
|
||||
* List, with external tasks, that are running.
|
||||
*/
|
||||
protected _runningExternalRequestedTasks: Set<string>;
|
||||
|
||||
protected _timeouts: INopeTimeOptions;
|
||||
|
||||
/**
|
||||
* Flag to show an inital warning
|
||||
*/
|
||||
protected __warned: boolean;
|
||||
|
||||
/**
|
||||
@ -168,15 +173,16 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
* @memberof nopeDispatcher
|
||||
*/
|
||||
constructor(
|
||||
public options: INopeDispatcherOptions,
|
||||
protected _generateObservable: <T>() => INopeObservable<T>
|
||||
public options: INopeDispatcherOptions,
|
||||
protected _generateObservable: <T>() => INopeObservable<T>,
|
||||
public dispatcher: INopeDispatcher = null
|
||||
) {
|
||||
this.communicator = options.communicator;
|
||||
|
||||
this.id = generateId();
|
||||
|
||||
if (!options.logger) {
|
||||
this._logger = getNopeLogger(`dispatcher ${this.id}`, "info");
|
||||
this._logger = getNopeLogger(`rpc-manager-${this.id}`, "info");
|
||||
} else {
|
||||
this._logger = options.logger;
|
||||
}
|
||||
@ -186,7 +192,7 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
// Use the default selector.
|
||||
this.options.defaultSelector = this.options.defaultSelector
|
||||
? this.options.defaultSelector
|
||||
: "first";
|
||||
: firstItemSelector;
|
||||
|
||||
// Define A Proxy for accessing methods easier.
|
||||
const _this = this;
|
||||
@ -223,9 +229,6 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
}
|
||||
});
|
||||
}
|
||||
unregisterCallback(func: string | ((...args: any[]) => any)): Promise<void> {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Function, which will be called, if an dispatcher matches
|
||||
@ -239,21 +242,6 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
this.services.update();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels all Tasks of the given Dispatcher
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @param {string} dispatcher
|
||||
* @param {Error} reason
|
||||
* @memberof NopeRpcManager
|
||||
*/
|
||||
public cancelRunningTasksOfDispatcher(
|
||||
dispatcher: string,
|
||||
reason: Error
|
||||
): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal Method to handle some requests.
|
||||
*
|
||||
@ -283,26 +271,6 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
const _this = this;
|
||||
|
||||
if (typeof _function === "function") {
|
||||
// Now we check, if we have to perform test, whether
|
||||
// we are allowed to execute the task:
|
||||
if (data?.selector?.functionId) {
|
||||
const allowedToExecute = await _this.performCall<boolean>(
|
||||
data.selector.functionId,
|
||||
[_this.id],
|
||||
{
|
||||
// Make shure, the selector doesnt requires a selector:
|
||||
preventSelector: true,
|
||||
// Provide the rest of the data.
|
||||
...data.selector,
|
||||
}
|
||||
);
|
||||
|
||||
// We now test if our system is allowed,
|
||||
if (!allowedToExecute) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Callbacks
|
||||
const cbs: Array<(reason) => void> = [];
|
||||
|
||||
@ -322,35 +290,6 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
// First extract the basic arguments
|
||||
data.params.map((item) => (args[item.idx] = item.data));
|
||||
|
||||
// Add the Callbacks. Therefore create a function which will
|
||||
// trigger the remote.
|
||||
data.callbacks.map(
|
||||
(options) =>
|
||||
(args[options.idx] = async (..._args) => {
|
||||
// And Create the Task and its Promise.
|
||||
const servicePromise = _this.performCall<any>(
|
||||
options.functionId,
|
||||
_args,
|
||||
options
|
||||
);
|
||||
|
||||
const cancelCallback = (reason) => {
|
||||
// The Main Task has been canceled =>
|
||||
// We are allowed to canel the Subtask as well.
|
||||
servicePromise.cancel(reason);
|
||||
};
|
||||
cbs.push(cancelCallback);
|
||||
|
||||
// Await the Result. If an Task is canceled => The Error is Thrown.
|
||||
const result = await servicePromise;
|
||||
|
||||
// Remove the Index
|
||||
cbs.splice(cbs.indexOf(cancelCallback), 1);
|
||||
|
||||
return result;
|
||||
})
|
||||
);
|
||||
|
||||
// Perform the Task it self.
|
||||
const _resultPromise = _function(...args);
|
||||
|
||||
@ -594,37 +533,68 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
* @param {Error} reason The provided Reason, why cancelation is reuqired.
|
||||
* @memberof nopeDispatcher
|
||||
*/
|
||||
public cancelRunningTasksOfService(serviceName: string, reason: Error) {
|
||||
public async cancelRunningTasksOfService(serviceName: string, reason: Error) {
|
||||
// Provide a List containing all Tasks, that has to be canceled
|
||||
const _tasksToCancel: {
|
||||
reject: (error: Error) => void;
|
||||
id: string;
|
||||
}[] = [];
|
||||
const _tasksToCancel: string[] = [];
|
||||
|
||||
// Filter all Tasks that shoud be canceled.
|
||||
for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
|
||||
// Therefore compare the reuqired Service by the Task
|
||||
if (task.serviceName === serviceName) {
|
||||
// if the service matches, put it to our list.
|
||||
_tasksToCancel.push({
|
||||
id,
|
||||
reject: task.reject,
|
||||
});
|
||||
_tasksToCancel.push(id);
|
||||
}
|
||||
}
|
||||
|
||||
const promises: Promise<any>[] = []
|
||||
|
||||
if (_tasksToCancel.length > 0) {
|
||||
// First remove all Tasks.
|
||||
// Then cancel them to avoid side effects
|
||||
for (const item of _tasksToCancel) {
|
||||
this._runningInternalRequestedTasks.delete(item.id);
|
||||
}
|
||||
|
||||
// Now Reject all Tasks.
|
||||
for (const item of _tasksToCancel) {
|
||||
item.reject(reason);
|
||||
for (const id of _tasksToCancel){
|
||||
promises.push(this.cancelTask(id, reason))
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Cancels all Tasks of the given Dispatcher
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @param {string} dispatcher
|
||||
* @param {Error} reason
|
||||
* @memberof NopeRpcManager
|
||||
*/
|
||||
public async cancelRunningTasksOfDispatcher(
|
||||
dispatcher: string,
|
||||
reason: Error
|
||||
): Promise<void> {
|
||||
// Provide a List containing all Tasks, that has to be canceled
|
||||
const _tasksToCancel: string[] = [];
|
||||
|
||||
// Filter all Tasks that shoud be canceled.
|
||||
for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
|
||||
// Therefore compare the reuqired Service by the Task
|
||||
if (task.target === dispatcher) {
|
||||
// if the service matches, put it to our list.
|
||||
_tasksToCancel.push(id);
|
||||
}
|
||||
}
|
||||
|
||||
const promises: Promise<any>[] = []
|
||||
|
||||
if (_tasksToCancel.length > 0) {
|
||||
// First remove all Tasks.
|
||||
// Then cancel them to avoid side effects
|
||||
for (const id of _tasksToCancel){
|
||||
promises.push(this.cancelTask(id, reason))
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -742,13 +712,9 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
public registerCallback(
|
||||
func: (...args) => Promise<any>,
|
||||
options: {
|
||||
/** Flag to enable unregistering the function after calling. */
|
||||
deleteAfterCalling?: boolean;
|
||||
/** Instead of generating a uuid an id could be provided */
|
||||
id?: string;
|
||||
/** Flag to enable / disable sending to registery */
|
||||
preventSendingToRegistery?: boolean;
|
||||
} = {}
|
||||
} & Partial<T>= {}
|
||||
): (...args) => Promise<any> {
|
||||
const _this = this;
|
||||
// Define / Use the ID of the Function.
|
||||
@ -763,46 +729,28 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
this.__warned = true;
|
||||
}
|
||||
|
||||
if (options.deleteAfterCalling) {
|
||||
_func = async (...args) => {
|
||||
// Unregister the Method
|
||||
_this.unregisterCallback(_id, {
|
||||
preventSendingToRegistery: options.preventSendingToRegistery,
|
||||
});
|
||||
// Return the Result of the Original Function.
|
||||
return func(...args);
|
||||
};
|
||||
}
|
||||
|
||||
// Define a ID for the Function
|
||||
_func.id = _id;
|
||||
(_func as any).id = _id;
|
||||
|
||||
// Define the callback.
|
||||
_func.unregister = () => _this.unregisterCallback(_id);
|
||||
(_func as any).unregister = () => _this.unregisterCallback(_id);
|
||||
|
||||
// Reister the Function
|
||||
this._registeredCallbacks.set(_func.id, {
|
||||
options: {
|
||||
preventSendingToRegistery: options.preventSendingToRegistery || false,
|
||||
},
|
||||
this._registeredCallbacks.set((_func as any).id, {
|
||||
options: options as T,
|
||||
func: _func,
|
||||
});
|
||||
|
||||
// Register the Callback:
|
||||
this._listenForRequest(_id, _func);
|
||||
|
||||
if (!options.preventSendingToRegistery) {
|
||||
this._updateAmountOf("services");
|
||||
// Publish the Available Services.
|
||||
this._sendAvailableServices();
|
||||
|
||||
// Publish the Available Services.
|
||||
this._sendAvailableServices();
|
||||
|
||||
if (this._logger?.enabledFor((Logger as any).DEBUG)) {
|
||||
// If there is a Logger:
|
||||
this._logger.debug(`Dispatcher "${this.id}" registered: "${_id}"`);
|
||||
}
|
||||
if (this._logger?.enabledFor((Logger as any).DEBUG)) {
|
||||
// If there is a Logger:
|
||||
this._logger.debug(`Dispatcher "${this.id}" registered: "${_id}"`);
|
||||
}
|
||||
|
||||
// Return the Function.
|
||||
return _func;
|
||||
}
|
||||
@ -813,8 +761,8 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
* @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
|
||||
* @memberof nopeDispatcher
|
||||
*/
|
||||
public _unregisterCallback(func: ((...args) => void) | string): boolean {
|
||||
const _id = typeof func === "string" ? func : (func.id as string) || "0";
|
||||
public async unregisterCallback(func: ((...args) => void) | string): Promise<boolean> {
|
||||
const _id = typeof func === "string" ? func : ((func as any).id as string) || "0";
|
||||
|
||||
this._removeRpcSubscription(_id);
|
||||
|
||||
@ -874,14 +822,11 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
serviceName: string,
|
||||
params: any[],
|
||||
options: Partial<ICallOptions> & {
|
||||
selector?: ValidDefaultSelectors | ValidSelectorFunction;
|
||||
quite?: boolean;
|
||||
preventSelector?: boolean;
|
||||
selector?: ValidSelectorFunction;
|
||||
} = {}
|
||||
): INopePromise<T> {
|
||||
// Get a Call Id
|
||||
const _taskId = generateId();
|
||||
const _registeredIdx: Array<string> = [];
|
||||
const _this = this;
|
||||
|
||||
const _options = {
|
||||
@ -896,9 +841,6 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
this._listenForResult(serviceName, _options.dynamicCallback);
|
||||
|
||||
const clear = () => {
|
||||
// Delete all Callbacks.
|
||||
_registeredIdx.map((id) => _this.unregisterCallback(id));
|
||||
|
||||
// Remove the task:
|
||||
if (_this._runningInternalRequestedTasks.has(_taskId)) {
|
||||
const task = _this._runningInternalRequestedTasks.get(_taskId);
|
||||
@ -926,84 +868,16 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
// Define a Callback-Function, which will expect the Task.
|
||||
const ret = new NopePromise<T>(async (resolve, reject) => {
|
||||
try {
|
||||
const requestedTask: any = {
|
||||
resolve,
|
||||
reject,
|
||||
clear,
|
||||
serviceName,
|
||||
timeout: null,
|
||||
};
|
||||
|
||||
// Register the Handlers,
|
||||
_this._runningInternalRequestedTasks.set(_taskId, requestedTask);
|
||||
|
||||
// Define a Task-Request
|
||||
const taskRequest: IRequestTaskMsg = {
|
||||
functionId: serviceName,
|
||||
params: [],
|
||||
callbacks: [],
|
||||
taskId: _taskId,
|
||||
type: "requestOfTask",
|
||||
resultSink: _options.resultSink,
|
||||
};
|
||||
|
||||
// Test if there is no Callback integrated
|
||||
if (!options.paramsHasNoCallback) {
|
||||
// If so, the parameters has to be detailled:
|
||||
|
||||
// Iterate over all Parameters and
|
||||
// Determin Callbacks. Based on the Parameter-
|
||||
// Type assign it either to packet.params (
|
||||
// for parsable Parameters) and packet.callbacks
|
||||
// (for callback Parameters)
|
||||
for (const [idx, contentOfParameter] of params.entries()) {
|
||||
// Test if the parameter is a Function
|
||||
if (typeof contentOfParameter !== "function") {
|
||||
taskRequest.params.push({
|
||||
idx,
|
||||
data: contentOfParameter,
|
||||
});
|
||||
} else {
|
||||
// The Parameter is a Callback => store a
|
||||
// Description of the Callback and register
|
||||
// the callback inside of the Dispatcher
|
||||
|
||||
const deleteAfterCalling =
|
||||
_options.deletableCallbacks.includes(idx);
|
||||
const _func = _this.registerCallback(contentOfParameter, {
|
||||
deleteAfterCalling,
|
||||
preventSendingToRegistery: true,
|
||||
});
|
||||
|
||||
_registeredIdx.push(_func.id);
|
||||
|
||||
// Register the Callback
|
||||
taskRequest.callbacks.push({
|
||||
functionId: _func.id,
|
||||
idx,
|
||||
deleteAfterCalling,
|
||||
dynamicCallback: true,
|
||||
deletableCallbacks: [],
|
||||
resultSink: _this._getServiceName(_func.id, "response"),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const [idx, contentOfParameter] of params.entries()) {
|
||||
taskRequest.params.push({
|
||||
idx,
|
||||
data: contentOfParameter,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!_options.dynamicCallback && !_this.serviceExists(serviceName)) {
|
||||
// Test if the service exists,
|
||||
// if not .... we have to throw
|
||||
// an error.
|
||||
if (!_this.serviceExists(serviceName)) {
|
||||
// Create an Error:
|
||||
const error = new Error(
|
||||
`No Service Provider known for "${serviceName}"`
|
||||
);
|
||||
|
||||
if (!options.quite && _this._logger) {
|
||||
if (_this._logger) {
|
||||
_this._logger.error(
|
||||
`No Service Provider known for "${serviceName}"`
|
||||
);
|
||||
@ -1013,50 +887,54 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (
|
||||
!options.preventSelector &&
|
||||
(_this.options.forceUsingSelectors ||
|
||||
_this._amountOfServicesProvidedByName.get(serviceName)) > 1
|
||||
) {
|
||||
if (typeof options?.selector === "function") {
|
||||
const selector = await this._generateSelector(
|
||||
serviceName,
|
||||
"service",
|
||||
options.selector
|
||||
);
|
||||
// Assign the Selector Promise
|
||||
selector.selectionSucessPromise.catch((_) => {
|
||||
_this.cancelTask(
|
||||
_taskId,
|
||||
new Error(
|
||||
"No dispatcher has been selected for executing the task!"
|
||||
),
|
||||
false
|
||||
);
|
||||
});
|
||||
// Assign the Selector:
|
||||
taskRequest.selector = selector.selectorFuncOptions;
|
||||
} else {
|
||||
const selector = await this._generateAndRegisterSelectorFunction(
|
||||
serviceName,
|
||||
"service",
|
||||
typeof options?.selector === "string"
|
||||
? options.selector
|
||||
: this.options.defaultSelector
|
||||
);
|
||||
// Assign the Selector Promise
|
||||
selector.selectionSucessPromise.catch((_) => {
|
||||
_this.cancelTask(
|
||||
_taskId,
|
||||
new Error(
|
||||
"No dispatcher has been selected for executing the task!"
|
||||
),
|
||||
false
|
||||
);
|
||||
});
|
||||
// Assign the Selector:
|
||||
taskRequest.selector = selector.selectorFuncOptions;
|
||||
}
|
||||
// Define a Task-Request
|
||||
const taskRequest: IRequestTaskMsg = {
|
||||
functionId: serviceName,
|
||||
params: [],
|
||||
callbacks: [],
|
||||
taskId: _taskId,
|
||||
type: "requestOfTask",
|
||||
resultSink: _options.resultSink,
|
||||
targetDispatcher: ""
|
||||
};
|
||||
|
||||
if (typeof options.selector === "undefined"){
|
||||
options.selector = this.options.defaultSelector
|
||||
}
|
||||
|
||||
if (typeof options.selector === "function") {
|
||||
// Define the Target. The Target-Dispatcher is returned by the
|
||||
// selector function. This target will be evaluated in the
|
||||
// during handeling.
|
||||
taskRequest.targetDispatcher = await options.selector({
|
||||
rpcManager: this,
|
||||
serviceName
|
||||
});
|
||||
} else {
|
||||
throw Error("A Selector must be provided.")
|
||||
}
|
||||
|
||||
const requestedTask: any = {
|
||||
resolve,
|
||||
reject,
|
||||
clear,
|
||||
// Name of the Service
|
||||
serviceName,
|
||||
// Target of the Task
|
||||
target: taskRequest.targetDispatcher,
|
||||
// Additional Timeout
|
||||
timeout: null,
|
||||
};
|
||||
|
||||
// Register the Handlers,
|
||||
_this._runningInternalRequestedTasks.set(_taskId, requestedTask);
|
||||
|
||||
// Assign the Parameters
|
||||
for (const [idx, contentOfParameter] of params.entries()) {
|
||||
taskRequest.params.push({
|
||||
idx,
|
||||
data: contentOfParameter,
|
||||
});
|
||||
}
|
||||
|
||||
// Send the Message to the specific element:
|
||||
@ -1084,7 +962,7 @@ export class NopeRpcManager implements INopeRpcManager {
|
||||
new Error(
|
||||
`TIMEOUT. The Service allowed execution time of ${options.timeout.toString()}[ms] has been excided`
|
||||
),
|
||||
options.quite || false
|
||||
false
|
||||
);
|
||||
}, options.timeout);
|
||||
}
|
||||
|
145
lib/RpcManager/defaultSelectors.ts
Normal file
145
lib/RpcManager/defaultSelectors.ts
Normal file
@ -0,0 +1,145 @@
|
||||
import { ValidSelectorFunction } from "../types/nope";
|
||||
|
||||
/**
|
||||
* Helper, which will allways select the first item.
|
||||
* @param options must contain the serviceName and rpcManager.
|
||||
* @returns
|
||||
*/
|
||||
export const firstItemSelector: ValidSelectorFunction = ({serviceName, rpcManager}) => {
|
||||
return rpcManager.services.originalData.get(serviceName)[0]
|
||||
}
|
||||
|
||||
|
||||
// export function _generateSelectorFunction(selector: ValidDefaultSelectors) {
|
||||
// const _this = this;
|
||||
// let helperData: any;
|
||||
|
||||
// switch (selector) {
|
||||
// default:
|
||||
// case "first":
|
||||
// return async (dispatcher: string) => {
|
||||
// return true;
|
||||
// };
|
||||
// case "dispatcher":
|
||||
// // Our selector compares the dispatcher - id
|
||||
// return async (externalDispatcher, options) => {
|
||||
// return externalDispatcher == _this.id;
|
||||
// };
|
||||
// case "host":
|
||||
// // Our selector compares the host-name:
|
||||
// // 1. Get the current Host name of our dispatcher
|
||||
// helperData = this._genAliveMessage().host.name;
|
||||
// return async (externalDispatcher) => {
|
||||
// const host = _this._externalDispatchers.get(externalDispatcher);
|
||||
// return host?.host?.name == helperData;
|
||||
// };
|
||||
// case "cpu-usage":
|
||||
// helperData = {
|
||||
// arr: [],
|
||||
// timeout: null,
|
||||
// done: false,
|
||||
// finish: () => {
|
||||
// if (!helperData.done) {
|
||||
// // Now we find the Min CPU usage:
|
||||
// const bestOption = minOfArray(helperData.arr, "cpuUsage").index;
|
||||
|
||||
// for (const [index, { resolve }] of (
|
||||
// helperData.arr as any[]
|
||||
// ).entries()) {
|
||||
// resolve(index === bestOption);
|
||||
// }
|
||||
|
||||
// helperData.done = true;
|
||||
// }
|
||||
|
||||
// if (helperData.timeout !== null) {
|
||||
// clearTimeout(helperData.timeout);
|
||||
// helperData.timeout = null;
|
||||
// }
|
||||
// },
|
||||
// };
|
||||
|
||||
// return async (externalDispatcherId, options) => {
|
||||
// if (options.initalTest) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// let cpuUsage =
|
||||
// _this._externalDispatchers.get(externalDispatcherId).host.cpu.usage;
|
||||
// cpuUsage = cpuUsage === -1 ? Infinity : cpuUsage;
|
||||
|
||||
// // For every Call, we
|
||||
// const promise = new Promise<boolean>((resolve) =>
|
||||
// helperData.arr.push({ resolve, cpuUsage })
|
||||
// );
|
||||
|
||||
// // If we are calling the Element at the firsttime,
|
||||
// // we ensure, that we have callback, which will
|
||||
// // be called after the specified amount of time to
|
||||
// // assing the selection.
|
||||
// if (options.first) {
|
||||
// helperData.timeout = setTimeout(helperData.finish, options.timeout);
|
||||
// }
|
||||
|
||||
// if (options.last) {
|
||||
// helperData.finish();
|
||||
// }
|
||||
|
||||
// return await promise;
|
||||
// };
|
||||
// case "free-ram":
|
||||
// helperData = {
|
||||
// arr: [],
|
||||
// timeout: null,
|
||||
// done: false,
|
||||
// finish: () => {
|
||||
// if (!helperData.done) {
|
||||
// // Now we find the Min CPU usage:
|
||||
// const bestOption = minOfArray(helperData.arr, "freeRam").index;
|
||||
|
||||
// for (const [index, { resolve }] of (
|
||||
// helperData.arr as any[]
|
||||
// ).entries()) {
|
||||
// resolve(index === bestOption);
|
||||
// }
|
||||
|
||||
// helperData.done = true;
|
||||
// }
|
||||
|
||||
// if (helperData.timeout !== null) {
|
||||
// clearTimeout(helperData.timeout);
|
||||
// helperData.timeout = null;
|
||||
// }
|
||||
// },
|
||||
// };
|
||||
|
||||
// return async (externalDispatcherId, options) => {
|
||||
// if (options.initalTest) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// let freeRam =
|
||||
// _this._externalDispatchers.get(externalDispatcherId).host.ram.free;
|
||||
// freeRam = freeRam === -1 ? Infinity : freeRam;
|
||||
|
||||
// // For every Call, we
|
||||
// const promise = new Promise<boolean>((resolve) =>
|
||||
// helperData.arr.push({ resolve, freeRam })
|
||||
// );
|
||||
|
||||
// // If we are calling the Element at the firsttime,
|
||||
// // we ensure, that we have callback, which will
|
||||
// // be called after the specified amount of time to
|
||||
// // assing the selection.
|
||||
// if (options.first) {
|
||||
// helperData.timeout = setTimeout(helperData.finish, options.timeout);
|
||||
// }
|
||||
|
||||
// if (options.last) {
|
||||
// helperData.finish();
|
||||
// }
|
||||
|
||||
// return await promise;
|
||||
// };
|
||||
// }
|
||||
// }
|
@ -105,3 +105,31 @@ export function limitString(
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to insert new lines after a given amount of time.
|
||||
* @param str
|
||||
* @param maxLength
|
||||
* @returns
|
||||
*/
|
||||
export function insertNewLines(str: string, maxLength= 100){
|
||||
// now we try to split the string
|
||||
const splitted = str.split(" ");
|
||||
|
||||
const ret: string[] = [];
|
||||
|
||||
let length = 0;
|
||||
|
||||
for (const word of splitted){
|
||||
console.log("test")
|
||||
length += word.length + 1;
|
||||
ret.push(word);
|
||||
|
||||
if (length > maxLength){
|
||||
ret.push("\n");
|
||||
length = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -750,16 +750,9 @@ export type IRequestTaskMsg = {
|
||||
resultSink: string;
|
||||
|
||||
/**
|
||||
* Function which can be used to check, if the task should be executed
|
||||
* on that host.
|
||||
*
|
||||
* @type {(({
|
||||
* functionId: string;
|
||||
* } & ICallOptions))}
|
||||
* Contains the Target description.
|
||||
*/
|
||||
selector?: {
|
||||
functionId: string;
|
||||
} & ICallOptions;
|
||||
targetDispatcher: string
|
||||
};
|
||||
|
||||
export type ICallOptions = {
|
||||
|
@ -7,7 +7,7 @@
|
||||
*/
|
||||
|
||||
import { ILogger } from "js-logger";
|
||||
import { IDataPubSubSystem, IPubSubSystem } from ".";
|
||||
import { IDataPubSubSystem, IPubSubSystem, ValidSelectorFunction } from ".";
|
||||
import {
|
||||
ICallOptions,
|
||||
ICommunicationBridge,
|
||||
@ -143,6 +143,7 @@ export type ValidDefaultSelectors =
|
||||
| "host"
|
||||
| "free-ram"
|
||||
| "cpu-usage";
|
||||
|
||||
export const ValidDefaultSelectors = [
|
||||
"first",
|
||||
"dispatcher",
|
||||
@ -150,15 +151,6 @@ export const ValidDefaultSelectors = [
|
||||
"free-ram",
|
||||
"cpu-usage",
|
||||
];
|
||||
export type ValidSelectorFunction = (
|
||||
dispatcherId: string,
|
||||
options?: {
|
||||
first?: boolean;
|
||||
timeout?: number;
|
||||
last?: boolean;
|
||||
initalTest?: boolean;
|
||||
}
|
||||
) => Promise<boolean>;
|
||||
|
||||
export type INopeTimeOptions = {
|
||||
/**
|
||||
@ -169,6 +161,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
sendAliveInterval: number;
|
||||
|
||||
/**
|
||||
* Interval, to check the other dispatcher for being slow, dead, etc..
|
||||
* should be lager then the "sendAliveInterval". The value is given in [ms]
|
||||
@ -178,6 +171,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
checkInterval: number;
|
||||
|
||||
/**
|
||||
* Amount of Time, after which an external dispatcher is declared as slow.
|
||||
* The value is given in [ms]
|
||||
@ -186,6 +180,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
slow: number;
|
||||
|
||||
/**
|
||||
* Time, after which an warning is given. The value is given in [ms]
|
||||
*
|
||||
@ -193,6 +188,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
warn: number;
|
||||
|
||||
/**
|
||||
* Amount of Time, after which an external dispatcher is declared as dead. The value is given in [ms]
|
||||
*
|
||||
@ -200,6 +196,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
dead: number;
|
||||
|
||||
/**
|
||||
* Amount of Time, after which an external dispatcher is removed.
|
||||
*
|
||||
@ -207,6 +204,7 @@ export type INopeTimeOptions = {
|
||||
* @type {number}
|
||||
*/
|
||||
remove: number;
|
||||
|
||||
/**
|
||||
* Default timeout, after which a selector request will be timedout.
|
||||
*
|
||||
@ -224,6 +222,7 @@ export type INopeDispatcherOptions = {
|
||||
* @type {ICommunicationBridge}
|
||||
*/
|
||||
communicator: ICommunicationBridge;
|
||||
|
||||
/**
|
||||
* A Specific logger which should be used.
|
||||
*
|
||||
@ -231,6 +230,7 @@ export type INopeDispatcherOptions = {
|
||||
* @type {ILogger}
|
||||
*/
|
||||
logger?: ILogger;
|
||||
|
||||
/**
|
||||
* Timeout Definitions. These are relevant, to determine
|
||||
* alive, slow, dead, ... dispatchers.
|
||||
@ -239,6 +239,7 @@ export type INopeDispatcherOptions = {
|
||||
* @type {INopeTimeOptions}
|
||||
*/
|
||||
timeouts?: Partial<INopeTimeOptions>;
|
||||
|
||||
/**
|
||||
* Force Emitting the Update.
|
||||
*
|
||||
@ -246,13 +247,15 @@ export type INopeDispatcherOptions = {
|
||||
* @type {boolean}
|
||||
*/
|
||||
forceEmittingUpdates?: boolean;
|
||||
|
||||
/**
|
||||
* The default-selector to select the service providers
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @type {ValidDefaultSelectors}
|
||||
*/
|
||||
defaultSelector?: ValidDefaultSelectors;
|
||||
defaultSelector?: ValidSelectorFunction
|
||||
|
||||
/**
|
||||
* Flag to force using the service provider selector
|
||||
* functionalities.
|
||||
|
@ -8,18 +8,42 @@ import { INopeObservable } from "./nopeObservable.interface";
|
||||
* @modify date 2021-11-23 12:31:01
|
||||
* @desc [description]
|
||||
*/
|
||||
export interface IMergeData<T = any, K = any> {
|
||||
export interface IMergeData<T = any, K=any> {
|
||||
/**
|
||||
* Event Emitter, which is called on changes, with the removed and Added Items.
|
||||
*/
|
||||
onChange: INopeEventEmitter<{
|
||||
added: T[];
|
||||
removed: T[];
|
||||
}>;
|
||||
/**
|
||||
* The simplified data. A simple List, containing only the Values for all Keys.
|
||||
*/
|
||||
data: INopeObservable<T[]>;
|
||||
/**
|
||||
* Function, which must be called on data updates.
|
||||
* @param data The updated Data.
|
||||
*/
|
||||
update(data?: K): void;
|
||||
|
||||
/**
|
||||
* The Original Data.
|
||||
*/
|
||||
originalData: K
|
||||
}
|
||||
|
||||
export interface IMapBasedMergeData<T = any, K = any, V = any>
|
||||
export interface IMapBasedMergeData<T=any, K = any, V = any>
|
||||
extends IMergeData<T, Map<K, V>> {
|
||||
/**
|
||||
* Adds a dinfition of the Amounts, of the elements.
|
||||
*/
|
||||
amountOf: Map<T, number>;
|
||||
simplified: Map<K, T>;
|
||||
/**
|
||||
* Simplifed Data Access.
|
||||
*/
|
||||
simplified: Map<K,T>;
|
||||
/**
|
||||
* A Reverse Mapping
|
||||
*/
|
||||
reverseSimplified: Map<T, Set<K>>;
|
||||
}
|
||||
|
@ -6,7 +6,7 @@
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
import { INopeEventEmitter, ValidSelectorFunction } from ".";
|
||||
import { INopeEventEmitter } from ".";
|
||||
import {
|
||||
IAvailableServicesMsg,
|
||||
ICallOptions,
|
||||
@ -15,6 +15,14 @@ import {
|
||||
import { IMapBasedMergeData } from "./nopeHelpers.interface";
|
||||
import { INopePromise } from "./nopePromise.interface";
|
||||
|
||||
export type ValidSelectorFunction = (
|
||||
options: {
|
||||
serviceName: string,
|
||||
rpcManager: INopeRpcManager,
|
||||
}
|
||||
) => Promise<string>;
|
||||
|
||||
|
||||
/**
|
||||
* A Manager for Remote Procedure Calls.
|
||||
*
|
||||
@ -61,17 +69,7 @@ export interface INopeRpcManager {
|
||||
* @type {IMapBasedMergeData<string>}
|
||||
* @memberof INopeRpcManager
|
||||
*/
|
||||
services: IMapBasedMergeData<string, string>;
|
||||
|
||||
/**
|
||||
* Helper to unregister an callback.
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @param {(string | ((...args) => any))} func
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof INopeRpcManager
|
||||
*/
|
||||
unregisterCallback(func: string | ((...args) => any)): Promise<void>;
|
||||
services: IMapBasedMergeData<string, string, IAvailableServicesMsg>;
|
||||
|
||||
/**
|
||||
* Function, that must be called if a dispatcher is is gone. This might be the
|
||||
@ -154,13 +152,21 @@ export interface INopeRpcManager {
|
||||
registerCallback<T>(
|
||||
func: (...args) => Promise<T>,
|
||||
options: {
|
||||
/** Flag to enable unregistering the function after calling. */
|
||||
deleteAfterCalling?: boolean;
|
||||
/** Instead of generating a uuid an id could be provided */
|
||||
id?: string;
|
||||
}
|
||||
): (...args) => Promise<T>;
|
||||
|
||||
/**
|
||||
* Helper to unregister an callback.
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @param {(string | ((...args) => any))} func
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof INopeRpcManager
|
||||
*/
|
||||
unregisterCallback(func: string | ((...args) => any)): Promise<boolean>;
|
||||
|
||||
/**
|
||||
* The Method, which will perform an action.
|
||||
*
|
||||
@ -178,15 +184,13 @@ export interface INopeRpcManager {
|
||||
performCall<T>(
|
||||
serviceName: string,
|
||||
params: unknown[],
|
||||
options: Partial<ICallOptions> & {
|
||||
options?: Partial<ICallOptions> & {
|
||||
selector?: ValidSelectorFunction;
|
||||
quite?: boolean;
|
||||
preventSelector?: boolean;
|
||||
}
|
||||
): INopePromise<T>;
|
||||
|
||||
/**
|
||||
* Clear all T
|
||||
* Clear all Tasks.
|
||||
*
|
||||
* @author M.Karkowski
|
||||
* @memberof INopeRpcManager
|
||||
|
Loading…
Reference in New Issue
Block a user