1203 lines
37 KiB
TypeScript
1203 lines
37 KiB
TypeScript
import { generateId } from '../helpers/idMethods';
|
|
import { Logger } from "winston";
|
|
import { ICommunicationInterface, IRequestTaskMsg, IResponseTaskMsg, IExternalEventMsg, IAvailableTopicsMsg, IAvailableServicesMsg, ICallOptions, INopeRpcDispatcherOptions, IInstanceCreation, IInstanceDescription, IAvailableInstanceGenerators, } from '../types/communication.interface';
|
|
import { INopeDispatcher, IGenerateRemoteInstanceCallback } from '../types/dispatcher.interface';
|
|
import { INopeObservable, IPipe } from '../types/nopeObservable.interface';
|
|
import { inject, injectable } from 'inversify';
|
|
import { OBSERVABLE_FACTORY, DISPATCHER_OPTIONS } from '../symbols/identifiers';
|
|
import { IRemoteInstance } from '../types/remoteInstance.interface';
|
|
|
|
/**
 * A dispatcher used to execute functions on a remote dispatcher.
 * For every call a task is created and forwarded to the remote.
 *
 * @export
 * @class nopeDispatcher
 */
|
|
@injectable()
|
|
export class nopeDispatcher implements INopeDispatcher {
|
|
/** Identifier of this dispatcher; assigned in the constructor via `generateId()`. */
public readonly id: string;

/** Optional logger; only assigned when the options provide one. */
protected _logger: Logger;

/**
 * Functions registered on this dispatcher, keyed by their function id.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _definedFunctions: Map<string, (...args) => Promise<any>>;

protected _remotlyCalledFunctions: Set<string>;

/**
 * Listeners which have been registered on the communicator, keyed by the
 * function id. Each entry stores the topic used for the registration, the
 * kind of the listener and the listener itself.
 */
protected _communicatorCallbacks: Map<string, {
    registeredId: string;
    type: 'request' | 'response';
    cb: (data) => any;
}>;

/** Communication layer, taken from `options.communicator`. */
protected _communicator: ICommunicationInterface;

/** Latest service announcement of every remote dispatcher, keyed by the dispatcher id. */
protected _mappingOfRemoteDispatchersAndFunctions: Map<string, IAvailableServicesMsg>;

/** All service ids announced by remote dispatchers. */
protected _externalProvidedServices: Set<string>;

/** Latest generator announcement of every remote dispatcher, keyed by the dispatcher id. */
protected _mappingOfRemoteDispatchersAndGenerators: Map<string, IAvailableInstanceGenerators>;

/** All generator names announced by remote dispatchers. */
protected _externalProvidedGenerators: Set<string>;

/** Proxy: accessing a property returns a function which forwards to `performCall` with the given call options. */
public methodInterfaceWithOptions: { [index: string]: <T>(optins: ICallOptions, ...args) => Promise<T> }

/** Proxy: accessing a property returns a function which forwards to `performCall` without extra options. */
public methodInterface: { [index: string]: <T>(...args) => Promise<T> }

/** Latest topic announcement of every remote dispatcher, keyed by the dispatcher id. */
protected _mappingOfRemoteDispatchersAndTopics: Map<string, IAvailableTopicsMsg>;

/** Topics which remote dispatchers announced as subscribed. */
protected _externalSubscribed: Set<string>;

/** Topics which remote dispatchers announced as published. */
protected _externalPublished: Set<string>;

/**
 * Tasks which are currently running, keyed by the task id.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _runningTasks: Map<string, {
    resolve: (value: any) => void;
    reject: (error: any) => void;
    clear: () => void;
}>;

/** Subscription mode of the communicator; falls back to 'generic'. */
readonly _subscriptionMode: 'individual' | 'generic';

/** Result sharing mode of the communicator; falls back to 'generic'. */
readonly _resultSharing: 'individual' | 'generic';
|
|
|
|
/**
 * Creates an instance of nopeDispatcher.
 *
 * Stores the communicator (and the logger, if one is provided), derives the
 * subscription and result sharing modes from the communicator (both default
 * to 'generic'), generates the dispatcher id, installs the two proxy based
 * call interfaces and finally calls `reset()` and `_init()`.
 *
 * @param {INopeRpcDispatcherOptions} options The Options, used by the Dispatcher.
 * @param {() => INopeObservable<IExternalEventMsg>} _generateObservable A Helper, to generate Observables.
 * @memberof nopeDispatcher
 */
constructor(
    @inject(DISPATCHER_OPTIONS) public options: INopeRpcDispatcherOptions,
    @inject(OBSERVABLE_FACTORY) protected _generateObservable: () => INopeObservable<IExternalEventMsg>,
) {
    this._communicator = options.communicator;

    if (options.logger) {
        this._logger = options.logger;
    }

    this._subscriptionMode = this._communicator.subscriptionMode || 'generic';
    this._resultSharing = this._communicator.resultSharing || 'generic';

    this.id = generateId();

    // Proxy traps: every accessed property name is treated as the id of the
    // function to call. Arrow functions keep `this` bound to the dispatcher.
    const callWithOptions = {
        get: (target, name) => async (callOptions: ICallOptions, ...args) =>
            this.performCall(name, args, callOptions),
    };
    const callWithoutOptions = {
        get: (target, name) => async (...args) =>
            this.performCall(name, args),
    };

    this.methodInterfaceWithOptions = new Proxy({}, callWithOptions);
    this.methodInterface = new Proxy({}, callWithoutOptions);

    this.reset();
    this._init();
}
|
|
|
|
/** Generators registered with mode 'internal', keyed by the generator identifier. */
protected _internalGenerators: Map<string, IGenerateRemoteInstanceCallback<IRemoteInstance>>;

/** Functions registered for generators with mode 'external', keyed by the id of the registered function. */
protected _externalGenerators: Map<string, IGenerateRemoteInstanceCallback<IRemoteInstance>>;

/** Instances which have been created so far, keyed by the instance identifier. */
protected _instances: Map<string, IRemoteInstance>;
|
|
|
|
/**
 * Provides the instance belonging to the given description.
 *
 * An instance which is already stored under `description.identifier` is
 * returned directly. Otherwise, if a generator is registered internally for
 * `description.type`, the function `generateInstance_<type>` is called via
 * `performCall` and the internal generator is used to create the instance,
 * which is then stored and returned.
 *
 * @template I
 * @param {IInstanceCreation} description Description of the requested instance.
 * @return {Promise<I>} The stored or newly created instance.
 * @throws {Error} If no internal generator is registered for the type. Errors
 *         raised during the creation are logged (if a logger exists) and rethrown.
 * @memberof nopeDispatcher
 */
public async generateInstance<I extends IRemoteInstance>(description: IInstanceCreation): Promise<I> {
    const { identifier, type } = description;

    // Re-use an instance which has been created before.
    if (this._instances.has(identifier)) {
        return this._instances.get(identifier) as I;
    }

    try {
        if (this._internalGenerators.has(type)) {
            // Ask for the creation of the instance ...
            const created = await this.performCall<IInstanceDescription>(`generateInstance_${type}`, [description], {
                paramsHasNoCallback: true,
            });

            // ... and build the local counterpart with the registered generator.
            const generator = this._internalGenerators.get(type);
            const instance = await generator(this, created.identifier) as I;

            this._instances.set(identifier, instance);

            return instance;
        }
    } catch (err) {
        if (this._logger) {
            this._logger.error('During creating an Instance, the following error Occurd');
            this._logger.error(err)
        }

        throw err;
    }

    throw new Error("No Dynamic Interface provided");
}
|
|
|
|
/**
 * Removes the instance from the internal storage and disposes it afterwards.
 *
 * @template I
 * @param {I} instance The instance to remove.
 * @return {Promise<boolean>} Always `true` once `dispose` has finished.
 * @memberof nopeDispatcher
 */
public async deleteInstance<I extends IRemoteInstance>(instance: I): Promise<boolean> {
    const { identifier } = instance;

    // Forget the instance first, then release it.
    this._instances.delete(identifier);
    await instance.dispose();

    return true;
}
|
|
|
|
/**
 * Registers a generator for instances.
 *
 * - mode 'internal': the generator is stored under the given identifier.
 * - mode 'external': a function with the id `generateInstance_<identifier>`
 *   is registered. When called, it creates the instance with the generator
 *   (unless an instance with that identifier already exists), calls `init`
 *   with the provided parameters, registers a one-shot function
 *   `instance_dispose_<instance identifier>` and stores the instance. The
 *   registered function is kept in `_externalGenerators` under its own id.
 *
 * @template I
 * @param {string} identifier Name of the generator.
 * @param {IGenerateRemoteInstanceCallback<I>} cb Callback which creates the instance.
 * @param {{ mode: "internal" | "external" }} options Selects how the generator is registered.
 * @memberof nopeDispatcher
 */
public registerInstanceGenerator<I extends IRemoteInstance>(identifier: string, cb: IGenerateRemoteInstanceCallback<I>, options: { mode: "internal" | "external"; }) {
    if (options.mode === 'internal') {
        this._internalGenerators.set(identifier, cb);
    } else if (options.mode === 'external') {
        const createInstance = async (data: IInstanceCreation) => {
            // Only create (and store) the instance if it is not known yet.
            if (!this._instances.has(data.identifier)) {
                const created = await cb(this, data.identifier);

                // Initialize the instance with the parameters.
                created.init(...data.params);

                // One-shot function which disposes the instance.
                this.registerFunction(() => created.dispose(), {
                    deleteAfterCalling: true,
                    id: 'instance_dispose_' + data.identifier
                });

                this._instances.set(data.identifier, created);
            }

            const response: IInstanceDescription = {
                identifier: data.identifier,
                type: data.type,
            }
            return response
        };

        const registered = this.registerFunction(createInstance, {
            id: 'generateInstance_' + identifier,
        });
        this._externalGenerators.set(registered['id'], registered);
    }
}
|
|
|
|
/**
 * Removes a generator which has been added with `registerInstanceGenerator`.
 *
 * External generators are stored under the id of the function which was
 * registered for them (`generateInstance_<identifier>`). That key is tried
 * first; the plain identifier is used as fallback, so that callers which
 * already pass the complete function id keep working.
 *
 * @param {string} identifier Name of the generator.
 * @param {{ mode: "internal" | "external" }} options Selects the kind of generator to remove.
 * @memberof nopeDispatcher
 */
public unregisterInstanceGenerator(identifier: string, options: { mode: "internal" | "external"; }) {
    switch (options.mode) {
        case 'internal':
            this._internalGenerators.delete(identifier);
            break;
        case 'external': {
            const prefixed = 'generateInstance_' + identifier;
            const key = this._externalGenerators.has(prefixed) ? prefixed : identifier;

            if (this._externalGenerators.has(key)) {
                // Remove the registered function as well as the bookkeeping entry.
                this.unregistFunction(this._externalGenerators.get(key));
                this._externalGenerators.delete(key);
            }
            break;
        }
    }
}
|
|
|
|
/**
 * Internal method which handles an incoming request.
 *
 * If a function is available (either the provided one or the one registered
 * under `data.functionId`) the arguments are rebuilt: plain parameters are
 * placed at their index, callbacks are replaced by functions which trigger
 * `performCall` for the callback id. The function is executed and its result
 * (`null` instead of `undefined`) is emitted to `data.resultSink`.
 * If no function is available nothing is emitted. Errors are logged (if
 * enabled) and emitted as error response.
 *
 * @protected
 * @param {IRequestTaskMsg} data The provided data of the request
 * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
protected async _handleExternalRequest(data: IRequestTaskMsg, _function?: (...args) => Promise<any>): Promise<void> {
    try {
        // Fall back to the registered function if none has been provided.
        const handler = typeof _function === 'function'
            ? _function
            : this._definedFunctions.get(data.functionId);

        if (this._logger?.isDebugEnabled()) {
            this._logger.debug(`Dispatcher "${this.id}" received request: "${data.functionId}" -> task: "${data.taskId}"`);
        }

        if (typeof handler !== 'function') {
            // Nothing to execute here => no response is sent.
            return;
        }

        const args = [];

        // Plain parameters keep their original position.
        for (const param of data.params) {
            args[param.idx] = param.data;
        }

        // Callbacks are replaced by functions which call the remote.
        for (const callback of data.callbacks) {
            args[callback.idx] = async (...callbackArgs) => {
                return await this.performCall(callback.functionId, callbackArgs, callback);
            };
        }

        // Perform the task itself.
        const outcome = await handler(...args);

        const response: IResponseTaskMsg = {
            result: outcome === undefined ? null : outcome,
            taskId: data.taskId,
            type: 'response'
        }

        // Use the communicator to publish the result.
        await this._communicator.emitRpcResult(data.resultSink, response);
    } catch (error) {
        if (this._logger?.isErrorEnabled()) {
            this._logger.error(`Dispatcher "${this.id}" failed with request: "${data.taskId}"`);
            this._logger.error(error);
        }

        // Forward the error to the requester.
        const response: IResponseTaskMsg = {
            error,
            taskId: data.taskId,
            type: 'response'
        }

        await this._communicator.emitRpcResult(data.resultSink, response);
    }
}
|
|
|
|
/**
 * Internal function to handle responses. The dispatcher looks up the open
 * task with the provided id, removes it from the running tasks and settles
 * it: rejected when the response carries an error, resolved with the result
 * otherwise.
 *
 * Errors raised while handling the response are no longer swallowed
 * silently: they are logged (if a logger exists) and `false` is returned.
 *
 * @protected
 * @param {IResponseTaskMsg} data The Data provided to handle the Response.
 * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
 * @memberof nopeDispatcher
 */
protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
    try {
        // Extract the task and remove it from the running tasks.
        const task = this._runningTasks.get(data.taskId);
        this._runningTasks.delete(data.taskId);

        if (!task) {
            return false;
        }

        // Based on the result of the remote => reject or resolve.
        if (data.error) {
            task.reject(data.error);
        } else {
            task.resolve(data.result);
        }

        return true;
    } catch (e) {
        if (this._logger) {
            this._logger.error('Dispatcher "' + this.id + '" failed to handle a response');
            this._logger.error(e);
        }
    }

