nope/lib/dispatcher/nopeDispatcher.ts
2020-11-16 13:06:45 +01:00

1773 lines
55 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:52:00
* @modify date 2020-11-15 19:43:09
* @desc Dispatcher used to call functions, share events and create instances on remote dispatchers.
*/
import { inject, injectable } from 'inversify';
import { Logger } from "winston";
import { generateId } from '../helpers/idMethods';
import { NopeGenericModule } from '../module/GenericModule';
import { NopePromise } from '../promise/nopePromise';
import { DISPATCHER_OPTIONS, OBSERVABLE_FACTORY } from '../symbols/identifiers';
import { IAvailableInstanceGeneratorsMsg, IAvailableServicesMsg, IAvailableTopicsMsg, ICallOptions, ICommunicationInterface, IExternalEventMsg, IInstanceCreationMsg, IInstanceDescriptionMsg, IInstanceRemovalMsg, INopeDispatcherOptions, IRequestTaskMsg, IResponseTaskMsg, ITaskCancelationMsg } from '../types/nope/nopeCommunication.interface';
import { IGenerateRemoteInstanceCallback, IGenerateRemoteInstanceForOtherDispatcherCallback, INopeDispatcher } from '../types/nope/nopeDispatcher.interface';
import { INopeModule } from '../types/nope/nopeModule.interface';
import { INopeObservable, INopeSubscriptionOptions, IObservableCallback, IPipe } from '../types/nope/nopeObservable.interface';
import { INopePromise } from '../types/nope/nopePromise.interface';
/**
* A Dispatcher to perform a function on a Remote
* Dispatcher. Therefore a Task is created and forwarded
* to the remote.
*
* @export
* @class nopeDispatcher
*/
@injectable()
export class nopeDispatcher implements INopeDispatcher {
public readonly id: string;
protected _logger: Logger;
/**
* Internal Element to store the registered Functions
*
* @protected
* @memberof nopeDispatcher
*/
protected _definedFunctions: Map<string, (...args) => Promise<any>>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<string, {
registeredId: string,
type: 'request' | 'response',
cb: (data) => any
}>;
protected _communicator: ICommunicationInterface;
protected _mappingOfRemoteDispatchersAndServices: Map<string, IAvailableServicesMsg>;
protected _externalProvidedServices: Set<string>;
protected _mappingOfRemoteDispatchersAndGenerators: Map<string, IAvailableInstanceGeneratorsMsg>;
protected _externalProvidedGenerators: Set<string>;
public methodInterfaceWithOptions: { [index: string]: <T>(options: ICallOptions, ...args) => INopePromise<T> }
public methodInterface: { [index: string]: <T>(...args) => INopePromise<T> }
protected _mappingOfRemoteDispatchersAndTopics: Map<string, IAvailableTopicsMsg>;
protected _externalSubscribed: Set<string>;
protected _externalPublished: Set<string>;
public readonly subscribedEvents: INopeObservable<string[]>;
public readonly publishedEvents: INopeObservable<string[]>;
public readonly canceledTask: INopeObservable<ITaskCancelationMsg>;
public readonly ready: INopeObservable<boolean>;
/**
* Internal Element to store the running tasks.
*
* @protected
* @memberof nopeDispatcher
*/
protected _runningInternalRequestedTasks: Map<string, {
resolve: (value: any) => void;
reject: (error: any) => void;
clear: () => void;
serviceName: string;
timeout?: any;
}>;
protected _runningExternalRequestedTasks: Set<string>
readonly _subscriptionMode: 'individual' | 'generic';
readonly _publishingMode: 'individual' | 'generic';
/**
 * Creates an instance of nopeDispatcher.
 *
 * Takes the communicator (and the optional logger) from the options, derives
 * the subscription / publishing mode, assigns a fresh id, builds the two
 * proxy based method interfaces and the public observables and finally
 * resets and initializes the dispatcher.
 *
 * @param {INopeDispatcherOptions} options The Options, used by the Dispatcher.
 * @param {<T>() => INopeObservable<T>} _generateObservable A Helper, to generate Observables.
 * @memberof nopeDispatcher
 */
constructor(
  @inject(DISPATCHER_OPTIONS) public options: INopeDispatcherOptions,
  @inject(OBSERVABLE_FACTORY) protected _generateObservable: <T>() => INopeObservable<T>,
) {
  // Adopt the communication layer and, if provided, the logger.
  this._communicator = options.communicator;
  if (options.logger) {
    this._logger = options.logger;
  }

  // Both modes fall back to 'generic' if the communicator doesnt define them.
  this._subscriptionMode = this._communicator.subscriptionMode || 'generic';
  this._publishingMode = this._communicator.resultSharing || 'generic';
  this.id = generateId();

  // Every property read on the proxies yields a function, which forwards
  // the call (named like the property) to `performCall`.
  const proxyHandlerWithOptions = {
    get: (_target, serviceName) => (callOptions: ICallOptions, ...args) => {
      return this.performCall(serviceName, args, callOptions);
    }
  };
  const proxyHandlerWithoutOptions = {
    get: (_target, serviceName) => (...args) => {
      return this.performCall(serviceName, args);
    }
  };
  this.methodInterfaceWithOptions = new Proxy({}, proxyHandlerWithOptions);
  this.methodInterface = new Proxy({}, proxyHandlerWithoutOptions);

  // Public observables; `ready` starts with `false`.
  this.subscribedEvents = this._generateObservable();
  this.publishedEvents = this._generateObservable();
  this.canceledTask = this._generateObservable();
  this.ready = this._generateObservable();
  this.ready.setContent(false);

  if (this._logger) {
    this._logger.info('Dispatcher online. -> Reseting and Initializing');
  }

  this.reset();

  // The initialization runs asynchronously; a failure is only reported.
  this._init().catch(error => {
    this._logger?.error('Failed to intialize the Dispatcher');
    // Additionally log the error.
    console.error(error);
  });
}
/**
 * Offers a generator for instances of the given type to other dispatchers.
 *
 * A function with the id `'generateInstance_' + identifier` is registered.
 * When it is called with an `IInstanceCreationMsg`, it creates and initializes
 * the instance via `cb` (unless an instance with that identifier is already
 * stored), registers a matching `'instance_dispose_' + <instance identifier>`
 * function, tracks the requesting dispatcher in `usedBy` and answers with the
 * description of the instance.
 *
 * @param {string} identifier Identifier of the type, the generator creates.
 * @param {IGenerateRemoteInstanceForOtherDispatcherCallback<I>} cb Callback, which creates the instance.
 * @memberof nopeDispatcher
 */
provideInstanceGeneratorForExternalDispatchers<I extends INopeModule>(identifier: string, cb: IGenerateRemoteInstanceForOtherDispatcherCallback<I>) {
const _this = this;
if (this._logger?.isSillyEnabled()) {
this._logger.silly('Adding instance generator for "' + identifier + '" to external Generators. Other Elements can now create instances of this type.');
}
const _cb = this.registerFunction(async (data: IInstanceCreationMsg) => {
// Check if an instance exists or not.
// if not => create an instance an store it.
// NOTE(review): the instance is stored only after the awaits below. Two
// requests for the same identifier arriving in between would presumably
// both create an instance - confirm whether this can happen.
if (!_this._instances.has(data.identifier)) {
// Create an Instance
const _instance = await cb(_this, data.identifier);
// Make shure the Data is expressed as Array.
if (!Array.isArray(data.params)){
data.params = [data.params];
}
// Initialize the instance with the parameters.
await _instance.init(... data.params);
// A Function is registered, taking care of removing
// an instances, if it isnt needed any more.
_this.registerFunction(async (_data: IInstanceRemovalMsg) => {
if (_this._instances.get(data.identifier)?.usedBy) {
// Remove the dispatcher, which released the instance, from the users.
const idx = _this._instances.get(data.identifier).usedBy.indexOf(_data.dispatcherID);
if (idx > -1) {
_this._instances.get(data.identifier).usedBy.splice(idx, 1);
}
// Nobody uses the instance any more => clean up.
if (_this._instances.get(data.identifier).usedBy.length == 0) {
// Remove the Instance.
await _instance.dispose();
// Delete the Entry.
_this._instances.delete(data.identifier);
// Remove the Function itself
_this.unregisterFunction('instance_dispose_' + data.identifier)
}
}
}, {
deleteAfterCalling: false,
id: 'instance_dispose_' + data.identifier
});
// Store the Instance.
_this._instances.set(data.identifier, {
instance: _instance,
usedBy: [data.dispatcherID]
});
} else {
// If an Element exists => Add the Element.
_this._instances.get(data.identifier).usedBy.push(data.dispatcherID);
}
// Define the Response.
const response: IInstanceDescriptionMsg = {
description: _this._instances.get(data.identifier).instance.toDescription(),
type: data.type,
}
// Send the Response
return response
}, {
id: 'generateInstance_' + identifier,
});
// Store the Generator.
// (the stored value is the function returned by `registerFunction`)
this._externalGenerators.set(identifier, _cb);
// Send an update of the available Generators
this._sendAvailableGenerators();
}
/**
 * Withdraws a generator, which was offered to other dispatchers by
 * `provideInstanceGeneratorForExternalDispatchers`.
 *
 * The registered generator function is unregistered, the generator is
 * removed from the internal listing and - mirroring the providing
 * counterpart - the updated list of generators is sent afterwards.
 * Unknown identifiers are ignored.
 *
 * @param {string} identifier Identifier of the type, whose generator should be removed.
 * @memberof nopeDispatcher
 */
unprovideInstanceGeneratorForExternalDispatchers(identifier: string) {
  if (!this._externalGenerators.has(identifier)) {
    return;
  }

  if (this._logger?.isSillyEnabled()) {
    this._logger.silly('Removing instance generator for "' + identifier + '" from external Generators. Other Elements cant create instances of this type anymore.');
  }

  // The stored element is the function returned by `registerFunction`.
  this.unregisterFunction(this._externalGenerators.get(identifier));
  this._externalGenerators.delete(identifier);

  // Send an update of the available Generators. Previously only adding
  // a generator triggered an update, removing one did not.
  this._sendAvailableGenerators();
}
/**
 * Stores a generator, which is only used by this dispatcher.
 * An already stored generator with the same identifier is replaced.
 *
 * @param {string} identifier Identifier of the type, the generator creates.
 * @param {IGenerateRemoteInstanceCallback<I>} cb Callback, which creates the instance.
 * @memberof nopeDispatcher
 */
registerInternalInstanceGenerator<I extends INopeModule>(identifier: string, cb: IGenerateRemoteInstanceCallback<I>) {
  const logger = this._logger;
  if (logger?.isSillyEnabled()) {
    const message = 'Adding instance generator for "' + identifier + '" as internal Generator. This Generator wont be used externally.';
    logger.silly(message);
  }

  this._internalGenerators.set(identifier, cb);
}
/**
 * Removes a generator from the internal listing.
 * Unknown identifiers are silently ignored.
 *
 * @param {string} identifier Identifier of the type, whose generator should be removed.
 * @memberof nopeDispatcher
 */
unregisterInternalInstanceGenerator(identifier: string) {
  const logger = this._logger;
  if (logger?.isSillyEnabled()) {
    const message = 'Rmoving instance generator for "' + identifier + '" from internal Generator. The sytem cant create elements of this type any more.';
    logger.silly(message);
  }

  this._internalGenerators.delete(identifier);
}
/**
 * Generators which are only used by this dispatcher, stored by their
 * type identifier (see `registerInternalInstanceGenerator`).
 */
protected _internalGenerators: Map<string, IGenerateRemoteInstanceCallback<INopeModule>>;
/**
 * Generators which are offered to other dispatchers, stored by their type
 * identifier. `provideInstanceGeneratorForExternalDispatchers` stores the
 * function returned by `registerFunction` in here.
 */
protected _externalGenerators: Map<string, IGenerateRemoteInstanceCallback<INopeModule>>;
/**
 * The known instances, stored by their identifier. Beside the instance
 * itself, the ids of the dispatchers using the instance are tracked.
 */
protected _instances: Map<string, {
instance: INopeModule,
usedBy: Array<string>,
}>;
/**
 * Provides an instance with the given identifier.
 *
 * An already known instance is reused (the dispatcher is added to its
 * users once more). Otherwise the function `'generateInstance_' + type` is
 * called and the received description is turned into an instance, using the
 * internal generator of the type or the generator stored as `'*'`.
 *
 * @param {Partial<IInstanceCreationMsg>} description Description of the instance. Must at least contain "type" and "identifier".
 * @return {Promise<I>} The instance.
 * @throws If "type" or "identifier" is missing, if no generator is known or if creating the instance fails.
 * @memberof nopeDispatcher
 */
public async generateInstance<I extends INopeModule>(description: Partial<IInstanceCreationMsg>): Promise<I> {
  // Helper, which only builds and writes the message, if silly logging is enabled.
  const trace = (buildMessage: () => string): void => {
    if (this._logger?.isSillyEnabled()) {
      this._logger.silly(buildMessage());
    }
  };

  // Fill the missing properties with markers; the dispatcher id is always our own.
  const request = Object.assign(
    {
      dispatcherID: this.id,
      identifier: 'error',
      params: [],
      type: 'unkown'
    } as IInstanceCreationMsg,
    description,
    { dispatcherID: this.id }
  ) as IInstanceCreationMsg;

  // The markers survived => the required properties were not provided.
  if (request.type === 'unkown' || request.identifier === 'error') {
    throw Error('Please Provide at least a "type" and "identifier" in the paremeters');
  }

  trace(() => 'Requesting an Instance of type: "' + request.type + '" with the identifier: "' + request.identifier + '"');

  if (this._instances.has(request.identifier)) {
    trace(() => 'Already created instance with the identifiert: "' + request.identifier + '" => returning this instance');

    // Add the Dispatcher to the Element and return the stored instance.
    const known = this._instances.get(request.identifier);
    known.usedBy.push(request.dispatcherID);
    return known.instance as I;
  }

  try {
    // Use the wildcard generator, if there is no specific one for the type.
    const generatorKey = this._internalGenerators.has(request.type) ? request.type : '*';

    if (!this.generatorExists(request.type)) {
      throw Error("Generator isnt present in the network!");
    }

    if (!this._internalGenerators.has(generatorKey)) {
      throw Error('No generator Available!')
    }

    trace(() => 'No instance with the identifiert: "' + request.identifier + '" found, but an internal generator is available. Using the internal one for creating the instance and requesting the "real" instance externally');

    // Request the description of the "real" instance.
    const result = await this.performCall<IInstanceDescriptionMsg>('generateInstance_' + request.type, [request], {
      paramsHasNoCallback: true,
    });

    trace(() => 'Received a description for the instance');

    // Create the Instance based on the description.
    const instance = await this._internalGenerators.get(generatorKey)(this, result.description) as I;

    // Store the Instance.
    this._instances.set(request.identifier, {
      instance,
      usedBy: [request.dispatcherID]
    });

    return instance;
  } catch (e) {
    if (this._logger) {
      this._logger.error('During creating an Instance, the following error Occurd');
      this._logger.error(e)
    }
    throw e;
  }
}
/**
 * Releases an instance, which was requested by this dispatcher.
 *
 * One usage of this dispatcher is removed from the instance. If nobody uses
 * the instance afterwards, the function `'instance_dispose_' + identifier`
 * is called, the entry is removed and the local instance is disposed.
 *
 * Compared to the former implementation the own dispatcher id is removed
 * from the users (instead of blindly dropping the last entry) and the entry
 * is deleted using the key it was found with.
 *
 * @param {(I | string)} instance The instance or the identifier it is stored with.
 * @return {Promise<boolean>} `true` if the instance was found, otherwise `false`.
 * @memberof nopeDispatcher
 */
public async deleteInstance<I extends INopeModule>(instance: I | string): Promise<boolean> {
  // Block to find the instance and the key it is stored with.
  let key: string | undefined;
  let entry: { instance: INopeModule; usedBy: Array<string>; } | undefined;

  if (typeof instance === 'string') {
    key = instance;
    entry = this._instances.get(instance);
  } else {
    for (const [candidateKey, candidate] of this._instances.entries()) {
      if (instance == candidate.instance) {
        key = candidateKey;
        entry = candidate;
        break;
      }
    }
  }

  if (!entry) {
    return false;
  }

  // Only give up an usage of this dispatcher. Entries of other dispatchers
  // must stay untouched.
  const ownUsage = entry.usedBy.indexOf(this.id);
  if (ownUsage > -1) {
    entry.usedBy.splice(ownUsage, 1);
  }

  if (entry.usedBy.length === 0) {
    const params: IInstanceRemovalMsg = {
      dispatcherID: this.id,
      identifier: entry.instance.identifier
    }
    // Call the corresponding Dispose Function for the "real" instance
    // All other elements are just accessors.
    await this.performCall('instance_dispose_' + entry.instance.identifier, [params]);
    // Delete the entry with the key, it has been found with.
    this._instances.delete(key);
    // Dispose the Handler;
    await entry.instance.dispose();
  }

  return true;
}
/**
 * Internal Method to handle a request of a remote dispatcher.
 *
 * If a matching function is available, the arguments are rebuilt (plain
 * parameters and callbacks, which trigger calls on the remote), the function
 * is performed and its result (or the thrown error) is sent to
 * `data.resultSink`. Requests for unknown functions are ignored.
 *
 * While the function is running, cancelations of the task are forwarded to
 * the result promise (if it provides `cancel`) and to all pending callbacks.
 * The listener for cancelations is released as soon as the request is
 * finished; formerly it was only released after a cancelation.
 *
 * @protected
 * @param {IRequestTaskMsg} data The provided data of the request
 * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
protected async _handleExternalRequest(data: IRequestTaskMsg, _function?: (...args) => Promise<any>): Promise<void> {
  // Releases the cancelation listener. No-op until a listener exists.
  let unsubscribe: () => void = () => { };

  try {
    // Try to get the function if not provided:
    if (typeof _function !== 'function') {
      _function = this._definedFunctions.get(data.functionId);
    }

    if (this._logger?.isDebugEnabled()) {
      // If there is a Logger:
      this._logger.debug('Dispatcher "' + this.id + '" received request: "' + data.functionId + '" -> task: "' + data.taskId + '"');
    }

    if (typeof _function === 'function') {
      // Callbacks, which must be called if the task is canceled.
      const cbs: Array<(reason) => void> = [];

      let released = false;
      const observer = this.canceledTask.subscribe((cancelEvent) => {
        if (cancelEvent.taskId == data.taskId) {
          // Call every Callback. A copy is used, because callbacks are
          // removed from the list, when their call is finished.
          cbs.slice().forEach(cb => cb(cancelEvent.reason));
          // The task is canceled => the listener isnt required any more.
          unsubscribe();
        }
      });
      unsubscribe = () => {
        // Make sure the observer is only released once.
        if (!released) {
          released = true;
          observer.unsubscribe();
        }
      };

      // Only if the Function is present extract the arguments etc.
      const args = [];

      // First extract the basic arguments
      data.params.forEach(item => {
        args[item.idx] = item.data;
      });

      // Add the Callbacks. Therefore create a function which will
      // trigger the remote.
      data.callbacks.forEach(options => {
        args[options.idx] = async (..._args) => {
          // And Create the Task and its Promise.
          const servicePromise = this.performCall<any>(options.functionId, _args, options);

          const cancelCallback = (reason) => {
            // The Main Task has been canceled =>
            // We are allowed to canel the Subtask as well.
            servicePromise.cancel(reason);
          };
          cbs.push(cancelCallback);

          try {
            // Await the Result. If an Task is canceled => The Error is Thrown.
            return await servicePromise;
          } finally {
            // Remove the callback - also if the call failed. The index is
            // checked, because `splice(-1, 1)` would remove a wrong entry.
            const idx = cbs.indexOf(cancelCallback);
            if (idx > -1) {
              cbs.splice(idx, 1);
            }
          }
        };
      });

      // Perform the Task it self.
      const _resultPromise = _function(...args);

      if (typeof (_resultPromise as INopePromise<any>).cancel === 'function') {
        // Push the Callback to the Result.
        cbs.push((reason) => (_resultPromise as INopePromise<any>).cancel(reason));
      }

      // Wait for the Result to finish.
      const _result = await _resultPromise;

      // Define the Result message
      const result: IResponseTaskMsg = {
        result: typeof (_result) !== 'undefined' ? _result : null,
        taskId: data.taskId,
        type: 'response'
      }

      // Use the communicator to publish the event.
      await this._communicator.emitRpcResult(data.resultSink, result);
    }
  } catch (error) {
    if (this._logger?.isErrorEnabled()) {
      // If there is a Logger:
      this._logger.error('Dispatcher "' + this.id + '" failed with request: "' + data.taskId + '"');
      console.error(error);
      this._logger.error(error);
    }

    // An Error occourd => Forward the Error.
    const result: IResponseTaskMsg = {
      error: {
        error,
        // `String` also handles thrown `null` / `undefined` values.
        msg: String(error)
      },
      taskId: data.taskId,
      type: 'response'
    }

    // Send the Error via the communicator to the remote.
    await this._communicator.emitRpcResult(data.resultSink, result);
  } finally {
    // Unsubscribe the Observable in every case.
    unsubscribe();
  }
}
/**
 * Internal Function to handle responses. In general,
 * the dispatcher checks if there is an open task with
 * the provided id. If so, the task is removed, its timeout
 * is cleared and its promise is finished (rejected, if the
 * response contains an error, otherwise resolved).
 *
 * Errors raised while handling the response are logged and
 * lead to the result `false`; they are not thrown.
 *
 * @protected
 * @param {IResponseTaskMsg} data The Data provided to handle the Response.
 * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
 * @memberof nopeDispatcher
 */
protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
  try {
    // Extract the Task
    const task = this._runningInternalRequestedTasks.get(data.taskId);

    if (!task) {
      return false;
    }

    // Delete the Task:
    this._runningInternalRequestedTasks.delete(data.taskId);

    // Clearout the Timer first, so it is stopped even if
    // finishing the promise fails.
    if (task.timeout) {
      clearTimeout(task.timeout)
    }

    // Based on the Result of the Remote => reject or resolve.
    if (data.error) {
      if (this._logger?.isErrorEnabled()) {
        this._logger.error('Failed with task ' + data.taskId);
        this._logger.error('Reason: ' + data.error.msg);
        console.error(data.error.error);
      }
      task.reject(data.error);
    } else {
      task.resolve(data.result);
    }

    return true;
  } catch (e) {
    // Handling responses is best effort, but the failure shouldnt be hidden.
    if (this._logger) {
      this._logger.error('Dispatcher "' + this.id + '" failed to handle the response of task "' + data?.taskId + '"');
      this._logger.error(e);
    }
  }
  return false;
}
/**
 * Internal Function, used to initialize the Dispatcher.
 *
 * After the communicator is connected, the function
 *  - registers the wildcard ('*') generator, which creates generic modules,
 *  - subscribes to requests / responses (based on the used modes),
 *  - subscribes to the announcements of other dispatchers (services,
 *    topics, generators, bonjour, aurevoir and task cancelations),
 *  - announces the dispatcher itself and marks it as ready.
 *
 * Errors inside of the handlers are reported (logger or console) instead
 * of being silently ignored. They are still not thrown.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected async _init(): Promise<void> {
  const _this = this;

  // Helper to report errors of handlers, which must not throw.
  const reportFailure = (context: string, error: unknown): void => {
    if (_this._logger) {
      _this._logger.error('Dispatcher "' + _this.id + '" ' + context);
      _this._logger.error(error);
    } else {
      console.error(context, error);
    }
  };

  // Wait until the Element is connected.
  await this._communicator.connected.waitFor(value => value, {
    testCurrent: true,
  });

  this._communicator.connected.subscribe(connected => {
    // Handle an unconnect.
  });

  // Fallback generator: creates a generic module based on a description.
  this.registerInternalInstanceGenerator('*', async (dispather, description) => {
    const mod = new NopeGenericModule(dispather, _this._generateObservable);
    await mod.fromDescription(description, 'overwrite');
    return mod;
  });

  // Based on the Mode of the Subscription =>
  // either create indivdual topics for the methods
  // or use the generice function.
  switch (this._subscriptionMode) {
    case 'individual':
      // Iterate over the Defined Functions.
      for (const [id, cb] of this._definedFunctions.entries()) {
        // Subscribe the Function
        this._subscribeToService(id, cb);
      }
      break;
    case 'generic':
      // Add a generic Subscription for callbacks:
      this._communicator.onRpcRequest('request', (data: IRequestTaskMsg) => {
        if (data.type === 'requestOfTask') {
          // The handler reports errors to the remote on its own. A rejection
          // means, that even this failed => dont leave it unhandled.
          _this._handleExternalRequest(data).catch(error => reportFailure('failed to handle a request', error));
        }
      });
      break;
  }

  // If the Responses are shared using the Generic Methods => subscribe to Responses
  // On the generic "response" channel.
  if (this._publishingMode === 'generic') {
    this._communicator.onRpcResult('response', (data) => {
      if (data.type === 'response') {
        _this._handleExternalResponse(data);
      }
    });
  }

  // Subscribe to the availableServices of Remotes.
  // If there is a new Service => udpate the External Services
  this._communicator.onNewServicesAvailable(data => {
    try {
      if (data.dispatcher !== _this.id) {
        _this._mappingOfRemoteDispatchersAndServices.set(data.dispatcher, data);
        _this._updateExternalServices();

        if (_this._logger?.isSillyEnabled()) {
          // If there is a Logger:
          _this._logger.silly('received new services');
        }
      }
    } catch (e) {
      reportFailure('failed to update the available services', e);
    }
  });

  // Subscribe to new available Topics.
  this._communicator.onNewTopicsAvailable(data => {
    try {
      if (data.dispatcher !== _this.id) {
        _this._mappingOfRemoteDispatchersAndTopics.set(data.dispatcher, data);
        _this._updateExternalEvents();

        if (_this._logger?.isSillyEnabled()) {
          // If there is a Logger:
          _this._logger.silly('received new events');
        }
      }
    } catch (e) {
      reportFailure('failed to update the available topics', e);
    }
  });

  // Subscribe to new available Generators. In contrast to services and
  // topics the own announcements are stored as well.
  this._communicator.onNewInstanceGeneratorsAvailable(data => {
    try {
      _this._mappingOfRemoteDispatchersAndGenerators.set(data.dispatcher, data);
      _this._updateExternalGenerators();

      if (_this._logger?.isSillyEnabled()) {
        // If there is a Logger:
        _this._logger.silly('received new generators');
      }
    } catch (e) {
      reportFailure('failed to update the available generators', e);
    }
  })

  // A new dispatcher is online => share the own elements.
  this._communicator.onBonjour((dispatcher: string) => {
    if (_this.id !== dispatcher) {
      _this._sendAvailableServices();
      _this._sendAvailableProperties();
      _this._sendAvailableGenerators();

      if (_this._logger?.isDebugEnabled()) {
        // If there is a Logger:
        _this._logger.debug('Remote Dispatcher "' + dispatcher + '" went online');
      }
    }
  });

  // A dispatcher went offline => forget its elements.
  this._communicator.onAurevoir((dispatcher: string) => {
    // Delete the Generators of the Instances.
    _this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
    _this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
    _this._mappingOfRemoteDispatchersAndTopics.delete(dispatcher);

    _this._updateExternalServices();
    _this._updateExternalGenerators();
    _this._updateExternalEvents();

    if (_this._logger?.isDebugEnabled()) {
      // If there is a Logger:
      _this._logger.debug('a dispatcher went offline');
    }
  });

  // Share cancelations of other dispatchers using the observable.
  this._communicator.onTaskCancelation((event) => {
    if (event.dispatcher !== _this.id) {
      _this.canceledTask.setContent(event);
    }
  })

  if (this._logger) {
    this._logger.info('initialized');
  }

  this._communicator.emitBonjour(this.id);
  this.ready.setContent(true);
}
/**
 * Function to cancel an individual Task.
 *
 * The task is removed from the running tasks, its timeout (if there is
 * one) is cleared, its promise is rejected with the reason and the
 * cancelation is emitted using the communicator.
 *
 * @param {string} taskId The Id of the Task. Which should be canceled.
 * @param {Error} reason The Reason, why the Task should be canceled (In general should be something meaningful)
 * @return {boolean} Flag, that indicates, whether cancelation was successful or not.
 * @memberof nopeDispatcher
 */
