fixing channel issues

This commit is contained in:
Martin Karkowski 2020-09-12 09:56:17 +02:00
parent d36cf9660b
commit 9026204809
2 changed files with 104 additions and 103 deletions

View File

@ -37,8 +37,14 @@ export interface ICommunicationInterface {
send(topic: string, data): void; send(topic: string, data): void;
} }
export type requestTask = { export type errorTestMsg = {
type: 'request', type: 'requestOfService'
taskId: number,
functionId: string,
}
export type requestTaskMsg = {
type: 'requestOfTask',
taskId: number, taskId: number,
functionId: string, functionId: string,
params: { params: {
@ -52,13 +58,15 @@ export type requestTask = {
} & callOptions)[] } & callOptions)[]
} }
export type callOptions = { export type callOptions = {
deletableCallbacks: Array<number>; deletableCallbacks: Array<number>;
noCallbackItegrated?: boolean; paramsHasNoCallback?: boolean;
noErrorTest?: boolean; preventErrorTest?: boolean;
} }
export type responseOfTask = { export type responseTaskMsg = {
type: 'response', type: 'response',
taskId: number, taskId: number,
result?: any, result?: any,
@ -94,6 +102,7 @@ export class nopeDispatcher {
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
protected _definedFunctions: Map<string, (...args) => Promise<any>>; protected _definedFunctions: Map<string, (...args) => Promise<any>>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<string, (data) => any>; protected _communicatorCallbacks: Map<string, (data) => any>;
protected _communicator: ICommunicationInterface protected _communicator: ICommunicationInterface
@ -129,12 +138,12 @@ export class nopeDispatcher {
* Internal Method to handle some requests. * Internal Method to handle some requests.
* *
* @protected * @protected
* @param {requestTask} data The provided data of the request * @param {requestTaskMsg} data The provided data of the request
* @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
* @return {Promise<void>} * @return {Promise<void>}
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
protected async _handleExternalRequest(data: requestTask, _function?: (...args) => Promise<any>): Promise<void> { protected async _handleExternalRequest(data: requestTaskMsg, _function?: (...args) => Promise<any>): Promise<void> {
try { try {
// Try to get the function if not provided: // Try to get the function if not provided:
@ -153,15 +162,15 @@ export class nopeDispatcher {
// Add the Callbacks. Therefore create a function which will // Add the Callbacks. Therefore create a function which will
// trigger the remote. // trigger the remote.
data.callbacks.map(item => args[item.idx] = async (..._args) => { data.callbacks.map(options => args[options.idx] = async (..._args) => {
return await _this.performCall(item.functionId, _args); return await _this.performCall(options.functionId, _args, options);
}); });
// Perform the Task it self. // Perform the Task it self.
const _result = await _function(...args); const _result = await _function(...args);
// Define the Result message // Define the Result message
const result: responseOfTask = { const result: responseTaskMsg = {
result: typeof (_result) !== 'undefined' ? _result : null, result: typeof (_result) !== 'undefined' ? _result : null,
taskId: data.taskId, taskId: data.taskId,
type: 'response' type: 'response'
@ -174,7 +183,7 @@ export class nopeDispatcher {
} catch (error) { } catch (error) {
// An Error occourd => Forward the Error. // An Error occourd => Forward the Error.
const result: responseOfTask = { const result: responseTaskMsg = {
error, error,
taskId: data.taskId, taskId: data.taskId,
type: 'response' type: 'response'
@ -191,11 +200,11 @@ export class nopeDispatcher {
* the provided id. If so => finish the promise. * the provided id. If so => finish the promise.
* *
* @protected * @protected
* @param {responseOfTask} data The Data provided to handle the Response. * @param {responseTaskMsg} data The Data provided to handle the Response.
* @return {void} Nothing * @return {void} Nothing
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
protected _handleExternalResponse(data: responseOfTask): void { protected _handleExternalResponse(data: responseTaskMsg): void {
try { try {
// Extract the Task // Extract the Task
const task = this._runningTasks.get(data.taskId); const task = this._runningTasks.get(data.taskId);
@ -238,8 +247,8 @@ export class nopeDispatcher {
break; break;
case 'generic': case 'generic':
// Add a generic Subscription for callbacks: // Add a generic Subscription for callbacks:
this._communicator.on('request', (data: requestTask) => { this._communicator.on('request', (data: requestTaskMsg) => {
if (data.type === 'request') { if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data); _this._handleExternalRequest(data);
} }
}); });
@ -247,7 +256,7 @@ export class nopeDispatcher {
} }
// Subscribe to Responses // Subscribe to Responses
this._communicator.on('response', (data: responseOfTask) => { this._communicator.on('response', (data: responseTaskMsg) => {
if (data.type === 'response') { if (data.type === 'response') {
_this._handleExternalResponse(data); _this._handleExternalResponse(data);
} }
@ -267,8 +276,8 @@ export class nopeDispatcher {
const _this = this; const _this = this;
// Define a Function. // Define a Function.
const func = (data: requestTask) => { const func = (data: requestTaskMsg) => {
if (data.type === 'request') { if (data.type === 'requestOfTask') {
_this._handleExternalRequest(data, cb); _this._handleExternalRequest(data, cb);
} }
}; };
@ -304,20 +313,16 @@ export class nopeDispatcher {
} = {}): (...args) => Promise<any> { } = {}): (...args) => Promise<any> {
const _this = this; const _this = this;
let _id = options.id || generateId(); const _id = this._getRequestName(options.id || generateId());
// if (this.options.subscriptionMode === 'generic') {
// _id = this._getRequestName(_id);
// }
_id = this._getRequestName(_id);
let _func = func; let _func = func;
if (options.deleteAfterCalling) { if (options.deleteAfterCalling) {
_func = async (...args) => { _func = async (...args) => {
// Unregister the Method // Unregister the Method
_this.unregistFunction(_id); _this.unregistFunction(_id, {
preventSendingToRegistery: options.preventSendingToRegistery
});
// Return the Result of the Original Function. // Return the Result of the Original Function.
return await func(...args); return await func(...args);
} }
@ -335,9 +340,10 @@ export class nopeDispatcher {
// Register the Callback: // Register the Callback:
this._subscribeToFunction(_id, _func); this._subscribeToFunction(_id, _func);
if (!options.preventSendingToRegistery) if (!options.preventSendingToRegistery) {
// Publish the Available Services. // Publish the Available Services.
this._sendAvailableServices(); this._sendAvailableServices();
}
// Return the Function. // Return the Function.
return _func; return _func;
@ -349,7 +355,10 @@ export class nopeDispatcher {
* @return {*} {boolean} Flag, whether the element was removed (only if found) or not. * @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
public unregistFunction(func: ((...args) => void) | string | number): boolean { public unregistFunction(func: ((...args) => void) | string, options: {
/** Flag to enable / disable sending to registery */
preventSendingToRegistery?: boolean;
} = {}): boolean {
const _id = typeof func === 'string' ? func : func['id'] as string || '0'; const _id = typeof func === 'string' ? func : func['id'] as string || '0';
// Try to unregister the Callback from the communcator: // Try to unregister the Callback from the communcator:
@ -360,8 +369,10 @@ export class nopeDispatcher {
this._communicatorCallbacks.delete(_id); this._communicatorCallbacks.delete(_id);
} }
// Publish the Available Services. if (!options.preventSendingToRegistery) {
this._sendAvailableServices(); // Publish the Available Services.
this._sendAvailableServices();
}
return this._definedFunctions.delete(_id); return this._definedFunctions.delete(_id);
} }
@ -393,23 +404,21 @@ export class nopeDispatcher {
* @param {({ * @param {({
* deletableCallbacks: Array<number>; * deletableCallbacks: Array<number>;
* })} [options={ * })} [options={
* deletableCallbacks: [] * deletableCallbacks: [],
* }] You could provide the index of removeable callbacks. * paramsHasNoCallback: false,
* preventErrorTest: false
* }] You could additiona Optionf for the callback.
* @return {*} {Promise<T>} The result of the Operation * @return {*} {Promise<T>} The result of the Operation
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
public performCall<T>(functionName: string, params: any[], options: { public performCall<T>(functionName: string, params: any[], options: callOptions = {
deletableCallbacks: Array<number>; deletableCallbacks: [],
noCallbackItegrated?: boolean; paramsHasNoCallback: false,
noErrorTest?: boolean; preventErrorTest: false
} = { }): Promise<T> {
deletableCallbacks: [],
noCallbackItegrated: false,
noErrorTest: true
}): Promise<T> {
// Get a Call Id // Get a Call Id
const _taskId = generateId(); const _taskId = generateId();
const _registeredIdx: Array<number | string> = []; const _registeredIdx: Array<string> = [];
const _this = this; const _this = this;
// Define a Callback-Function, which will expect the Task. // Define a Callback-Function, which will expect the Task.
@ -422,51 +431,42 @@ export class nopeDispatcher {
reject reject
}); });
// Test if there is no Callback integrated => Speedup the Function. // Define a Task-Request
if (options.noCallbackItegrated) { const taskRequest: requestTaskMsg = {
// Perform the call. Therefore create the data package. functionId: this._getRequestName(functionName),
const packet: requestTask = { params,
functionId: this._getRequestName(functionName), callbacks: [],
params, taskId: _taskId,
callbacks: [], type: 'requestOfTask'
taskId: _taskId, }
type: 'request'
}
// Send the Message, to the general Request Path: // Test if there is no Callback integrated
_this._communicator.send('request', packet); if (!options.paramsHasNoCallback) {
// If so, the parameters has to be detailled:
// Send the Message to the specific element: // 1. Reset the Params list:
if (_this.options.subscriptionMode === 'individual') { taskRequest.params = [];
_this._communicator.send(packet.functionId, packet);
}
// leave the Method. // 2. Iterate over all Parameters and
return; // Determin Callbacks. Based on the Parameter-
} else { // Type assign it either to packet.params (
// for parsable Parameters) and packet.callbacks
// (for callback Parameters)
for (const [idx, contentOfParameter] of params.entries()) {
const _parameters: { // Test if the parameter is a Function
idx: number, if (typeof contentOfParameter !== "function") {
data: any, taskRequest.params.push({
}[] = [];
const _callbacks: ({
functionId: string,
idx: number,
deleteAfterCalling: boolean,
} & callOptions)[] = [];
// Detail the Parameters.
for (const [idx, data] of params.entries()) {
if (typeof data !== "function") {
_parameters.push({
idx, idx,
data data: contentOfParameter
}); });
} else { } else {
// The Parameter is a Callback => store a
// Description of the Callback and register
// the callback inside of the Dispatcher
const deleteAfterCalling = options.deletableCallbacks.includes(idx); const deleteAfterCalling = options.deletableCallbacks.includes(idx);
const _func = _this.registerFunction(data, { const _func = _this.registerFunction(contentOfParameter, {
deleteAfterCalling, deleteAfterCalling,
preventSendingToRegistery: true preventSendingToRegistery: true
}); });
@ -474,35 +474,37 @@ export class nopeDispatcher {
_registeredIdx.push(_func['id']); _registeredIdx.push(_func['id']);
// Register the Callback // Register the Callback
_callbacks.push({ taskRequest.callbacks.push({
functionId: _func['id'], functionId: _func['id'],
idx, idx,
deleteAfterCalling, deleteAfterCalling,
noErrorTest: true, preventErrorTest: true,
deletableCallbacks: [] deletableCallbacks: []
}); });
} }
} }
}
// Perform the call. Therefore create the data package. if (!_this._remotlyCalledFunctions.has(taskRequest.functionId) && !options.preventErrorTest) {
const packet: requestTask = {
functionId: _this._getRequestName(functionName), // Define e Test of an Message.
params: _parameters, const errorTestRequest: errorTestMsg = {
callbacks: _callbacks, functionId: taskRequest.functionId,
taskId: _taskId, taskId: _taskId,
type: 'request' type: 'requestOfService'
} };
// Send the Message to the specific element: // Send the Message, to the general Request Path:
if (_this.options.subscriptionMode === 'individual') { _this._communicator.send('requestedService', errorTestRequest);
if (!options.noErrorTest) {
// Send the Message, to the general Request Path: _this._remotlyCalledFunctions.add(taskRequest.functionId)
_this._communicator.send('request', packet); }
}
_this._communicator.send(packet.functionId, packet); // Send the Message to the specific element:
} else { if (_this.options.subscriptionMode === 'individual') {
_this._communicator.send('request', packet); _this._communicator.send(taskRequest.functionId, taskRequest);
} } else {
_this._communicator.send('request', taskRequest);
} }
} catch (e) { } catch (e) {
// Delete all Callbacks. // Delete all Callbacks.
@ -558,6 +560,7 @@ export class nopeDispatcher {
* @memberof nopeDispatcher * @memberof nopeDispatcher
*/ */
public reset(): void { public reset(): void {
this._remotlyCalledFunctions = new Set<string>();
this.clearTasks(); this.clearTasks();
this.unregisterAll(); this.unregisterAll();
} }

View File

@ -1,6 +1,6 @@
import { Logger } from "winston"; import { Logger } from "winston";
import { getLogger } from "../logger/getLogger"; import { getLogger } from "../logger/getLogger";
import { availableServices, ICommunicationInterface, requestTask, responseOfTask } from './nopeDispatcher'; import { availableServices, errorTestMsg, ICommunicationInterface, responseTaskMsg } from './nopeDispatcher';
export class serviceRegistry { export class serviceRegistry {
@ -27,13 +27,11 @@ export class serviceRegistry {
public init() { public init() {
const _this = this; const _this = this;
this._communicator.on('request', (task: requestTask) => { this._communicator.on('requestedService', (task: errorTestMsg) => {
_this._logger.debug('Getting Request for ' + task.functionId) if (task.type === 'requestOfService' && !_this._services.has(task.functionId)) {
if (!_this._services.has(task.functionId)) {
// An Error occourd => Forward the Error. // An Error occourd => Forward the Error.
const result: responseOfTask = { const result: responseTaskMsg = {
error: new Error('No Service Provider known for "' + task.functionId + '"'), error: new Error('No Service Provider known for "' + task.functionId + '"'),
taskId: task.taskId, taskId: task.taskId,
type: 'response' type: 'response'