    return false;
}
|
|
|
|
/**
 * Internal function, used to initialize the Dispatcher.
 *
 * - Subscribes to the requests / responses of the communicator: one listener
 *   per defined function in mode 'individual', generic 'request' and
 *   'response' listeners in mode 'generic'.
 * - Listens for the services, topics and instance generators announced by
 *   other dispatchers and updates the corresponding listings. Announcements
 *   of the own dispatcher are ignored. Errors raised while processing an
 *   announcement are logged (if a logger exists) instead of being dropped.
 * - Answers the "bonjour" of other dispatchers with the own services, topics
 *   and generators and finally emits the own "bonjour".
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _init(): void {
    // Reports an error which occurred while processing an announcement.
    const reportError = (announcement: string, error) => {
        if (this._logger) {
            this._logger.error('Dispatcher "' + this.id + '" failed to process ' + announcement);
            this._logger.error(error);
        }
    };

    // Based on the mode of the subscription => either create individual
    // topics for the functions or use the generic listeners.
    switch (this._subscriptionMode) {
        case 'individual':
            for (const [id, cb] of this._definedFunctions.entries()) {
                this._subscribeToFunction(id, cb);
            }
            break;
        case 'generic':
            // Generic subscription for requests:
            this._communicator.onRpcRequest('request', (data: IRequestTaskMsg) => {
                if (data.type === 'requestOfTask') {
                    this._handleExternalRequest(data);
                }
            });

            // Generic subscription for responses:
            this._communicator.onRpcResult('response', (data: IResponseTaskMsg) => {
                if (data.type === 'response') {
                    this._handleExternalResponse(data);
                }
            });
            break;
    }

    // New services of remotes => update the external services.
    this._communicator.onNewServicesAvailable(data => {
        try {
            if (data.dispatcher !== this.id) {
                this._mappingOfRemoteDispatchersAndFunctions.set(data.dispatcher, data);
                this._updateExternalServices();
            }
        } catch (e) {
            reportError('the available services', e);
        }
    });

    // New topics of remotes => update the external topics.
    this._communicator.onNewTopicsAvailable(data => {
        try {
            if (data.dispatcher !== this.id) {
                this._mappingOfRemoteDispatchersAndTopics.set(data.dispatcher, data);
                this._updateExternalTopics();
            }
        } catch (e) {
            reportError('the available topics', e);
        }
    });

    // New generators of remotes => update the external generators.
    this._communicator.onNewInstanceGeneratorsAvailable(data => {
        try {
            if (data.dispatcher !== this.id) {
                this._mappingOfRemoteDispatchersAndGenerators.set(data.dispatcher, data);
                this._updateExternalGenerators();
            }
        } catch (e) {
            reportError('the available instance generators', e);
        }
    })

    // Another dispatcher went online => share the own listings with it.
    this._communicator.onBonjour((dispatcher: string) => {
        if (this.id !== dispatcher) {
            this._sendAvailableServices();
            this._sendAvailableTopic();
            this._sendAvailableGenerators();

            if (this._logger?.isDebugEnabled()) {
                this._logger.debug('Remote Dispatcher "' + dispatcher + '" went online');
            }
        }
    });

    this._communicator.emitBonjour(this.id);
}
|
|
|
|
/**
 * Rebuilds the set of externally provided services from the
 * announcements of all known remote dispatchers.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalServices() {
    const provided = this._externalProvidedServices;

    // Start from scratch and collect the services of every remote.
    provided.clear();
    for (const announcement of this._mappingOfRemoteDispatchersAndFunctions.values()) {
        for (const service of announcement.services) {
            provided.add(service);
        }
    }
}
|
|
|
|
/**
 * Rebuilds the set of externally provided instance generators from the
 * announcements of all known remote dispatchers.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalGenerators() {
    const provided = this._externalProvidedGenerators;

    // Start from scratch and collect the generators of every remote.
    provided.clear();
    for (const announcement of this._mappingOfRemoteDispatchersAndGenerators.values()) {
        for (const generator of announcement.generators) {
            provided.add(generator);
        }
    }
}
|
|
|
|
/**
 * Internal function to update the listing of external topics.
 * Rebuilds the sets of topics which are published respectively
 * subscribed by the known remote dispatchers.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalTopics() {
    const published = this._externalPublished;
    const subscribed = this._externalSubscribed;

    // Start from scratch.
    published.clear();
    subscribed.clear();

    for (const announcement of this._mappingOfRemoteDispatchersAndTopics.values()) {
        for (const topic of announcement.published) {
            published.add(topic);
        }
        for (const topic of announcement.subscribed) {
            subscribed.add(topic);
        }
    }
}
|
|
|
|
/**
 * Tests whether a specific service exists.
 *
 * @param {string} id The id of the service
 * @return {boolean} True if the service is defined locally or announced by a remote dispatcher.
 * @memberof nopeDispatcher
 */