public cancelTask(taskId: string, reason: Error): boolean {
  const task = this._runningInternalRequestedTasks.get(taskId);

  // Task hasnt been found => nothing to cancel.
  if (!task) {
    return false;
  }

  // Delete the task
  this._runningInternalRequestedTasks.delete(taskId);

  // Clearout the Timer, like it is done when a response is received.
  if (task.timeout) {
    clearTimeout(task.timeout);
  }

  // Propagate the Cancellation (internally):
  task.reject(reason);

  // Propagate the Cancellation externally. Currently every
  // publishing mode uses the same channel.
  this._communicator.emitTaskCancelation({
    dispatcher: this.id,
    reason,
    taskId
  });

  // Indicate a successful cancelation.
  return true;
}
/**
 * Helper Function, used to close all tasks with a specific service.
 *
 * All matching tasks are removed from the running tasks first (and their
 * timeouts are cleared), afterwards their promises are rejected. The
 * cancelation is not emitted using the communicator.
 *
 * @param {string} serviceName The Name of the Service.
 * @param {Error} reason The provided Reason, why cancelation is required.
 * @memberof nopeDispatcher
 */
public cancelRunningTasksOfService(serviceName: string, reason: Error) {
  // Collect all Tasks, that have to be canceled.
  const tasksToCancel = Array.from(this._runningInternalRequestedTasks.entries())
    .filter(([, task]) => task.serviceName === serviceName);

  // First remove all Tasks (and stop their timers).
  // Then cancel them to avoid side effects
  for (const [id, task] of tasksToCancel) {
    this._runningInternalRequestedTasks.delete(id);
    if (task.timeout) {
      clearTimeout(task.timeout);
    }
  }

  // Now Reject all Tasks.
  for (const [, task] of tasksToCancel) {
    task.reject(reason);
  }
}
/**
 * Shuts the dispatcher down: every running task is canceled
 * and the aurevoir message is emitted.
 *
 * @memberof nopeDispatcher
 */
