
591 lines
17 KiB

import { generateId } from '../helpers/idMethods';
* A Layer to communicate.
* @export
* @interface ICommunicationInterface
export interface ICommunicationInterface {
* Function used to subscribe to updates. If there exist
* some state change in the Communication Interface it should
* be provided on the registered functions
* @param {string} event The Selected Event.
* @param {(...args) => void} cb The Callback, which will be called if an Event occours
* @memberof ICommunicationInterface
on(event: string, cb: (...args) => void);
* Function used to unsubscribe from updates
* @param {string} event The Selected Event.
* @param {(...args) => void} cb The Callback, which should be unsubscribed.
* @memberof ICommunicationInterface
off(event: string, cb: (...args) => void);
* Function to send data via the Communication interface
* @param {string} topic The Topic, on which data will be published
* @param {*} data The data. Could be any type and element. In Generel it should be parsable.
* @memberof ICommunicationInterface
send(topic: string, data): void;
export type errorTestMsg = {
type: 'requestOfService'
taskId: number,
functionId: string,
export type requestTaskMsg = {
type: 'requestOfTask',
taskId: number,
functionId: string,
params: {
idx: number,
data: any
callbacks: ({
functionId: string,
idx: number,
deleteAfterCalling: boolean,
} & callOptions)[]
export type callOptions = {
deletableCallbacks: Array<number>;
paramsHasNoCallback?: boolean;
preventErrorTest?: boolean;
export type responseTaskMsg = {
type: 'response',
taskId: number,
result?: any,
error?: any
export type availableServices = {
dispatcher: string,
services: string[]
export type nopeDispatcherOptions = {
communicator: ICommunicationInterface,
subscriptionMode?: 'individual' | 'generic',
* A Dispatcher to perform a function on a Remote
* Dispatcher. Therefore a Task is created and forwarded
* to the remote.
* @export
* @class nopeDispatcher
export class nopeDispatcher {
public readonly id: string;
* Internal Element to store the registered Functions
* @protected
* @memberof nopeDispatcher
protected _definedFunctions: Map<string, (...args) => Promise<any>>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<string, (data) => any>;
protected _communicator: ICommunicationInterface
* Internal Element to store the running tasks.
* @protected
* @memberof nopeDispatcher
protected _runningTasks: Map<number, {
resolve: (value: any) => void;
reject: (error: any) => void;
* Creates an instance of nopeDispatcher.
* @param {ICommunicationInterface} communicator The Communication Layer which should be used.
* @memberof nopeDispatcher
constructor(public options: nopeDispatcherOptions) {
this._communicator = options.communicator;
this.options.subscriptionMode = this.options.subscriptionMode || 'generic'; = generateId();
* Internal Method to handle some requests.
* @protected
* @param {requestTaskMsg} data The provided data of the request
* @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
* @return {Promise<void>}
* @memberof nopeDispatcher
protected async _handleExternalRequest(data: requestTaskMsg, _function?: (...args) => Promise<any>): Promise<void> {
try {
// Try to get the function if not provided:
if (typeof _function !== 'function') {
_function = this._definedFunctions.get(data.functionId);
const _this = this;
if (typeof _function === 'function') {
// Only if the Function is present extract the arguments etc.
const args = [];
// First extract the basic arguments => args[item.idx] =;
// Add the Callbacks. Therefore create a function which will
// trigger the remote. => args[options.idx] = async (..._args) => {
return await _this.performCall(options.functionId, _args, options);
// Perform the Task it self.
const _result = await _function(...args);
// Define the Result message
const result: responseTaskMsg = {
result: typeof (_result) !== 'undefined' ? _result : null,
taskId: data.taskId,
type: 'response'
// Use the communicator to publish the event.
this._communicator.send('response', result);
} catch (error) {
// An Error occourd => Forward the Error.
const result: responseTaskMsg = {
taskId: data.taskId,
type: 'response'
// Send the Error via the communicator to the remote.
this._communicator.send('response', result);
* Internal Function to handle responses. In Generale,
* the dispatcher checks if there is an open task with
* the provided id. If so => finish the promise.
* @protected
* @param {responseTaskMsg} data The Data provided to handle the Response.
* @return {void} Nothing
* @memberof nopeDispatcher
protected _handleExternalResponse(data: responseTaskMsg): void {
try {
// Extract the Task
const task = this._runningTasks.get(data.taskId);
// Delete the Task:
// Based on the Result of the Remote =>
if (task && data.error) {
return task.reject(data.error);
if (task) {
return task.resolve(data.result);
} catch (e) {
* Internal Function, used to initialize the Dispatcher.
* It subscribes to the "Messages" of the communicator.
* @protected
* @memberof nopeDispatcher
protected init(): void {
const _this = this;
// Based on the Mode of the Subscription =>
// either create indivdual topics for the methods
// or use the generice function.
switch (this.options.subscriptionMode) {
case 'individual':
// Iterate over the Defined Functions.
for (const [id, cb] of this._definedFunctions.entries()) {
// Subscribe the Function
this._subscribeToFunction(id, cb);
case 'generic':
// Add a generic Subscription for callbacks:
this._communicator.on('request', (data: requestTaskMsg) => {
if (data.type === 'requestOfTask') {
// Subscribe to Responses
this._communicator.on('response', (data: responseTaskMsg) => {
if (data.type === 'response') {
// Subscribe to the availableServices to
// Adapt the _remotlyCalledFunctions Set.
this._communicator.on('availableServices', () => {
* Function to adapt a Request name.
* Only used internally
* @protected
* @param {string} id the original ID
* @return {string} the adapted ID.
* @memberof nopeDispatcher
protected _getRequestName(id: string): string {
return id.startsWith('request/') ? id : 'request/' + id;
* Internal Helper Function to subscribe to a Function element.
* @protected
* @param {string} id the Id of the function
* @param {(...args) => Promise<any>} cb The Callback of the Function
* @return {void} the adapted ID.
* @memberof nopeDispatcher
protected _subscribeToFunction(id: string, cb: (...args) => Promise<any>): void {
if (
this.options.subscriptionMode === 'individual' &&
) {
const _this = this;
// Define a Function.
const func = (data: requestTaskMsg) => {
if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data, cb);
// Add the Callback.
this._communicatorCallbacks.set(this._getRequestName(id), func);
// Register Function
this._communicator.on(this._getRequestName(id), func);
* Function to register a Function in the Dispatcher
* @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
* @param {{
* // Flag to enable unregistering the function after calling.
* deleteAfterCalling?: boolean,
* // Instead of generating a uuid an id could be provided
* id?: string;
* }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
* @return {*} {(...args) => Promise<any>} The registered Function
* @memberof nopeDispatcher
public registerFunction(func: (...args) => Promise<any>, options: {
/** Flag to enable unregistering the function after calling. */
deleteAfterCalling?: boolean,
/** Instead of generating a uuid an id could be provided */
id?: string;
/** Flag to enable / disable sending to registery */
preventSendingToRegistery?: boolean;
} = {}): (...args) => Promise<any> {
const _this = this;
const _id = this._getRequestName( || generateId());
let _func = func;
if (options.deleteAfterCalling) {
_func = async (...args) => {
// Unregister the Method
_this.unregistFunction(_id, {
preventSendingToRegistery: options.preventSendingToRegistery
// Return the Result of the Original Function.
return await func(...args);
// Define a ID for the Function
_func['id'] = _id;
// Define the callback.
_func['unregister'] = () => _this.unregistFunction(_id);
// Reister the Function
this._definedFunctions.set(_func['id'], _func);
// Register the Callback:
this._subscribeToFunction(_id, _func);
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
// Return the Function.
return _func;
* Function to unregister a Function from the Dispatcher
* @param {(((...args) => void) | string | number)} func The Function to unregister
* @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
* @memberof nopeDispatcher
public unregistFunction(func: ((...args) => void) | string, options: {
/** Flag to enable / disable sending to registery */
preventSendingToRegistery?: boolean;
} = {}): boolean {
const _id = typeof func === 'string' ? func : func['id'] as string || '0';
// Try to unregister the Callback from the communcator:
if (this._communicatorCallbacks.has(_id)) {
// Unregister the Function, this._communicatorCallbacks.get(_id));
// Remove the Callback
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
return this._definedFunctions.delete(_id);
* Function used to update the Available Services.
* @protected
* @memberof nopeDispatcher
protected _sendAvailableServices() {
// Define the Message
const message: availableServices = {
services: Array.from(this._definedFunctions.keys())
// Send the Message.
this._communicator.send('availableServices', message);
* Function which is used to perform a call on the remote.
* @template T
* @param {string} functionName The Name / ID of the Function
* @param {any[]} params The provided Parameters.
* @param {({
* deletableCallbacks: Array<number>;
* })} [options={
* deletableCallbacks: [],
* paramsHasNoCallback: false,
* preventErrorTest: false
* }] You could additiona Optionf for the callback.
* @return {*} {Promise<T>} The result of the Operation
* @memberof nopeDispatcher
public performCall<T>(functionName: string, params: any[], options: callOptions = {
deletableCallbacks: [],
paramsHasNoCallback: false,
preventErrorTest: false
}): Promise<T> {
// Get a Call Id
const _taskId = generateId();
const _registeredIdx: Array<string> = [];
const _this = this;
// Define a Callback-Function, which will expect the Task.
return new Promise<T>((resolve, reject) => {
try {
// Register the Handlers,
_this._runningTasks.set(_taskId, {
// Define a Task-Request
const taskRequest: requestTaskMsg = {
functionId: this._getRequestName(functionName),
callbacks: [],
taskId: _taskId,
type: 'requestOfTask'
// Test if there is no Callback integrated
if (!options.paramsHasNoCallback) {
// If so, the parameters has to be detailled:
// 1. Reset the Params list:
taskRequest.params = [];
// 2. Iterate over all Parameters and
// Determin Callbacks. Based on the Parameter-
// Type assign it either to packet.params (
// for parsable Parameters) and packet.callbacks
// (for callback Parameters)
for (const [idx, contentOfParameter] of params.entries()) {
// Test if the parameter is a Function
if (typeof contentOfParameter !== "function") {
data: contentOfParameter
} else {
// The Parameter is a Callback => store a
// Description of the Callback and register
// the callback inside of the Dispatcher
const deleteAfterCalling = options.deletableCallbacks.includes(idx);
const _func = _this.registerFunction(contentOfParameter, {
preventSendingToRegistery: true
// Register the Callback
functionId: _func['id'],
preventErrorTest: true,
deletableCallbacks: []
if (!_this._remotlyCalledFunctions.has(taskRequest.functionId) && !options.preventErrorTest) {
// Define e Test of an Message.
const errorTestRequest: errorTestMsg = {
functionId: taskRequest.functionId,
taskId: _taskId,
type: 'requestOfService'
// Send the Message, to the general Request Path:
_this._communicator.send('requestedService', errorTestRequest);
// Send the Message to the specific element:
if (_this.options.subscriptionMode === 'individual') {
_this._communicator.send(taskRequest.functionId, taskRequest);
} else {
_this._communicator.send('request', taskRequest);
} catch (e) {
// Delete all Callbacks. => _this.unregistFunction(id));
// Remove the task:
if (_this._runningTasks.has(_taskId))
// Throw an error.
* Function to clear all pending tasks
* @memberof nopeDispatcher
public clearTasks(): void {
if (this._runningTasks)
this._runningTasks = new Map<number, {
resolve: (value: any) => void;
reject: (error: any) => void;
* Function to unregister all Functions of the Dispatcher.
* @memberof nopeDispatcher
public unregisterAll(): void {
if (this._definedFunctions) {
for (const id of this._definedFunctions.keys()) {
} else {
this._definedFunctions = new Map<string, (...args) => Promise<any>>();
// Reset the Callbacks.
this._communicatorCallbacks = new Map<string, (data) => any>();
* Function to reset the Dispatcher.
* @memberof nopeDispatcher
public reset(): void {
this._remotlyCalledFunctions = new Set<string>();