public serviceExists(id: string) {
    if (this._definedFunctions.has(id)) {
        return true;
    }
    return this._externalProvidedServices.has(id);
}
|
|
|
|
/**
 * Tests whether a remote dispatcher announced a subscription
 * for the given topic.
 *
 * @param {string} topic The topic to test.
 * @return {boolean} True if the topic is part of the external subscriptions.
 * @memberof nopeDispatcher
 */
public subscriptionExists(topic: string) {
    const externallySubscribed = this._externalSubscribed.has(topic);
    return externallySubscribed;
}
|
|
|
|
/**
 * Adapts the name of a request / response. Only used internally.
 * Ids which already start with `<type>/` are returned unchanged.
 *
 * @protected
 * @param {string} id the original ID
 * @param {('request' | 'response')} type the prefix to use
 * @return {string} the adapted ID.
 * @memberof nopeDispatcher
 */
protected _getServiceName(id: string, type: 'request' | 'response'): string {
    const prefix = `${type}/`;

    if (id.startsWith(prefix)) {
        return id;
    }

    return prefix + id;
}
|
|
|
|
/**
 * Internal helper to listen for requests of a single function.
 * Only active in subscription mode 'individual' and only if no listener
 * is stored for the id yet.
 *
 * @protected
 * @param {string} id the Id of the function
 * @param {(...args) => Promise<any>} _cb The Callback of the Function
 * @return {void}
 * @memberof nopeDispatcher
 */