public dispose() {
  // Work on a copy of the ids, because canceling removes the tasks.
  const runningTaskIds = Array.from(this._runningInternalRequestedTasks.keys());
  runningTaskIds.forEach(taskId => {
    this.cancelTask(taskId, new Error('Client going offline'))
  });

  // Emits the aurevoir Message.
  this._communicator.emitAurevoir(this.id);
}
/**
 * Rebuilds the listing of the externally provided services, based on the
 * services announced by the other dispatchers. Running tasks of services,
 * which arent available any more, are canceled.
 *
 * @protected
 * @return The services which were added and the ones which were removed by the update.
 * @memberof nopeDispatcher
 */
protected _updateExternalServices() {
  const available = this._externalProvidedServices;

  // Remember the services known before the update.
  const knownBefore = new Set(available);

  // Rebuild the listing.
  available.clear();
  for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndServices.values()) {
    dispatcherInfo.services.forEach(service => {
      available.add(service);
    });
  }

  // New: available now, but unknown before the update.
  const addedServices = Array.from(available).filter(service => !knownBefore.has(service));
  // Removed: known before the update, but unavailable now.
  const removedServices = Array.from(knownBefore).filter(service => !available.has(service));

  // Cancel the Tasks of the unavailable services.
  for (const unavailable of removedServices) {
    this.cancelRunningTasksOfService(unavailable, new Error('Service unavailable!'));
  }

  return {
    // Contains the "new" services
    addedServices,
    // contains the "unavailable services"
    removedServices
  }
}
/**
 * Rebuilds the listing of the available generators, based on
 * the generators announced by the dispatchers.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalGenerators() {
  const available = this._externalProvidedGenerators;

  // Drop the old listing and collect the announced generators again.
  available.clear();
  for (const announcement of this._mappingOfRemoteDispatchersAndGenerators.values()) {
    announcement.generators.forEach(generator => {
      available.add(generator);
    });
  }
}
/**
 * Internal Function to update the Listing of external Topics.
 * The topics published and subscribed by the other dispatchers are
 * collected again and shared using the observables `subscribedEvents`
 * and `publishedEvents`.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalEvents() {
  const published = this._externalPublished;
  const subscribed = this._externalSubscribed;

  // Drop the old listings.
  published.clear();
  subscribed.clear();

  // Collect the topics of every known dispatcher.
  for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndTopics.values()) {
    dispatcherInfo.published.forEach(topic => {
      published.add(topic);
    });
    dispatcherInfo.subscribed.forEach(topic => {
      subscribed.add(topic);
    });
  }

  // Update the Elements.
  this.subscribedEvents.setContent(Array.from(subscribed));
  this.publishedEvents.setContent(Array.from(published));
}
/**
 * Tests, whether a service with the given id is known.
 *
 * @param {string} id The Id of the Service
 * @return {boolean} True if the service is defined locally or provided externally.
 * @memberof nopeDispatcher
 */
