fixing individual communication layer errors

This commit is contained in:
Martin Karkowski 2020-09-13 16:09:01 +02:00
parent 77d6a810a5
commit d7df3581bc
3 changed files with 209 additions and 84 deletions

View File

@ -1,6 +1,5 @@
import { EventEmitter } from "events";
import { ICommunicationInterface, requestTaskMsg, responseTaskMsg, availableServices } from "../dispatcher/nopeDispatcher";
import { getLogger } from "../logger/getLogger";
/**
* A Communication Layer for the Dispatchers.
@ -11,16 +10,17 @@ import { getLogger } from "../logger/getLogger";
* @implements {ICommunicationInterface}
*/
export class EventLayer implements ICommunicationInterface {
emitRpcRequest(name: string, request: requestTaskMsg): void {
this._emitter.emit(name, request);
}
emitRpcResult(result: responseTaskMsg): void {
this._emitter.emit('result',result);
emitRpcResult(name: string, result: responseTaskMsg): void {
this._emitter.emit(name,result);
}
onRpcResult(name: string, cb: (result: responseTaskMsg) => void): void {
this._emitter.on(name, cb);
}
offRpcResult(name: string, cb: (result: responseTaskMsg) => void): void {
offRpcResponse(name: string, cb: (result: responseTaskMsg) => void): void {
this._emitter.off(name, cb);
}
onRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void {
@ -36,6 +36,13 @@ export class EventLayer implements ICommunicationInterface {
this._emitter.on('services', cb);
}
protected _emitter = new EventEmitter();
protected _logger = getLogger('info', 'Event-Layer');
constructor(
public readonly subscriptionMode: 'individual' | 'generic',
public readonly resultSharing: 'individual' | 'generic',
protected _logger?: Logger
){
}
protected _emitter = new EventEmitter();
}

View File

@ -9,6 +9,9 @@ import { Logger } from "winston";
*/
export interface ICommunicationInterface {
readonly subscriptionMode?: 'individual' | 'generic',
readonly resultSharing?: 'individual' | 'generic',
/**
* Funciton to emit a RPC Request.
*
@ -23,10 +26,11 @@ export interface ICommunicationInterface {
* some state change in the Communication Interface it should
* be provided on the registered functions
*
* @param {string} name The Id of the Method.
* @param {responseTaskMsg} result
* @memberof ICommunicationInterface
*/
emitRpcResult(result: responseTaskMsg): void;
emitRpcResult(name: string, result: responseTaskMsg): void;
/**
* Function used to subscribe to RPC Results. Each method / function
@ -45,7 +49,7 @@ export interface ICommunicationInterface {
* @param {(result: responseTaskMsg) => void} cb The callback which should be called
* @memberof ICommunicationInterface
*/
offRpcResult(name: string, cb: (result: responseTaskMsg) => void): void;
offRpcResponse(name: string, cb: (result: responseTaskMsg) => void): void;
/**
* Function to register RPC-Request.
@ -90,17 +94,60 @@ export type requestOfService = {
export type requestTaskMsg = {
type: 'requestOfTask',
/**
* UUID of a Task
*
* @type {string}
*/
taskId: string,
/**
* ID of the Function, on which it is available.
*
* @type {string}
*/
functionId: string,
/**
* The Parameters
*
* @type {{
* idx: number,
* data: any
* }[]}
*/
params: {
/**
* Index of the Parameter
*
* @type {number}
*/
idx: number,
data: any
}[]
/**
* Callbacks, that are available in a Dispatcher.
*
* @type {(({
* functionId: string,
* idx: number,
* deleteAfterCalling: boolean,
* } & callOptions)[])}
*/
callbacks: ({
functionId: string,
idx: number,
deleteAfterCalling: boolean,
} & callOptions)[]
/**
* Element, allowing to describe where the result should be hosted.
*
* @type {string}
*/
resultSink: string
}
export type callOptions = {
@ -109,7 +156,7 @@ export type callOptions = {
*
* @type {Array<number>}
*/
deletableCallbacks: Array<number>;
deletableCallbacks?: Array<number>;
/**
* Flag, showing that the parameters set has no callback
*
@ -121,7 +168,13 @@ export type callOptions = {
*
* @type {boolean}
*/
preventAvailableTest?: boolean;
dynamicCallback?: boolean;
/**
* Element, allowing to describe where the result should be hosted.
*
* @type {string}
*/
resultSink: string
}
export type responseTaskMsg = {
@ -164,8 +217,7 @@ export type availableServices = {
}
export type nopeDispatcherOptions = {
communicator: ICommunicationInterface,
subscriptionMode?: 'individual' | 'generic',
communicator: ICommunicationInterface
}
/**
@ -192,15 +244,16 @@ export class nopeDispatcher {
protected _definedFunctions: Map<string, (...args) => Promise<any>>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<string, {
req: (data) => any,
res: (data) => any
registeredId: string,
type: 'request' | 'response'
cb: (data) => any
}>;
protected _communicator: ICommunicationInterface;
protected _mappingOfRemoteDispatchersAndFunctions: Map<string, availableServices>;
protected _externalServices: Set<string>;
public methodInterfaceWithOptions: { [index: string]: (optins: callOptions, ...args) => Promise<any> }
public methodInterface: { [index: string]: (...args) => Promise<any> }
public methodInterfaceWithOptions: { [index: string]: <T>(optins: callOptions, ...args) => Promise<T> }
public methodInterface: { [index: string]: <T>(...args) => Promise<T> }
/**
* Internal Element to store the running tasks.
@ -213,6 +266,9 @@ export class nopeDispatcher {
reject: (error: any) => void;
}>;
readonly _subscriptionMode: 'individual' | 'generic';
readonly _resultSharing: 'individual' | 'generic';
/**
* Creates an instance of nopeDispatcher.
* @param {ICommunicationInterface} communicator The Communication Layer which should be used.
@ -222,11 +278,11 @@ export class nopeDispatcher {
this._communicator = options.communicator;
this.options.subscriptionMode = this.options.subscriptionMode || 'generic';
this._subscriptionMode = this._communicator.subscriptionMode || 'generic';
this._resultSharing = this._communicator.resultSharing || 'generic';
this.id = generateId();
/**
* Define A Proxy for accessing methods easier.
*/
@ -295,7 +351,7 @@ export class nopeDispatcher {
}
// Use the communicator to publish the event.
this._communicator.emitRpcResult(result);
this._communicator.emitRpcResult(data.resultSink,result);
}
} catch (error) {
@ -308,7 +364,7 @@ export class nopeDispatcher {
}
// Send the Error via the communicator to the remote.
this._communicator.emitRpcResult(result);
this._communicator.emitRpcResult(data.resultSink, result);
}
}
@ -319,10 +375,10 @@ export class nopeDispatcher {
*
* @protected
* @param {responseTaskMsg} data The Data provided to handle the Response.
* @return {void} Nothing
* @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
* @memberof nopeDispatcher
*/
protected _handleExternalResponse(data: responseTaskMsg): void {
protected _handleExternalResponse(data: responseTaskMsg): boolean {
try {
// Extract the Task
const task = this._runningTasks.get(data.taskId);
@ -332,14 +388,18 @@ export class nopeDispatcher {
// Based on the Result of the Remote =>
if (task && data.error) {
return task.reject(data.error);
task.reject(data.error);
return true;
}
if (task) {
return task.resolve(data.result);
task.resolve(data.result);
return true;
}
} catch (e) {
}
return false;
}
/**
@ -355,7 +415,7 @@ export class nopeDispatcher {
// Based on the Mode of the Subscription =>
// either create indivdual topics for the methods
// or use the generice function.
switch (this.options.subscriptionMode) {
switch (this._subscriptionMode) {
case 'individual':
// Iterate over the Defined Functions.
for (const [id, cb] of this._definedFunctions.entries()) {
@ -441,7 +501,7 @@ export class nopeDispatcher {
* @return {string} the adapted ID.
* @memberof nopeDispatcher
*/
protected _getRequestName(id: string, type: 'request' | 'response'): string {
protected _getName(id: string, type: 'request' | 'response'): string {
return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
}
@ -450,45 +510,65 @@ export class nopeDispatcher {
*
* @protected
* @param {string} id the Id of the function
* @param {(...args) => Promise<any>} cb The Callback of the Function
* @param {(...args) => Promise<any>} _cb The Callback of the Function
* @return {void} the adapted ID.
* @memberof nopeDispatcher
*/
protected _subscribeToFunction(id: string, cb: (...args) => Promise<any>): void {
const _req = this._getRequestName(id, 'request');
const _res = this._getRequestName(id, 'response')
protected _subscribeToFunction(id: string, _cb: (...args) => Promise<any>): void {
const _req = this._getName(id, 'request');
if (
this.options.subscriptionMode === 'individual' &&
!this._communicatorCallbacks.has(_req)
this._subscriptionMode === 'individual' &&
!this._communicatorCallbacks.has(id)
) {
const _this = this;
// Define a Function.
const req = (data: requestTaskMsg) => {
const cb = (data: requestTaskMsg) => {
if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data, cb);
_this._handleExternalRequest(data, _cb);
}
};
const res = (data: responseTaskMsg) => {
if (data.type === 'response') {
_this._handleExternalResponse(data);
}
};
// Add the Callback.
this._communicatorCallbacks.set(_req, {
res,
req
this._communicatorCallbacks.set(id, {
registeredId: _req,
type: 'request',
cb
});
// Register Functions.
this._communicator.onRpcRequest(_req, req);
this._communicator.onRpcResult(_res, res);
this._communicator.onRpcRequest(_req, cb);
}
}
protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
const _res = this._getName(id, 'response');
if (
this._subscriptionMode === 'individual' &&
!this._communicatorCallbacks.has(id)
) {
const _this = this;
// Define a Function.
const cb = (data: responseTaskMsg) => {
if (data.type === 'response') {
if (_this._handleExternalResponse(data)) {
_this._removeSubscription(id);
}
}
};
// Add the Callback.
this._communicatorCallbacks.set(id, {
registeredId: _res,
type: 'response',
cb
});
// Register Functions.
this._communicator.onRpcResult(_res, cb);
}
}
@ -564,19 +644,7 @@ export class nopeDispatcher {
} = {}): boolean {
const _id = typeof func === 'string' ? func : func['id'] as string || '0';
// Try to unregister the Callback from the communcator:
if (this._communicatorCallbacks.has(_id)) {
const _callbacks = this._communicatorCallbacks.get(_id);
// Unregister the RPC-Request-Listener
this._communicator.offRpcRequest(this._getRequestName(_id, 'request'), _callbacks.req);
// Unregister the Result-Listener
this._communicator.offRpcRequest(this._getRequestName(_id, 'response'), _callbacks.res);
// Remove the Callback
this._communicatorCallbacks.delete(_id);
}
this._removeSubscription(_id);
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
@ -586,6 +654,29 @@ export class nopeDispatcher {
return this._definedFunctions.delete(_id);
}
protected _removeSubscription(_id: string){
// Try to unregister the Callback from the communcator:
if (this._communicatorCallbacks.has(_id)) {
const _callbacks = this._communicatorCallbacks.get(_id);
switch (_callbacks.type) {
case 'request':
// Unregister the RPC-Request-Listener
this._communicator.offRpcRequest(_callbacks.registeredId, _callbacks.cb);
break;
case 'response':
// Unregister the RPC-Response-Listener
this._communicator.offRpcResponse(_callbacks.registeredId, _callbacks.cb);
break;
}
// Remove the Callback
this._communicatorCallbacks.delete(_id);
}
}
/**
* Function used to update the Available Services.
*
@ -620,16 +711,21 @@ export class nopeDispatcher {
* @return {*} {Promise<T>} The result of the Operation
* @memberof nopeDispatcher
*/
public performCall<T>(functionName: string, params: any[], options: callOptions = {
deletableCallbacks: [],
paramsHasNoCallback: false,
preventAvailableTest: false
}): Promise<T> {
public performCall<T>(functionName: string, params: any[], options: Partial<callOptions> = {}): Promise<T> {
// Get a Call Id
const _taskId = generateId();
const _registeredIdx: Array<string> = [];
const _this = this;
const _options = Object.assign({
deletableCallbacks: [],
paramsHasNoCallback: false,
dynamicCallback: false,
resultSink: (this._resultSharing === 'generic' ? 'response' : this._getName(functionName, 'response')) as string
} as callOptions, options) as callOptions;
this._subscribeToResult(functionName, _options.dynamicCallback)
// Define a Callback-Function, which will expect the Task.
return new Promise<T>((resolve, reject) => {
@ -646,7 +742,8 @@ export class nopeDispatcher {
params,
callbacks: [],
taskId: _taskId,
type: 'requestOfTask'
type: 'requestOfTask',
resultSink: _options.resultSink
}
// Test if there is no Callback integrated
@ -674,10 +771,10 @@ export class nopeDispatcher {
// Description of the Callback and register
// the callback inside of the Dispatcher
const deleteAfterCalling = options.deletableCallbacks.includes(idx);
const deleteAfterCalling = _options.deletableCallbacks.includes(idx);
const _func = _this.registerFunction(contentOfParameter, {
deleteAfterCalling,
preventSendingToRegistery: true
preventSendingToRegistery: true,
});
_registeredIdx.push(_func['id']);
@ -687,29 +784,30 @@ export class nopeDispatcher {
functionId: _func['id'],
idx,
deleteAfterCalling,
preventAvailableTest: true,
deletableCallbacks: []
dynamicCallback: true,
deletableCallbacks: [],
resultSink: _this._resultSharing === 'generic' ? 'response' : _this._getName(_func['id'], 'response')
});
}
}
}
if (!options.preventAvailableTest && !_this.serviceExists(functionName)) {
if (!_options.dynamicCallback && !_this.serviceExists(functionName)) {
// Create an Error:
const error = new Error('No Service Provider known for "' + functionName + '"');
if (this._logger){
this._logger.error('No Service Provider known for "' + functionName + '"');
this._logger.error(error)
if (_this._logger){
_this._logger.error('No Service Provider known for "' + functionName + '"');
_this._logger.error(error)
}
throw error;
}
// Send the Message to the specific element:
if (_this.options.subscriptionMode === 'individual') {
_this._communicator.emitRpcRequest(this._getRequestName(taskRequest.functionId, 'request'), taskRequest);
if (_this._subscriptionMode === 'individual') {
_this._communicator.emitRpcRequest(_this._getName(taskRequest.functionId, 'request'), taskRequest);
} else {
_this._communicator.emitRpcRequest('request', taskRequest);
}

View File

@ -5,12 +5,19 @@ import { exportFunctionToDispatcher } from "../lib/dispatcher/nopeDispatcherDeco
import { generateBenchmarkFunction } from "../modules/funcs/generateBenchmarkFunction";
const communicator = new EventLayer();
let max = 100000;
const communicator = new EventLayer(
'generic',
'generic'
);
const local = new nopeDispatcher({ communicator });
const _functionRemote = exportFunctionToDispatcher(async (a: number, b: number, operation: (a: number, b: number) => number) => {
return await operation(a, b);
}, {
uri: '_functionRemote'
uri: 'functionRemote'
});
const _benchmark = exportFunctionToDispatcher(generateBenchmarkFunction(max, ''), {
uri: 'benchmark'
});
const remote = getLinkedDispatcher({ communicator });
@ -22,25 +29,25 @@ const main = async () => {
console.error(e)
}
let res = await local.performCall<number>('_functionRemote', [1, 2, async (a, b) => {
let res = await local.performCall<number>('functionRemote', [1, 2, async (a, b) => {
console.log('local callback')
return a + b;
}]);
console.log('1', res)
res = await local.methodInterface._functionRemote(1, 2, async (a, b) => {
res = await local.methodInterface.functionRemote<number>(1, 2, async (a, b) => {
console.log('local callback')
return a * b;
});
let max = 100000;
let i = 0;
const benchmark = generateBenchmarkFunction(max, '');
while (i < max * 10) {
await local.performCall<number>('_functionRemote', [1, 2, async (a, b) => {
await local.performCall<number>('functionRemote', [1, 2, async (a, b) => {
return 1
}], {
deletableCallbacks: [2],
@ -51,7 +58,20 @@ const main = async () => {
i++;
}
const opts = {
deletableCallbacks: []
};
const args = []
i = 0;
while (i < max * 10) {
await local.performCall<void>('benchmark', args, opts);
i++;
}
main();
}
main().catch(console.error);
setTimeout(console.log, 10000, 'done');