protected _subscribeToFunction(id: string, _cb: (...args) => Promise<any>): void {
    const requestTopic = this._getServiceName(id, 'request');

    if (this._subscriptionMode !== 'individual' || this._communicatorCallbacks.has(id)) {
        return;
    }

    // Listener which forwards matching requests to the handler.
    const listener = (data: IRequestTaskMsg) => {
        if (data.type === 'requestOfTask') {
            this._handleExternalRequest(data, _cb);
        }
    };

    // Remember the listener, so that it can be removed later on.
    this._communicatorCallbacks.set(id, {
        registeredId: requestTopic,
        type: 'request',
        cb: listener
    });

    this._communicator.onRpcRequest(requestTopic, listener);

    if (this._logger?.isDebugEnabled()) {
        this._logger.debug(`Dispatcher "${this.id}" listening on: "${requestTopic}"`);
    }
}
|
|
|
|
|
|
/**
 * Function, used to listen for the results of a single function.
 * Only active in subscription mode 'individual' and only if no listener
 * is stored for the id yet.
 *
 * @protected
 * @param {string} id The id of the function.
 * @param {boolean} deleteAfterCalling If set, the listener is removed once a response has been matched to a running task.
 * @memberof nopeDispatcher
 */
protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
    const responseTopic = this._getServiceName(id, 'response');

    if (this._subscriptionMode !== 'individual' || this._communicatorCallbacks.has(id)) {
        return;
    }

    // Listener which forwards matching responses to the handler.
    const listener = (data: IResponseTaskMsg) => {
        if (data.type !== 'response') {
            return;
        }

        const handled = this._handleExternalResponse(data);
        if (handled && deleteAfterCalling) {
            this._removeRpcSubscription(id);
        }
    };

    // Remember the listener, so that it can be removed later on.
    this._communicatorCallbacks.set(id, {
        registeredId: responseTopic,
        type: 'response',
        cb: listener
    });

    this._communicator.onRpcResult(responseTopic, listener);
}
|
|
|
|
/**
 * Registers a function in the dispatcher.
 *
 * The function (or, for `deleteAfterCalling`, a wrapper which unregisters
 * itself before calling the function) receives the properties `id` and
 * `unregister`, is stored in the defined functions and is subscribed to
 * requests. Unless prevented, the available services are published.
 *
 * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
 * @param {object} [options={}] Options of the registration.
 * @param {boolean} [options.deleteAfterCalling] Unregister the function as soon as it is called.
 * @param {string} [options.id] Id to use instead of a generated one.
 * @param {boolean} [options.preventSendingToRegistery] Do not publish the available services.
 * @return {(...args) => Promise<any>} The registered Function
 * @memberof nopeDispatcher
 */