public serviceExists(id: string) {
  // Locally defined functions are checked first.
  if (this._definedFunctions.has(id)) {
    return true;
  }
  return this._externalProvidedServices.has(id);
}
/**
 * Tests, whether the given topic is contained in the
 * listing of the externally subscribed topics.
 *
 * @param {string} topic The topic to test.
 * @return {boolean} The result of the test.
 * @memberof nopeDispatcher
 */
public subscriptionExists(topic: string) {
  const externallySubscribed = this._externalSubscribed;
  return externallySubscribed.has(topic);
}
/**
 * Tests, whether a generator for the given type identifier is
 * contained in the listing of the announced generators.
 *
 * @param {string} typeIdentifier Identifier of the type.
 * @return {boolean} The result of the test.
 * @memberof nopeDispatcher
 */
public generatorExists(typeIdentifier: string) {
  const announcedGenerators = this._externalProvidedGenerators;
  return announcedGenerators.has(typeIdentifier);
}
/**
 * Adapts the name of a request / response.
 * Only used internally
 *
 * @protected
 * @param {string} id the original ID
 * @param {('request' | 'response')} type the prefix, the adapted ID must start with.
 * @return {string} the ID, prefixed with `<type>/` (unchanged, if the prefix is already present).
 * @memberof nopeDispatcher
 */
