adapting communication layer interface

This commit is contained in:
Martin Karkowski 2020-09-13 12:35:04 +02:00
parent 794a703a56
commit 77d6a810a5
4 changed files with 231 additions and 162 deletions

View File

@ -1,8 +1,7 @@
import { EventEmitter } from "events";
import { ICommunicationInterface } from "../dispatcher/nopeDispatcher";
import { ICommunicationInterface, requestTaskMsg, responseTaskMsg, availableServices } from "../dispatcher/nopeDispatcher";
import { getLogger } from "../logger/getLogger";
/**
* A Communication Layer for the Dispatchers.
* Here, only a Events are used.
@ -12,20 +11,31 @@ import { getLogger } from "../logger/getLogger";
* @implements {ICommunicationInterface}
*/
export class EventLayer implements ICommunicationInterface {
off(event: string, cb: (...args: any[]) => void) {
this._emitter.off(event, cb);
emitRpcRequest(name: string, request: requestTaskMsg): void {
this._emitter.emit(name, request);
}
emitRpcResult(result: responseTaskMsg): void {
this._emitter.emit('result',result);
}
onRpcResult(name: string, cb: (result: responseTaskMsg) => void): void {
this._emitter.on(name, cb);
}
offRpcResult(name: string, cb: (result: responseTaskMsg) => void): void {
this._emitter.off(name, cb);
}
onRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void {
this._emitter.on(name, cb);
}
offRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void {
this._emitter.off(name, cb);
}
emitNewServicesAvailable(services: availableServices): void {
this._emitter.emit('services', services)
}
onNewServicesAvailable(cb: (services: availableServices) => void) {
this._emitter.on('services', cb);
}
protected _emitter = new EventEmitter();
protected _logger = getLogger('info', 'Event-Layer');
on(event: string, cb: (...args: any[]) => void) {
// this._logger.debug('subscribed ' + event);
this._emitter.on(event, cb);
}
send(event: string, data: any): void {
// this._logger.debug('emitting ' + event);
this._emitter.emit(event, data)
}
}

View File

@ -1,4 +1,5 @@
import { generateId } from '../helpers/idMethods';
import { Logger } from "winston";
/**
* A Layer to communicate.
@ -7,37 +8,81 @@ import { generateId } from '../helpers/idMethods';
* @interface ICommunicationInterface
*/
export interface ICommunicationInterface {
/**
* Funciton to emit a RPC Request.
*
* @param {string} name Name of the request
* @param {requestTaskMsg} request The Request.
* @memberof ICommunicationInterface
*/
emitRpcRequest(name: string, request: requestTaskMsg): void;
/**
* Function used to subscribe to updates. If there exist
* some state change in the Communication Interface it should
* be provided on the registered functions
*
* @param {string} event The Selected Event.
* @param {(...args) => void} cb The Callback, which will be called if an Event occours
* @param {responseTaskMsg} result
* @memberof ICommunicationInterface
*/
on(event: string, cb: (...args) => void);
emitRpcResult(result: responseTaskMsg): void;
/**
* Function used to unsubscribe from updates
* Function used to subscribe to RPC Results. Each method / function
* can provide a unique eventname for the results.
*
* @param {string} event The Selected Event.
* @param {(...args) => void} cb The Callback, which should be unsubscribed.
* @param {string} name The Id of the Method.
* @param {(result: responseTaskMsg) => void} cb The callback which should be called
* @memberof ICommunicationInterface
*/
off(event: string, cb: (...args) => void);
onRpcResult(name: string, cb: (result: responseTaskMsg) => void): void;
/**
* Function to send data via the Communication interface
* Function used to unsubscribe from Rpc-Results
*
* @param {string} topic The Topic, on which data will be published
* @param {*} data The data. Could be any type and element. In Generel it should be parsable.
* @param {string} name The Id of the Method.
* @param {(result: responseTaskMsg) => void} cb The callback which should be called
* @memberof ICommunicationInterface
*/
emit(topic: string, data): void;
offRpcResult(name: string, cb: (result: responseTaskMsg) => void): void;
/**
* Function to register RPC-Request.
*
* @param {string} name the name to listen for the request.
* @param {(data: requestTaskMsg) => void} cb The callback which should be called, if a request for the method is detected
* @memberof ICommunicationInterface
*/
onRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void;
/**
* Unregister a listener for a RPC-Request.
*
* @param {string} name
* @param {(data: requestTaskMsg) => void} cb
* @memberof ICommunicationInterface
*/
offRpcRequest(name: string, cb: (data: requestTaskMsg) => void): void;
/**
* Function to Emit new Services. They will then be shared to other sytems.
*
* @param {availableServices} services
* @memberof ICommunicationInterface
*/
emitNewServicesAvailable(services: availableServices): void;
/**
* Function to register a new Callback, which will be called if new Services are available.
*
* @param {() => void} cb The Callback to Call.
* @memberof ICommunicationInterface
*/
onNewServicesAvailable(cb: (services: availableServices) => void);
}
export type errorTestMsg = {
export type requestOfService = {
type: 'requestOfService'
taskId: string,
functionId: string,
@ -76,7 +121,7 @@ export type callOptions = {
*
* @type {boolean}
*/
preventErrorTest?: boolean;
preventAvailableTest?: boolean;
}
export type responseTaskMsg = {
@ -119,8 +164,7 @@ export type availableServices = {
}
export type nopeDispatcherOptions = {
rpcCommunicator: ICommunicationInterface,
eventCommunicator: ICommunicationInterface,
communicator: ICommunicationInterface,
subscriptionMode?: 'individual' | 'generic',
}
@ -136,6 +180,9 @@ export class nopeDispatcher {
public readonly id: string;
protected _logger: Logger;
/**
* Internal Element to store the registered Functions
*
@ -144,10 +191,16 @@ export class nopeDispatcher {
*/
protected _definedFunctions: Map<string, (...args) => Promise<any>>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<string, (data) => any>;
protected _rpcCommunicator: ICommunicationInterface
protected _eventCommunicator: ICommunicationInterface
protected _communicatorCallbacks: Map<string, {
req: (data) => any,
res: (data) => any
}>;
protected _communicator: ICommunicationInterface;
protected _mappingOfRemoteDispatchersAndFunctions: Map<string, availableServices>;
protected _externalServices: Set<string>;
public methodInterfaceWithOptions: { [index: string]: (optins: callOptions, ...args) => Promise<any> }
public methodInterface: { [index: string]: (...args) => Promise<any> }
/**
* Internal Element to store the running tasks.
@ -167,13 +220,34 @@ export class nopeDispatcher {
*/
constructor(public options: nopeDispatcherOptions) {
this._rpcCommunicator = options.rpcCommunicator;
this._eventCommunicator = options.eventCommunicator;
this._communicator = options.communicator;
this.options.subscriptionMode = this.options.subscriptionMode || 'generic';
this.id = generateId();
/**
* Define A Proxy for accessing methods easier.
*/
const _this = this;
const _handlerWithOptions = {
get(target, name) {
return async (options: callOptions, ...args) => {
return _this.performCall(name, args, options);
}
}
}
const _handlerWithoutOptions = {
get(target, name) {
return async (...args) => {
return _this.performCall(name, args);
}
}
}
this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions);
this.methodInterface = new Proxy({}, _handlerWithoutOptions);
this.reset();
this.init();
}
@ -221,7 +295,7 @@ export class nopeDispatcher {
}
// Use the communicator to publish the event.
this._eventCommunicator.emit('response', result);
this._communicator.emitRpcResult(result);
}
} catch (error) {
@ -234,7 +308,7 @@ export class nopeDispatcher {
}
// Send the Error via the communicator to the remote.
this._eventCommunicator.emit('response', result);
this._communicator.emitRpcResult(result);
}
}
@ -291,26 +365,71 @@ export class nopeDispatcher {
break;
case 'generic':
// Add a generic Subscription for callbacks:
this._rpcCommunicator.on('request', (data: requestTaskMsg) => {
this._communicator.onRpcRequest('request', (data: requestTaskMsg) => {
if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data);
}
});
// Subscribe to Responses
this._communicator.onRpcResult('response', (data: responseTaskMsg) => {
if (data.type === 'response') {
_this._handleExternalResponse(data);
}
});
break;
}
// Subscribe to Responses
this._eventCommunicator.on('response', (data: responseTaskMsg) => {
if (data.type === 'response') {
_this._handleExternalResponse(data);
// Subscribe to the availableServices of Remotes.
// If there is a new Service => udpate the External Services
this._communicator.onNewServicesAvailable((data: availableServices) => {
try {
if (data.dispatcher !== this.id){
_this._mappingOfRemoteDispatchersAndFunctions.set(data.dispatcher, data);
_this._updateExternalServices();
}
} catch (e) {
}
});
});
}
// Subscribe to the availableServices to
// Adapt the _remotlyCalledFunctions Set.
this._eventCommunicator.on('availableServices', () => {
_this._remotlyCalledFunctions.clear();
})
/**
* Function to update the used Services.
*
* @protected
* @memberof serviceRegistry
*/
protected _updateExternalServices() {
const _this = this;
// Clear the Services
this._externalServices.clear();
for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndFunctions.values()) {
dispatcherInfo.services.map(service => _this._externalServices.add(service));
}
if (this._logger?.isDebugEnabled()) {
// If there is a Logger:
this._logger.debug(
'new services found: \n' +
JSON.stringify(
Array.from(this._externalServices),
undefined,
4
)
);
}
}
/**
* Function to test if a specific Service exists.
*
* @param {string} id The Id of the Serivce
* @return {boolean} The result of the Test. True if either local or remotly a service is known.
* @memberof nopeDispatcher
*/
public serviceExists(id: string){
return this._definedFunctions.has(id) || this._externalServices.has(id);
}
/**
@ -322,8 +441,8 @@ export class nopeDispatcher {
* @return {string} the adapted ID.
* @memberof nopeDispatcher
*/
protected _getRequestName(id: string): string {
return id.startsWith('request/') ? id : 'request/' + id;
protected _getRequestName(id: string, type: 'request' | 'response'): string {
return id.startsWith(`${type}/`) ? id : `${type}/${id}`;
}
/**
@ -336,25 +455,40 @@ export class nopeDispatcher {
* @memberof nopeDispatcher
*/
protected _subscribeToFunction(id: string, cb: (...args) => Promise<any>): void {
const _req = this._getRequestName(id, 'request');
const _res = this._getRequestName(id, 'response')
if (
this.options.subscriptionMode === 'individual' &&
!this._communicatorCallbacks.has(this._getRequestName(id))
!this._communicatorCallbacks.has(_req)
) {
const _this = this;
// Define a Function.
const func = (data: requestTaskMsg) => {
const req = (data: requestTaskMsg) => {
if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data, cb);
}
};
// Add the Callback.
this._communicatorCallbacks.set(this._getRequestName(id), func);
const res = (data: responseTaskMsg) => {
if (data.type === 'response') {
_this._handleExternalResponse(data);
}
};
// Register Function
this._rpcCommunicator.on(this._getRequestName(id), func);
// Add the Callback.
this._communicatorCallbacks.set(_req, {
res,
req
});
// Register Functions.
this._communicator.onRpcRequest(_req, req);
this._communicator.onRpcResult(_res, res);
}
}
@ -381,7 +515,8 @@ export class nopeDispatcher {
} = {}): (...args) => Promise<any> {
const _this = this;
const _id = this._getRequestName(options.id || generateId());
// Define / Use the ID of the Function.
const _id = options.id || generateId();
let _func = func;
@ -431,8 +566,14 @@ export class nopeDispatcher {
// Try to unregister the Callback from the communcator:
if (this._communicatorCallbacks.has(_id)) {
// Unregister the Function
this._rpcCommunicator.off(_id, this._communicatorCallbacks.get(_id));
const _callbacks = this._communicatorCallbacks.get(_id);
// Unregister the RPC-Request-Listener
this._communicator.offRpcRequest(this._getRequestName(_id, 'request'), _callbacks.req);
// Unregister the Result-Listener
this._communicator.offRpcRequest(this._getRequestName(_id, 'response'), _callbacks.res);
// Remove the Callback
this._communicatorCallbacks.delete(_id);
}
@ -460,7 +601,7 @@ export class nopeDispatcher {
}
// Send the Message.
this._eventCommunicator.emit('availableServices', message);
this._communicator.emitNewServicesAvailable(message);
}
/**
@ -482,7 +623,7 @@ export class nopeDispatcher {
public performCall<T>(functionName: string, params: any[], options: callOptions = {
deletableCallbacks: [],
paramsHasNoCallback: false,
preventErrorTest: false
preventAvailableTest: false
}): Promise<T> {
// Get a Call Id
const _taskId = generateId();
@ -501,7 +642,7 @@ export class nopeDispatcher {
// Define a Task-Request
const taskRequest: requestTaskMsg = {
functionId: this._getRequestName(functionName),
functionId: functionName,
params,
callbacks: [],
taskId: _taskId,
@ -546,33 +687,31 @@ export class nopeDispatcher {
functionId: _func['id'],
idx,
deleteAfterCalling,
preventErrorTest: true,
preventAvailableTest: true,
deletableCallbacks: []
});
}
}
}
if (!_this._remotlyCalledFunctions.has(taskRequest.functionId) && !options.preventErrorTest) {
if (!options.preventAvailableTest && !_this.serviceExists(functionName)) {
// Define e Test of an Message.
const errorTestRequest: errorTestMsg = {
functionId: taskRequest.functionId,
taskId: _taskId,
type: 'requestOfService'
};
// Create an Error:
const error = new Error('No Service Provider known for "' + functionName + '"');
if (this._logger){
this._logger.error('No Service Provider known for "' + functionName + '"');
this._logger.error(error)
}
// Send the Message, to the general Request Path:
_this._eventCommunicator.emit('requestedService', errorTestRequest);
_this._remotlyCalledFunctions.add(taskRequest.functionId)
throw error;
}
// Send the Message to the specific element:
if (_this.options.subscriptionMode === 'individual') {
_this._rpcCommunicator.emit(taskRequest.functionId, taskRequest);
_this._communicator.emitRpcRequest(this._getRequestName(taskRequest.functionId, 'request'), taskRequest);
} else {
_this._rpcCommunicator.emit('request', taskRequest);
_this._communicator.emitRpcRequest('request', taskRequest);
}
} catch (e) {
// Delete all Callbacks.
@ -619,7 +758,7 @@ export class nopeDispatcher {
}
// Reset the Callbacks.
this._communicatorCallbacks = new Map<string, (data) => any>();
this._communicatorCallbacks = new Map();
}
/**
@ -628,7 +767,9 @@ export class nopeDispatcher {
* @memberof nopeDispatcher
*/
public reset(): void {
this._remotlyCalledFunctions = new Set<string>();
this._remotlyCalledFunctions = new Set();
this._externalServices = new Set();
this._mappingOfRemoteDispatchersAndFunctions = new Map();
this.clearTasks();
this.unregisterAll();
}

View File

@ -1,79 +0,0 @@
import { Logger } from "winston";
import { getLogger } from "../logger/getLogger";
import { availableServices, errorTestMsg, ICommunicationInterface, responseTaskMsg } from './nopeDispatcher';
export class serviceRegistry {
/**
* Creates an instance of serviceRegistry.
* @param {{ communicator: ICommunicationInterface }} options
* @memberof serviceRegistry
*/
constructor(public options: { communicator: ICommunicationInterface }) {
this._communicator = options.communicator;
this._logger = getLogger('info', 'Service-Registry');
this._mapping = new Map<string, availableServices>();
this._services = new Set<string>();
this.init();
}
protected _communicator: ICommunicationInterface;
protected _mapping: Map<string, availableServices>;
protected _services: Set<string>;
protected _logger: Logger;
public init() {
const _this = this;
this._communicator.on('requestedService', (task: errorTestMsg) => {
if (task.type === 'requestOfService' && !_this._services.has(task.functionId)) {
// An Error occourd => Forward the Error.
const result: responseTaskMsg = {
error: new Error('No Service Provider known for "' + task.functionId + '"'),
taskId: task.taskId,
type: 'response'
}
// Send the Error via the communicator to the remote.
_this._communicator.send('response', result);
}
});
this._communicator.on('availableServices', (data: availableServices) => {
try {
_this._mapping.set(data.dispatcher, data);
_this._updateServices();
} catch (e) {
}
});
}
/**
* Function to update the used Services.
*
* @protected
* @memberof serviceRegistry
*/
protected _updateServices() {
const _this = this;
// Clear the Services
this._services.clear();
for (const dispatcherInfo of this._mapping.values()) {
dispatcherInfo.services.map(service => _this._services.add(service));
}
this._logger.debug(
'new services found: \n' +
JSON.stringify(
Array.from(this._services),
undefined,
4
)
);
}
}

View File

@ -2,13 +2,10 @@ import { EventLayer } from "../lib/communication/eventLayer";
import { getLinkedDispatcher } from "../lib/dispatcher/getLinkedDispatcher";
import { nopeDispatcher } from "../lib/dispatcher/nopeDispatcher";
import { exportFunctionToDispatcher } from "../lib/dispatcher/nopeDispatcherDecorators";
import { serviceRegistry } from "../lib/dispatcher/serviceRegistry";
import { generateBenchmarkFunction } from "../modules/funcs/generateBenchmarkFunction";
const communicator = new EventLayer();
const serviceReg = new serviceRegistry({
communicator
});
const local = new nopeDispatcher({ communicator });
const _functionRemote = exportFunctionToDispatcher(async (a: number, b: number, operation: (a: number, b: number) => number) => {
return await operation(a, b);
@ -32,10 +29,10 @@ const main = async () => {
console.log('1', res)
res = await local.performCall<number>('_functionRemote', [1, 2, async (a, b) => {
res = await local.methodInterface._functionRemote(1, 2, async (a, b) => {
console.log('local callback')
return a * b;
}]);
});
let max = 100000;
let i = 0;