public registerFunction(func: (...args) => Promise<any>, options: {
    /** Flag to enable unregistering the function after calling. */
    deleteAfterCalling?: boolean,
    /** Instead of generating a uuid an id could be provided */
    id?: string;
    /** Flag to enable / disable sending to registery */
    preventSendingToRegistery?: boolean;
} = {}): (...args) => Promise<any> {
    // Use the provided id or create a new one.
    const functionId = options.id || generateId();

    // One-shot functions are wrapped: the wrapper removes the
    // registration first and calls the original function afterwards.
    const registered: (...args) => Promise<any> = !options.deleteAfterCalling
        ? func
        : async (...args) => {
            this.unregistFunction(functionId, {
                preventSendingToRegistery: options.preventSendingToRegistery
            });
            return await func(...args);
        };

    // Attach the id and a shortcut to unregister the function.
    registered['id'] = functionId;
    registered['unregister'] = () => this.unregistFunction(functionId);

    // Store the function and listen for requests.
    this._definedFunctions.set(registered['id'], registered);
    this._subscribeToFunction(functionId, registered);

    if (!options.preventSendingToRegistery) {
        // Publish the available services.
        this._sendAvailableServices();

        if (this._logger?.isDebugEnabled()) {
            this._logger.debug(`Dispatcher "${this.id}" registered: "${functionId}"`);
        }
    }

    return registered;
}
|
|
|
|
/**
 * Unregisters a function from the dispatcher.
 *
 * The function is removed from the defined functions BEFORE the available
 * services are published, so the announcement no longer contains the removed
 * function.
 *
 * @param {(((...args) => void) | string)} func The function (carrying an `id` property) or the id to unregister. Functions without an id are mapped to the id '0'.
 * @param {object} [options={}] Options of the removal.
 * @param {boolean} [options.preventSendingToRegistery] Do not publish the available services.
 * @return {boolean} Flag, whether the element was removed (only if found) or not.
 * @memberof nopeDispatcher
 */
public unregistFunction(func: ((...args) => void) | string, options: {
    /** Flag to enable / disable sending to registery */
    preventSendingToRegistery?: boolean;
} = {}): boolean {
    const _id = typeof func === 'string' ? func : func['id'] as string || '0';

    // Stop listening for requests of the function.
    this._removeRpcSubscription(_id);

    // Remove the function first; the announcement below is created from
    // the remaining defined functions.
    const removed = this._definedFunctions.delete(_id);

    if (!options.preventSendingToRegistery) {
        // Publish the available services.
        this._sendAvailableServices();

        if (this._logger?.isDebugEnabled()) {
            this._logger.debug('Dispatcher "' + this.id + '" unregistered: "' + _id + '"');
        }
    }

    return removed;
}
|
|
|
|
/**
 * Observables which receive the events of the communicator, keyed by the
 * event name. `cb` is the listener registered on the communicator.
 */
protected _externallySubscribeObservables: Map<string, {
    observable: INopeObservable<IExternalEventMsg>;
    cb: (...arg) => void;
}>;

/** Observables registered for publishing, grouped by the published topic. */
protected _internallySubscribeObservables: Map<string, Set<INopeObservable<any>>>;
|
|
|
|
/**
 * Provides the observable which mirrors an event of the communicator.
 * The stored entry of the event is re-used; otherwise a new observable is
 * generated. As long as the observable has no subscriptions, a listener is
 * registered on the communicator which writes incoming messages into it.
 *
 * @protected
 * @param {string} event The Event to Listen.
 * @return {INopeObservable<IExternalEventMsg>} The observable fed by the communication channel.
 * @memberof nopeDispatcher
 */