protected _getServiceName(id: string, type: 'request' | 'response'): string {
  const prefix = `${type}/`;
  if (id.startsWith(prefix)) {
    return id;
  }
  return prefix + id;
}
/**
 * Internal Helper Function to listen for requests of a function.
 * Only does something, if the subscription mode is 'individual'
 * and no listener is stored for the request name yet.
 *
 * @protected
 * @param {string} id the Id of the function
 * @param {(...args) => Promise<any>} _cb The Callback of the Function
 * @return {void}
 * @memberof nopeDispatcher
 */
protected _subscribeToService(id: string, _cb: (...args) => Promise<any>): void {
  const requestName = this._getServiceName(id, 'request');

  // Nothing to do in the generic mode or if a listener exists.
  if (this._subscriptionMode !== 'individual' || this._communicatorCallbacks.has(requestName)) {
    return;
  }

  // Forwards the matching messages to the request handler.
  const onRequest = (data: IRequestTaskMsg) => {
    if (data.type === 'requestOfTask') {
      this._handleExternalRequest(data, _cb);
    }
  };

  // Remember the listener, then attach it to the communicator.
  this._communicatorCallbacks.set(requestName, {
    registeredId: requestName,
    type: 'request',
    cb: onRequest
  });
  this._communicator.onRpcRequest(requestName, onRequest);

  if (this._logger?.isSillyEnabled()) {
    // If there is a Logger:
    this._logger.silly('Dispatcher "' + this.id + '" listening on: "' + requestName + '"');
  }
}
/**
 * Function, used to listen for the results of a function.
 * Only does something, if the publishing mode is 'individual'
 * and no listener is stored for the response name yet.
 *
 * @protected
 * @param {string} id the Id of the function
 * @param {boolean} deleteAfterCalling If set, the subscription is removed after a response was handled successfully.
 * @memberof nopeDispatcher
 */
protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
  const responseName = this._getServiceName(id, 'response');

  // Nothing to do in the generic mode or if a listener exists.
  if (this._publishingMode !== 'individual' || this._communicatorCallbacks.has(responseName)) {
    return;
  }

  // Forwards the matching messages to the response handler.
  const onResponse = (data: IResponseTaskMsg) => {
    if (data.type !== 'response') {
      return;
    }
    const handled = this._handleExternalResponse(data);
    if (handled && deleteAfterCalling) {
      this._removeRpcSubscription(responseName);
    }
  };

  // Remember the listener, then attach it to the communicator.
  this._communicatorCallbacks.set(responseName, {
    registeredId: responseName,
    type: 'response',
    cb: onResponse
  });
  this._communicator.onRpcResult(responseName, onResponse);
}
/**
 * Function to register a Function in the Dispatcher
 *
 * The function (or a wrapper, which unregisters itself before performing
 * the call, if `deleteAfterCalling` is set) receives the properties `id`
 * and `unregister`, is stored and subscribed to its requests. Unless
 * prevented, the available services are sent afterwards.
 *
 * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
 * @param {{
 * // Flag to enable unregistering the function after calling.
 * deleteAfterCalling?: boolean,
 * // Instead of generating a uuid an id could be provided
 * id?: string;
 * // Flag to enable / disable sending to registery
 * preventSendingToRegistery?: boolean;
 * }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
 * @return {*} {(...args) => Promise<any>} The registered Function
 * @memberof nopeDispatcher
 */
public registerFunction(func: (...args) => Promise<any>, options: {
  /** Flag to enable unregistering the function after calling. */
  deleteAfterCalling?: boolean,
  /** Instead of generating a uuid an id could be provided */
  id?: string;
  /** Flag to enable / disable sending to registery */
  preventSendingToRegistery?: boolean;
} = {}): (...args) => Promise<any> {
  // Define / Use the ID of the Function.
  const functionId = options.id || generateId();

  // Either the function itself or a wrapper, which removes
  // the registration before calling the original function.
  const registered: (...args) => Promise<any> = options.deleteAfterCalling
    ? async (...args) => {
      // Unregister the Method
      this.unregisterFunction(functionId, {
        preventSendingToRegistery: options.preventSendingToRegistery
      });
      // Return the Result of the Original Function.
      return await func(...args);
    }
    : func;

  // Attach the id and a shortcut to unregister the function.
  registered['id'] = functionId;
  registered['unregister'] = () => this.unregisterFunction(functionId);

  // Store the Function and listen for its requests.
  this._definedFunctions.set(functionId, registered);
  this._subscribeToService(functionId, registered);

  if (options.preventSendingToRegistery) {
    return registered;
  }

  // Publish the Available Services.
  this._sendAvailableServices();

  if (this._logger?.isSillyEnabled()) {
    // If there is a Logger:
    this._logger.silly('Dispatcher "' + this.id + '" registered: "' + functionId + '"');
  }

  // Return the Function.
  return registered;
}
/**
 * Function to unregister a Function from the Dispatcher
 *
 * The subscription of the function is removed and the function is deleted
 * from the defined functions. Afterwards (unless prevented) the available
 * services are sent. The function is deleted before the services are sent,
 * mirroring `registerFunction`, which stores the function before sending.
 * NOTE(review): this assumes `_sendAvailableServices` reads the defined
 * functions - confirm against its implementation.
 *
 * @param {(((...args) => void) | string)} func The Function (containing the property `id`) or the id of the function to unregister
 * @param {{
 * // Flag to enable / disable sending to registery
 * preventSendingToRegistery?: boolean;
 * }} [options={}] Options to prevent sending the updated services.
 * @return {*} {boolean} Flag, whether the element was removed (only if found) or not.
 * @memberof nopeDispatcher
 */