protected _subscribeToEvent(event: string) {
    let entry = this._externallySubscribeObservables.get(event);

    if (!entry) {
        entry = {
            observable: this._generateObservable(),
            cb: () => { },
        };
    }

    if (!entry.observable.hasSubscriptions) {
        // Forward the messages of the communicator into the observable.
        const listener = (data: IExternalEventMsg) => {
            entry.observable.setContent(data, this.id);
        };

        this._communicator.onEvent(event, listener);
        entry.cb = listener;
    }

    // Store the (new or updated) entry.
    this._externallySubscribeObservables.set(event, entry);

    return entry.observable;
}
|
|
|
|
/**
 * Function to unsubscribe from an event of the channel.
 *
 * Events without a stored entry are ignored; no temporary observable is
 * generated for them any more. For a stored entry the listener is removed
 * from the communicator and the entry is deleted, if its observable reports
 * subscriptions.
 *
 * NOTE(review): `_subscribeToEvent` registers the listener while the
 * observable has NO subscriptions, whereas the removal here requires that
 * subscriptions exist. The condition is kept as it was - confirm that this
 * is intended.
 *
 * @protected
 * @param {string} event The event to unsubscribe from.
 * @memberof nopeDispatcher
 */
protected _unsubscribeEvent(event: string) {
    const item = this._externallySubscribeObservables.get(event);

    if (!item) {
        // Nothing has been subscribed for this event.
        return;
    }

    if (item.observable.hasSubscriptions) {
        this._communicator.offEvent(event, item.cb);
        this._externallySubscribeObservables.delete(event);
    }
}
|
|
|
|
/**
 * Connects an observable with the communication layer.
 *
 * - mode 'subscribe': content received for the subscribe topic, which was
 *   not sent by this dispatcher, is written into the observable.
 * - mode 'publish': content of the observable, which was not set by this
 *   dispatcher, is emitted on the publish topic, if a remote subscription
 *   for that topic is known.
 *
 * In both modes `observable.dispose` is wrapped, so that disposing the
 * observable also removes the connections created here. Unless prevented,
 * the available topics are published when a new topic has been added.
 *
 * Pipe handling: the pipe is taken from `options.pipe.pipe` (a function
 * passed directly as `options.pipe` is accepted as well) and the scope from
 * `options.pipe.scope`. A missing or `null` pipe option results in a plain
 * subscription.
 *
 * @template T, K, S, G
 * @param {INopeObservable<T, S, G>} observable The observable to connect.
 * @param {object} options Mode(s), topic(s), optional pipe / scope and the flag to suppress the announcement.
 * @return {INopeObservable<T, S, G>} The provided observable.
 * @memberof nopeDispatcher
 */
public registerObservable<T, K, S = T, G = T>(observable: INopeObservable<T, S, G>, options: {
    mode: 'subscribe' | 'publish' | Array<'subscribe' | 'publish'>,
    topic: string | {
        subscribe?: string,
        publish?: string,
    },
    pipe?: {
        pipe?: IPipe<IExternalEventMsg, K>,
        scope?: { [index: string]: any }
    },
    preventSendingToRegistery?: boolean
}): INopeObservable<T, S, G> {

    // Extract the topics.
    const _subTopic = typeof options.topic === 'string' ? options.topic : options.topic.subscribe || null;
    const _pubTopic = typeof options.topic === 'string' ? options.topic : options.topic.publish || null;

    // Extract pipe and scope. `typeof null` is 'object' as well, therefore
    // the option is tested for truthiness before its members are read.
    const pipeOptions = options.pipe;
    let _pipe = null;
    let _scope = null;
    if (typeof pipeOptions === 'function') {
        _pipe = pipeOptions;
    } else if (pipeOptions && typeof pipeOptions === 'object') {
        _pipe = pipeOptions.pipe || null;
        _scope = pipeOptions.scope || null;
    }

    const modes = Array.isArray(options.mode) ? options.mode : [options.mode];

    // A Flag, indicating, whether a topic is new or not.
    let newElement = false;

    if (modes.includes('subscribe')) {
        newElement = newElement || !this._externallySubscribeObservables.has(_subTopic);
        const externalSource = this._subscribeToEvent(_subTopic);

        // Forward content which has not been sent by the own dispatcher.
        const forward = (data: IExternalEventMsg) => {
            if (data.sender !== this.id) {
                observable.setContent(data.data, this.id, data.timestamp);
            }
        };

        let stopObserving: () => void;
        if (_pipe) {
            const observer = externalSource.enhancedSubscription(forward, {
                scope: _scope,
                pipe: _pipe
            });
            stopObserving = () => observer.unsubscribe();
        } else {
            const observer = externalSource.subscribe({
                next: forward,
                complete() {
                    observable.observable.complete();
                },
                error(err) {
                    observable.observable.error(err);
                }
            });
            stopObserving = () => observer.unsubscribe();
        }

        // Overwrite the original dispose function.
        const dispose = observable.dispose;
        observable.dispose = () => {
            // Kill the observer;
            stopObserving();

            // Unsubscribe the event
            this._unsubscribeEvent(_subTopic);

            // Call the original dispose function;
            dispose.apply(observable);
        }
    }

    if (modes.includes('publish')) {
        const cb = (data, sender?, timestamp?, ...args) => {
            // Only publish data, if there exists a subscription.
            if (this.subscriptionExists(_pubTopic) && this.id !== sender) {
                this._communicator.emitEvent(_pubTopic, {
                    data: data,
                    topic: _pubTopic,
                    sender: this.id,
                    type: 'event',
                    timestamp
                });
            }
        }

        // Update the flag.
        newElement = newElement || !this._internallySubscribeObservables.has(_pubTopic);

        // Register the internally subscribed element.
        const _set = this._internallySubscribeObservables.get(_pubTopic) || new Set();
        _set.add(observable);
        this._internallySubscribeObservables.set(_pubTopic, _set);

        let stopObserving: () => void;
        if (_pipe) {
            const observer = observable.enhancedSubscription(cb, {
                scope: _scope,
                pipe: _pipe
            });
            stopObserving = () => observer.unsubscribe();
        } else {
            const observer = observable.subscribe(cb);
            stopObserving = () => observer.unsubscribe();
        }

        // Overwrite the original dispose function.
        const dispose = observable.dispose;
        observable.dispose = () => {
            // Kill the observer;
            stopObserving();

            // Unsubscribe the event (kept from the original sequence).
            this._unsubscribeEvent(_subTopic);

            // Unregister the internally subscribed element.
            const remaining = this._internallySubscribeObservables.get(_pubTopic) || new Set();
            remaining.delete(observable);
            if (remaining.size > 0) {
                this._internallySubscribeObservables.set(_pubTopic, remaining);
            } else {
                this._internallySubscribeObservables.delete(_pubTopic);

                // The topic is no longer published => optionally send an update.
                if (!options.preventSendingToRegistery) {
                    this._sendAvailableTopic();
                }
            }

            // Call the original dispose function;
            dispose.apply(observable);
        }
    }

    if (!options.preventSendingToRegistery && newElement) {
        // Publish the available topics.
        this._sendAvailableTopic();
    }

    return observable;
}
|
|
|
|
/**
 * Removes the listener, which has been stored for the given id, from the
 * communicator and drops the stored entry. Unknown ids are ignored.
 *
 * @protected
 * @param {string} _id The id under which the listener has been stored.
 * @memberof nopeDispatcher
 */
protected _removeRpcSubscription(_id: string) {
    if (!this._communicatorCallbacks.has(_id)) {
        return;
    }

    const registration = this._communicatorCallbacks.get(_id);

    if (registration.type === 'request') {
        // Unregister the RPC-Request-Listener
        this._communicator.offRpcRequest(registration.registeredId, registration.cb);
    } else if (registration.type === 'response') {
        // Unregister the RPC-Response-Listener
        this._communicator.offRpcResponse(registration.registeredId, registration.cb);
    }

    // Remove the stored entry.
    this._communicatorCallbacks.delete(_id);
}
|
|
|
|
/**
 * Publishes the ids of all functions which are currently defined
 * on this dispatcher.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableServices() {
    const services = [...this._definedFunctions.keys()];

    const message: IAvailableServicesMsg = {
        dispatcher: this.id,
        services
    }

    this._communicator.emitNewServicesAvailable(message);
}
|
|
|
|
/**
 * Publishes the topics of this dispatcher: the topics of the observables
 * registered for publishing and the events which are subscribed on the
 * communicator.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableTopic() {
    const published = [...this._internallySubscribeObservables.keys()];
    const subscribed = [...this._externallySubscribeObservables.keys()];

    const message: IAvailableTopicsMsg = {
        dispatcher: this.id,
        published,
        subscribed
    }

    this._communicator.emitNewTopicsAvailable(message);
}
|
|
|
|
|
|
/**
 * Publishes the instance generators provided by this dispatcher.
 *
 * The announcement is built from the generators registered with mode
 * 'external' (the keys of `_externalGenerators`), not from the published
 * topics.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableGenerators() {
    // Define the Message
    const message: IAvailableInstanceGenerators = {
        dispatcher: this.id,
        generators: Array.from(this._externalGenerators.keys()),
    }

    // Send the Message.
    this._communicator.emitNewInstanceGeneratorsAvailable(message);
}
|
|
|
|
/**
|
|
* Function which is used to perform a call on the remote.
|
|
*
|
|
* @template T
|
|
* @param {string} functionName The Name / ID of the Function
|
|
* @param {any[]} params The provided Parameters.
|
|
* @param {({
|
|
* deletableCallbacks: Array<number>;
|
|
* })} [options={
|
|
* deletableCallbacks: [],
|
|
* paramsHasNoCallback: false,
|
|
* preventErrorTest: false
|
|
* }] You could additiona Optionf for the callback.
|
|
* @return {*} {Promise<T>} The result of the Operation
|
|
* @memberof nopeDispatcher
|
|
*/
|
|
public performCall<T>(functionName: string, params: any[], options: Partial<ICallOptions> = {}): Promise<T> {
    // Purpose: wrap `params` into a task request, publish it through the
    // communicator and return a Promise whose `resolve` / `reject` handlers are
    // stored in `_runningTasks` under the generated task id.

    // Get a Call Id
    const _taskId = generateId();
    // Ids of the callbacks registered for this call; `clear` unregisters them.
    const _registeredIdx: Array<string> = [];
    const _this = this;

    // Merge the caller's options over the defaults.
    const _options = Object.assign({
        deletableCallbacks: [],
        paramsHasNoCallback: false,
        dynamicCallback: false,
        resultSink: (this._resultSharing === 'generic' ? 'response' : this._getServiceName(functionName, 'response')),
    }, options) as ICallOptions;

    this._subscribeToResult(functionName, _options.dynamicCallback);

    // Releases everything that was registered on behalf of this task.
    const clear = () => {
        // Delete all Callbacks.
        _registeredIdx.forEach(id => {
            _this.unregistFunction(id);
        });

        // Remove the task (deleting a missing key is a no-op):
        _this._runningTasks.delete(_taskId);

        if (_this._logger && _this._logger.isDebugEnabled()) {
            _this._logger.debug('Clearing Callbacks from ' + _taskId);
        }
    };

    if (_this._logger && _this._logger.isDebugEnabled()) {
        _this._logger.debug('Dispatcher "'+ this.id + '" requesting externally Function "' + functionName + '" with task: "' + _taskId + '"');
    }

    // The executor is async because emitting the request is awaited. Its whole
    // body is wrapped in try / catch, so every failure ends up in `reject`.
    return new Promise<T>(async (resolve, reject) => {
        try {
            // Fail fast: without a known provider there is no point in
            // registering the task or any callback first.
            if (!_options.dynamicCallback && !_this.serviceExists(functionName)) {
                // Create an Error:
                const error = new Error('No Service Provider known for "' + functionName + '"');

                if (_this._logger) {
                    _this._logger.error('No Service Provider known for "' + functionName + '"');
                    _this._logger.error(error);
                }

                throw error;
            }

            // Register the Handlers,
            _this._runningTasks.set(_taskId, {
                resolve,
                reject,
                clear
            });

            // Define a Task-Request
            const taskRequest: IRequestTaskMsg = {
                functionId: functionName,
                params: [],
                callbacks: [],
                taskId: _taskId,
                type: 'requestOfTask',
                resultSink: _options.resultSink
            };

            // Iterate over all parameters. Plain values go to `params`; function
            // values go to `callbacks`, unless the caller stated that the
            // parameters contain no callback.
            for (const [idx, contentOfParameter] of params.entries()) {
                const isCallback = !_options.paramsHasNoCallback && typeof contentOfParameter === "function";

                if (!isCallback) {
                    taskRequest.params.push({
                        idx,
                        data: contentOfParameter
                    });
                    continue;
                }

                // The Parameter is a Callback => store a description of the
                // callback and register the callback inside of the Dispatcher.
                const deleteAfterCalling = _options.deletableCallbacks.includes(idx);
                const _func = _this.registerFunction(contentOfParameter, {
                    deleteAfterCalling,
                    preventSendingToRegistery: true,
                });

                _registeredIdx.push(_func['id']);

                // Describe the callback inside of the request.
                taskRequest.callbacks.push({
                    functionId: _func['id'],
                    idx,
                    deleteAfterCalling,
                    dynamicCallback: true,
                    deletableCallbacks: [],
                    resultSink: _this._resultSharing === 'generic' ? 'response' : _this._getServiceName(_func['id'], 'response')
                });
            }

            // Send the Message: 'individual' mode uses the request channel of
            // the function, every other mode uses the shared "request" channel.
            const channel = _this._subscriptionMode === 'individual'
                ? _this._getServiceName(taskRequest.functionId, 'request')
                : 'request';

            await _this._communicator.emitRpcRequest(channel, taskRequest);

            if (_this._logger && _this._logger.isDebugEnabled()) {
                _this._logger.debug('Dispatcher "'+ this.id + '" putting task "' +_taskId + '" on: "' + channel + '"');
            }
        } catch (e) {
            // Clear all Elements of the Function:
            clear();

            // Forward the error to the caller.
            reject(e);
        }
    });
}
|
|
|
|
/**
 * Empties the map of running tasks, creating the map first if it does
 * not exist yet.
 *
 * The stored `resolve` / `reject` / `clear` handlers of the removed tasks
 * are not invoked here.
 *
 * @memberof nopeDispatcher
 */