public unregisterFunction(func: ((...args) => void) | string, options: {
  /** Flag to enable / disable sending to registery */
  preventSendingToRegistery?: boolean;
} = {}): boolean {
  // Functions without an id are mapped to '0'.
  const _id = typeof func === 'string' ? func : func['id'] as string || '0';

  this._removeRpcSubscription(_id);

  // Remove the function first, so it is gone before the services are sent.
  const removed = this._definedFunctions.delete(_id);

  if (!options.preventSendingToRegistery) {
    // Publish the Available Services.
    this._sendAvailableServices();

    if (this._logger?.isSillyEnabled()) {
      // If there is a Logger:
      this._logger.silly('Dispatcher "' + this.id + '" unregistered: "' + _id + '"');
    }
  }

  return removed;
}
/**
 * Function to unregister an Observable from the Dispatcher.
 *
 * Unless prevented, the available properties are sent. Afterwards the
 * entry with the id of the observable is deleted from the defined functions.
 *
 * NOTE(review): the id is deleted from `_definedFunctions` (the listing of
 * the registered functions). This looks like it was taken over from
 * `unregisterFunction` - confirm, whether the listings of the observables
 * should be cleaned up instead.
 *
 * @param {(INopeObservable<any> | string)} observable The Observable or its id.
 * @param options Options to prevent sending the updated properties.
 * @return {boolean} The result of deleting the id from the defined functions.
 * @memberof nopeDispatcher
 */
public unregisterObservable(observable: INopeObservable<any> | string, options: {
/** Flag to enable / disable sending to registery */
preventSendingToRegistery?: boolean;
} = {}): boolean {
// Observables without an id are mapped to '0'.
const _id = typeof observable === 'string' ? observable : observable.id as string || '0';
if (!options.preventSendingToRegistery) {
// Publish the Available Services.
this._sendAvailableProperties();
if (this._logger?.isSillyEnabled()) {
// If there is a Logger:
this._logger.silly('Dispatcher "' + this.id + '" unregistered: "' + _id + '"');
}
}
return this._definedFunctions.delete(_id);
}
/**
 * Listeners on the communication channel, stored by the name of the event.
 * Contains the observable, which receives the external messages and the
 * callback, which is registered at the communicator (see `_subscribeToEvent`).
 */
protected _externallySubscribeObservables: Map<string, {
observable: INopeObservable<IExternalEventMsg>,
cb: (...arg) => void,
}>;
/**
 * Sets of observables, stored by a string key.
 * NOTE(review): presumably the key is the topic the observables are
 * registered for - the usage isnt visible here, confirm.
 */
protected _internallySubscribeObservables: Map<string, Set<INopeObservable<any>>>;
/**
 * Creates an Event listener (if required)
 *
 * If the observable of the event has no subscriptions, a callback is
 * registered at the communicator, which forwards the received messages to
 * the observable. A callback registered earlier for the same event is
 * detached first, otherwise messages would be forwarded multiple times.
 *
 * @protected
 * @param {string} event The Event to Listen.
 * @return {nopeObservable<IExternalEventMsg>} An Listener on the Communication Channel.
 * @memberof nopeDispatcher
 */
protected _subscribeToEvent(event: string) {
  const existing = this._externallySubscribeObservables.get(event);
  const item = existing || {
    observable: this._generateObservable<IExternalEventMsg>(),
    cb: () => { },
  };

  if (!item.observable.hasSubscriptions) {
    if (existing) {
      // Stored entries always hold a callback, which was registered
      // before => remove it, before it is replaced.
      this._communicator.offEvent(event, existing.cb);
    }

    // Forward the external messages to the observable.
    const cb = (data: IExternalEventMsg) => {
      item.observable.setContent(data, this.id);
    };
    this._communicator.onEvent(event, cb);
    item.cb = cb;
  }

  // Set the Items.
  this._externallySubscribeObservables.set(event, item);

  return item.observable;
}
/**
 * Detaches the listener of an event from the communication channel,
 * disposes the related observable and forgets the entry. Unknown events
 * are ignored.
 *
 * NOTE(review): the entry is torn down unconditionally, i.e. without
 * checking whether other subscribers still use the observable - confirm
 * that this is intended.
 *
 * @protected
 * @param {string} event The event to unsubscribe from.
 * @memberof nopeDispatcher
 */
protected _unsubscribeEvent(event: string) {
  const entry = this._externallySubscribeObservables.get(event);
  if (!entry) {
    return;
  }

  // Stop listening on the channel.
  this._communicator.offEvent(event, entry.cb);

  // Dispose the observable which mirrored the event ...
  entry.observable.dispose();

  // ... and drop the bookkeeping entry.
  this._externallySubscribeObservables.delete(event);
}
/**
 * Helper to directly subscribe to an event. A new observable is created,
 * registered in 'subscribe' mode for the event and the callback is
 * subscribed to that observable.
 *
 * @param event The event (topic) to subscribe to.
 * @param callback The callback which is subscribed to the observable.
 * @param options Additional options; may be omitted. `pipe` and
 *                `preventSendingToRegistery` are forwarded to
 *                `registerObservable`, `mode` and `subscriptionOptions`
 *                are forwarded to the subscription of the callback.
 * @return The created observer. Its `unsubscribe` is replaced by a function
 *         that disposes the internally created observable.
 */
public async subscribeToEvent<G=any, K=any>(event: string, callback: IObservableCallback<G>, options: {
  pipe?: {
    pipe?: IPipe<IExternalEventMsg, K>,
    scope?: { [index: string]: any }
  },
  preventSendingToRegistery?: boolean,
  mode?: 'immediate' | 'sync',
  subscriptionOptions?: INopeSubscriptionOptions
} = {}) {
  // Create a new observable:
  const observable = this._generateObservable<G>();

  // Link the observable with the external event.
  this.registerObservable(observable, {
    mode: 'subscribe',
    topic: event,
    preventSendingToRegistery: options.preventSendingToRegistery,
    pipe: options.pipe
  });

  // Subscribe the callback to the (externally fed) observable.
  const observer = observable.subscribe(callback,
    options.mode,
    options.subscriptionOptions
  );

  // The observable exists only for this subscription, so unsubscribing
  // disposes the whole observable (dispose was extended by
  // `registerObservable` to release the external event as well).
  observer.unsubscribe = () => {
    observable.dispose();
  }

  // Return the Observer.
  return observer;
}
/**
 * Manually emits an event via the communicator.
 *
 * The event is only emitted if `forced` is set, or if a subscription for
 * the event exists and `sender` differs from the id of this dispatcher.
 * The emitted message always uses the id of this dispatcher as sender.
 *
 * @param _eventName Name (topic) of the event.
 * @param data The data to emit.
 * @param sender Id of the original sender; only used for the check above.
 * @param timestamp Timestamp which is forwarded in the message.
 * @param forced Emit the event regardless of the check above.
 * @param args Not used by this method.
 */
public async emit<T>(_eventName: string, data: T, sender?: string, timestamp?: number, forced = false, ...args){
  if (!forced) {
    // Without force: skip if nobody listens or if the data originates
    // from this dispatcher.
    const hasSubscription = this.subscriptionExists(_eventName);
    if (!hasSubscription || this.id === sender) {
      return;
    }
  }

  // Use the Communicator to emit the Event.
  await this._communicator.emitEvent(_eventName, {
    data,
    topic: _eventName,
    sender: this.id,
    type: 'event',
    timestamp
  });
}
/**
 * Links an observable with the communication layer.
 *
 * - mode 'subscribe': messages of the subscribe-topic which were not sent
 *   by this dispatcher are written into the observable.
 * - mode 'publish': changes of the observable are emitted on the
 *   publish-topic, if a subscription for that topic exists and the change
 *   was not caused by this dispatcher.
 *
 * The `dispose` function of the observable is wrapped, so that disposing
 * the observable removes the created links again. If both modes are used,
 * the wrappers are chained.
 *
 * @param observable The observable to link.
 * @param options Mode, topic(s), optional pipe definition and a flag to
 *                suppress publishing the updated topic list.
 * @return The observable which has been passed in.
 * @memberof nopeDispatcher
 */
public registerObservable<T, K, S = T, G = T>(observable: INopeObservable<T, S, G>, options: {
  mode: 'subscribe' | 'publish' | Array<'subscribe' | 'publish'>,
  topic: string | {
    subscribe?: string,
    publish?: string,
  },
  pipe?: {
    pipe?: IPipe<IExternalEventMsg, K>,
    scope?: { [index: string]: any }
  },
  preventSendingToRegistery?: boolean
}): INopeObservable<T, S, G> {
  // Extract the Topics, pipe and scope.
  const _subTopic = typeof options.topic === 'string' ? options.topic : options.topic.subscribe || null;
  const _pubTopic = typeof options.topic === 'string' ? options.topic : options.topic.publish || null;
  // NOTE(review): `options.pipe` is declared as an object, so the check for
  // a function never matches for typed callers and `_pipe` stays null
  // (`options.pipe.pipe` is never read). Kept as is, because enabling the
  // pipe would change the runtime behaviour - confirm the intention.
  const _pipe = typeof options.pipe === 'function' ? options.pipe || null : null;
  const _scope = typeof options.pipe === 'object' ? options.pipe.scope || null : null;

  // Helper to test whether a mode has been requested (single value or list).
  const usesMode = (mode: 'subscribe' | 'publish') =>
    options.mode === mode || (Array.isArray(options.mode) && options.mode.includes(mode));

  // A Flag, indicating, whether a topic is new or not.
  let newElement = false;

  if (usesMode('subscribe')) {
    newElement = !this._externallySubscribeObservables.has(_subTopic);

    const _externalSource = this._subscribeToEvent(_subTopic);

    // Writes external data into the observable. Messages sent by this
    // dispatcher itself are skipped.
    const forward = (data: IExternalEventMsg) => {
      if (data.sender !== this.id) {
        // Force a publish if setting the content did not report an update.
        if (!observable.setContent(data.data, this.id, data.timestamp)) {
          observable.forcePublish();
        }
      }
    };

    const observer = _pipe
      ? _externalSource.enhancedSubscription(forward, {
          scope: _scope,
          pipe: _pipe
        })
      : _externalSource.subscribe({
          next: forward,
          complete() {
            observable.observable.complete();
          },
          error(err) {
            observable.observable.error(err);
          }
        });

    // Extend the original dispose function.
    const dispose = observable.dispose;
    observable.dispose = () => {
      // Kill the Observer;
      observer.unsubscribe();
      // Unsubscribe the Event
      this._unsubscribeEvent(_subTopic);
      // Call the original Dispose function;
      dispose.apply(observable);
    };
  }

  if (usesMode('publish')) {
    const cb = (data, sender?, timestamp?, ...args) => {
      // Only Publish data, if there exists a Subscription.
      if (this.subscriptionExists(_pubTopic) && this.id !== sender) {
        // Use the Communicator to emit the Event.
        this._communicator.emitEvent(_pubTopic, {
          data: data,
          topic: _pubTopic,
          sender: this.id,
          type: 'event',
          timestamp
        });
      }
    };

    // Update the Flag.
    newElement = newElement || !this._internallySubscribeObservables.has(_pubTopic);

    // Register the Internally Subscribed Element.
    const _set = this._internallySubscribeObservables.get(_pubTopic) || new Set();
    _set.add(observable);
    this._internallySubscribeObservables.set(_pubTopic, _set);

    const observer = _pipe
      ? observable.enhancedSubscription(cb, {
          scope: _scope,
          pipe: _pipe
        })
      : observable.subscribe(cb);

    // Extend the (maybe already extended) dispose function.
    // The external event is NOT unsubscribed here: publishing never
    // subscribes to it. If the observable subscribes as well, the wrapper
    // created above (called via `dispose`) releases the event.
    const dispose = observable.dispose;
    observable.dispose = () => {
      // Kill the Observer;
      observer.unsubscribe();

      // Unregister the Internally Subscribed Element.
      const remaining = this._internallySubscribeObservables.get(_pubTopic) || new Set();
      remaining.delete(observable);
      if (remaining.size > 0) {
        this._internallySubscribeObservables.set(_pubTopic, remaining);
      } else {
        this._internallySubscribeObservables.delete(_pubTopic);
        // The topic is not published any more => optionally send an update.
        if (!options.preventSendingToRegistery) {
          this._sendAvailableProperties();
        }
      }

      // Call the original Dispose function;
      dispose.apply(observable);
    };
  }

  if (!options.preventSendingToRegistery && newElement) {
    // Publish the updated topics.
    this._sendAvailableProperties();
  }

  // Return the observable.
  return observable;
}
/**
 * Removes the communicator listener which has been stored for the given
 * id. Ids without a stored listener are ignored.
 *
 * @protected
 * @param {string} _id Id under which the listener has been stored.
 * @memberof nopeDispatcher
 */
protected _removeRpcSubscription(_id: string) {
  if (!this._communicatorCallbacks.has(_id)) {
    // Nothing has been registered for this id.
    return;
  }

  const { type, registeredId, cb } = this._communicatorCallbacks.get(_id);

  if (type === 'request') {
    // Unregister the RPC-Request-Listener
    this._communicator.offRpcRequest(registeredId, cb);
  } else if (type === 'response') {
    // Unregister the RPC-Response-Listener
    this._communicator.offRpcResponse(registeredId, cb);
  }

  // Forget the listener.
  this._communicatorCallbacks.delete(_id);
}
/**
 * Publishes the ids of all currently defined functions (the keys of
 * `_definedFunctions`) together with the id of this dispatcher.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableServices() {
  const services = Array.from(this._definedFunctions.keys());
  const message: IAvailableServicesMsg = {
    dispatcher: this.id,
    services
  };

  if (this._logger?.isSillyEnabled()) {
    this._logger.silly('sending available services');
  }

  // Hand the message to the communicator.
  this._communicator.emitNewServicesAvailable(message);
}
/**
 * Publishes the topics of this dispatcher: the keys of
 * `_internallySubscribeObservables` as published topics and the keys of
 * `_externallySubscribeObservables` as subscribed topics.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableProperties() {
  const published = Array.from(this._internallySubscribeObservables.keys());
  const subscribed = Array.from(this._externallySubscribeObservables.keys());
  const message: IAvailableTopicsMsg = {
    dispatcher: this.id,
    published,
    subscribed
  };

  if (this._logger?.isSillyEnabled()) {
    this._logger.silly('sending available properties. (subscribing and publishing events)');
  }

  // Hand the message to the communicator.
  this._communicator.emitNewTopicsAvailable(message);
}
/**
 * Publishes the names of the instance generators of this dispatcher
 * (the keys of `_externalGenerators`).
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableGenerators() {
  const generators = Array.from(this._externalGenerators.keys());
  const message: IAvailableInstanceGeneratorsMsg = {
    dispatcher: this.id,
    generators,
  };

  if (this._logger?.isSillyEnabled()) {
    this._logger.silly('sending available instance generators');
  }

  // Hand the message to the communicator.
  this._communicator.emitNewInstanceGeneratorsAvailable(message);
}
/**
 * Requests the execution of a service via the communicator.
 *
 * A task is stored in `_runningInternalRequestedTasks`; the returned promise
 * is settled by whoever handles the result using the stored
 * `resolve` / `reject`. Parameters which are functions are registered as
 * functions of this dispatcher and transmitted as callback descriptions
 * (unless `paramsHasNoCallback` is set).
 *
 * The promise is rejected directly if no provider for the service is known
 * (and `dynamicCallback` is not set) or if emitting the request fails.
 *
 * @template T
 * @param {string} serviceName The name / id of the service.
 * @param {any[]} params The parameters of the call.
 * @param {Partial<ICallOptions>} [options={}] Options of the call. Defaults:
 *        `deletableCallbacks: []`, `paramsHasNoCallback: false`,
 *        `dynamicCallback: false`; `resultSink` depends on the publishing
 *        mode. A `timeout` > 0 (used as delay in ms) cancels the task
 *        after that time.
 * @return {INopePromise<T>} Promise of the result, extended with `taskId`
 *         and `cancel`.
 * @memberof nopeDispatcher
 */