public clearTasks(): void {
    // First use: there is nothing to clear, just create the container.
    if (!this._runningTasks) {
        this._runningTasks = new Map();
        return;
    }

    this._runningTasks.clear();
}
|
|
|
|
/**
 * Unregisters every function that is currently defined on the dispatcher
 * and replaces the communicator callbacks with a new, empty map.
 *
 * @memberof nopeDispatcher
 */
public unregisterAll(): void {
    if (!this._definedFunctions) {
        // Nothing registered so far => only create the container.
        this._definedFunctions = new Map<string, (...args) => Promise<any>>();
    } else {
        // Unregister each function by its id, then drop whatever is left.
        this._definedFunctions.forEach((_registered, functionId) => {
            this.unregistFunction(functionId);
        });
        this._definedFunctions.clear();
    }

    // Reset the Callbacks.
    this._communicatorCallbacks = new Map();
}
|
|
|
|
/**
 * Resets the Dispatcher: replaces the internal bookkeeping containers with
 * empty ones, disposes all known instances, clears the running tasks and
 * unregisters all functions.
 *
 * Disposing an instance is not awaited; a failing disposal is only logged.
 *
 * @memberof nopeDispatcher
 */
public reset(): void {
    this._remotlyCalledFunctions = new Set();

    this._mappingOfRemoteDispatchersAndFunctions = new Map();
    this._mappingOfRemoteDispatchersAndTopics = new Map();

    this._internalGenerators = new Map();
    this._externalGenerators = new Map();

    // If Instances exist => dispose them before the map is replaced.
    if (this._instances) {
        // Creates the rejection handler for the instance with the given name.
        const reportFailedDisposal = (name) => (e) => {
            if (!this._logger) {
                return;
            }
            this._logger.error('Failed Removing Instance "' + name + '"');
            this._logger.error(e);
        };

        this._instances.forEach((instance, name) => {
            instance.dispose().catch(reportFailedDisposal(name));
        });
    }
    this._instances = new Map();

    this._externalProvidedServices = new Set();
    this._externalPublished = new Set();
    this._externalSubscribed = new Set();

    this._internallySubscribeObservables = new Map();
    this._externallySubscribeObservables = new Map();

    // Finally drop the pending tasks and the registered functions.
    this.clearTasks();
    this.unregisterAll();
}
|
|
} |