public performCall<T>(serviceName: string, params: any[], options: Partial<ICallOptions> = {}): INopePromise<T> {
  // Get a Call Id
  const _taskId = generateId();
  // Ids of the callbacks, which are registered for this call.
  const _registeredIdx: Array<string> = [];
  const _this = this;

  const _options = Object.assign({
    deletableCallbacks: [],
    paramsHasNoCallback: false,
    dynamicCallback: false,
    resultSink: (this._publishingMode === 'generic' ? 'response' : this._getServiceName(serviceName, 'response')),
  }, options) as ICallOptions;

  this._subscribeToResult(serviceName, _options.dynamicCallback);

  // Releases everything that belongs to this call.
  const clear = () => {
    // Delete all Callbacks.
    for (const id of _registeredIdx) {
      _this.unregisterFunction(id);
    }

    // Remove the task and its timeout:
    if (_this._runningInternalRequestedTasks.has(_taskId)) {
      const task = _this._runningInternalRequestedTasks.get(_taskId);
      if (task.timeout) {
        clearTimeout(task.timeout);
      }
      _this._runningInternalRequestedTasks.delete(_taskId);
    }

    if (_this._logger?.isSillyEnabled()) {
      _this._logger.silly('Clearing Callbacks from ' + _taskId);
    }
  };

  if (_this._logger?.isSillyEnabled()) {
    _this._logger.silly('Dispatcher "' + this.id + '" requesting externally Function "' + serviceName + '" with task: "' + _taskId + '"');
  }

  const ret = new NopePromise<T>(async (resolve, reject) => {
    try {
      const requestedTask: any = {
        resolve,
        reject,
        clear,
        serviceName,
        timeout: null
      };

      // Register the Handlers,
      _this._runningInternalRequestedTasks.set(_taskId, requestedTask);

      // Define a Task-Request
      const taskRequest: IRequestTaskMsg = {
        functionId: serviceName,
        params: [],
        callbacks: [],
        taskId: _taskId,
        type: 'requestOfTask',
        resultSink: _options.resultSink
      };

      // Sort the parameters: plain data is added to `params`, functions
      // are registered and described in `callbacks` (only if callbacks
      // are allowed for this call).
      for (const [idx, contentOfParameter] of params.entries()) {
        if (_options.paramsHasNoCallback || typeof contentOfParameter !== 'function') {
          taskRequest.params.push({
            idx,
            data: contentOfParameter
          });
          continue;
        }

        // The Parameter is a Callback => register it inside of the
        // Dispatcher and store a description of the callback.
        const deleteAfterCalling = _options.deletableCallbacks.includes(idx);
        const _func = _this.registerFunction(contentOfParameter, {
          deleteAfterCalling,
          preventSendingToRegistery: true,
        });

        _registeredIdx.push(_func['id']);

        taskRequest.callbacks.push({
          functionId: _func['id'],
          idx,
          deleteAfterCalling,
          dynamicCallback: true,
          deletableCallbacks: [],
          resultSink: _this._publishingMode === 'generic' ? 'response' : _this._getServiceName(_func['id'], 'response')
        });
      }

      if (!_options.dynamicCallback && !_this.serviceExists(serviceName)) {
        // Create an Error:
        const error = new Error('No Service Provider known for "' + serviceName + '"');

        if (_this._logger) {
          _this._logger.error('No Service Provider known for "' + serviceName + '"');
          _this._logger.error(error);
        }

        throw error;
      }

      // Select the channel of the request: an individual channel per
      // service or the shared 'request' channel.
      const channel = _this._subscriptionMode === 'individual'
        ? _this._getServiceName(taskRequest.functionId, 'request')
        : 'request';

      await _this._communicator.emitRpcRequest(channel, taskRequest);

      if (_this._logger?.isSillyEnabled()) {
        _this._logger.silly('Dispatcher "' + _this.id + '" putting task "' + _taskId + '" on: "' + channel + '"');
      }

      // Arm the timeout. The task may already have been settled and
      // cleared while the request was emitted; in that case no timer is
      // created, because nobody would clear it any more.
      if (_options.timeout > 0 && _this._runningInternalRequestedTasks.has(_taskId)) {
        requestedTask.timeout = setTimeout(() => {
          _this.cancelTask(_taskId, new Error('TIMEOUT. The Service allowed execution time of ' + _options.timeout.toString() + '[ms] has been excided'));
        }, _options.timeout);
      }
    } catch (e) {
      // Clear all Elements of the Function:
      clear();
      // Forward the error.
      reject(e);
    }
  });

  ret.taskId = _taskId;
  ret.cancel = (reason) => {
    _this.cancelTask(_taskId, reason);
  };

  return ret;
}
/**
 * Removes all pending tasks (or creates the task map, if it does not
 * exist yet). Pending timeouts of the tasks are cleared, so that no
 * timer remains for a task which is not known any more.
 *
 * NOTE(review): the promises of the removed tasks are neither resolved
 * nor rejected here - confirm that callers do not wait for them.
 *
 * @memberof nopeDispatcher
 */
public clearTasks(): void {
  if (this._runningInternalRequestedTasks) {
    // Stop the timers (see `performCall`) before dropping the tasks.
    for (const task of this._runningInternalRequestedTasks.values()) {
      if (task.timeout) {
        clearTimeout(task.timeout);
      }
    }
    this._runningInternalRequestedTasks.clear();
  } else {
    this._runningInternalRequestedTasks = new Map();
  }
}
/**
 * Unregisters every defined function via `unregisterFunction` (or creates
 * the function map, if it does not exist yet) and replaces the map of the
 * communicator callbacks with an empty one.
 *
 * @memberof nopeDispatcher
 */
public unregisterAll(): void {
  if (!this._definedFunctions) {
    this._definedFunctions = new Map<string, (...args) => Promise<any>>();
  } else {
    this._definedFunctions.forEach((_func, id) => {
      this.unregisterFunction(id);
    });
    this._definedFunctions.clear();
  }

  // Reset the Callbacks.
  this._communicatorCallbacks = new Map();
}
/**
 * Resets the dispatcher: the internal maps and sets are replaced by empty
 * ones, the removal of every known instance is requested, all tasks are
 * cleared and all functions are unregistered.
 *
 * @memberof nopeDispatcher
 */
public reset(): void {
  this._remotlyCalledFunctions = new Set();
  this._mappingOfRemoteDispatchersAndServices = new Map();
  this._mappingOfRemoteDispatchersAndTopics = new Map();
  this._mappingOfRemoteDispatchersAndGenerators = new Map();
  this._internalGenerators = new Map();
  this._externalGenerators = new Map();

  // If Instances exist => request their removal.
  if (this._instances) {
    for (const name of this._instances.keys()) {
      // The removal is not awaited; a failure is only logged.
      this.deleteInstance(name).catch(e => {
        if (this._logger) {
          this._logger.error('Failed Removing Instance "' + name + '"');
          this._logger.error(e);
        }
      });
    }
  }

  this._instances = new Map();
  this._externalProvidedServices = new Set();
  this._externalPublished = new Set();
  this._externalSubscribed = new Set();
  this._externalProvidedGenerators = new Set();
  this._internallySubscribeObservables = new Map();
  this._externallySubscribeObservables = new Map();

  this.clearTasks();
  this.unregisterAll();
}
}