nope/lib/dispatcher/nopeDispatcher.ts
2021-10-29 22:10:16 +02:00

3669 lines
109 KiB
TypeScript

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-10-12 18:52:00
* @modify date 2021-10-19 09:15:25
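* @desc Dispatcher that registers local functions and instance generators and forwards calls as tasks to remote dispatchers.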
*/
import * as Logger from "js-logger";
import { ILogger } from "js-logger";
import { avgOfArray, minOfArray } from "../helpers/arrayMethods";
import { isAsyncFunction, sleep, waitFor } from "../helpers/async";
import { generateId } from "../helpers/idMethods";
import { RUNNINGINNODE } from "../helpers/runtimeMethods";
import { determineDifference } from "../helpers/setMethods";
import { getNopeLogger } from "../logger/getLogger";
import { NopeGenericModule } from "../module/GenericModule";
import { NopePromise } from "../promise/nopePromise";
import {
IAvailableInstanceGeneratorsMsg,
IAvailableServicesMsg,
IAvailableTopicsMsg,
ICallOptions,
ICommunicationBridge,
IExternalEventMsg,
IInstanceCreationMsg,
IInstanceDescriptionMsg,
IInstanceRemovalMsg,
IRequestTaskMsg,
IResponseTaskMsg,
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import {
ENopeDispatcherStatus,
IDispatcherInfo,
IGenerateRemoteInstanceCallback,
IGenerateRemoteInstanceForOtherDispatcherCallback,
INopeDispatcher,
INopeDispatcherOptions,
INopeTimeOptions,
ValidDefaultSelectors,
ValidSelectorFunction
} from "../types/nope/nopeDispatcher.interface";
import {
INopeModule,
INopeModuleDescription,
IPropertyOptions
} from "../types/nope/nopeModule.interface";
import {
INopeObservable,
INopeObserver,
INopeSubscriptionOptions,
IObservableCallback,
IPipe
} from "../types/nope/nopeObservable.interface";
import { INopePromise } from "../types/nope/nopePromise.interface";
/**
* A Dispatcher to perform a function on a Remote
* Dispatcher. Therefore a Task is created and forwarded
* to the remote.
*
* @export
* @class nopeDispatcher
*/
export class nopeDispatcher implements INopeDispatcher {
public readonly id: string;
protected _logger: ILogger;
/**
* Internal Element to store the registered Functions
*
* @protected
* @memberof nopeDispatcher
*/
protected _definedFunctions: Map<
string,
{
options: {
preventSendingToRegistery: boolean;
};
func: (...args) => Promise<any>;
}
>;
protected _remotlyCalledFunctions: Set<string>;
protected _communicatorCallbacks: Map<
string,
{
registeredId: string;
type: "request" | "response";
cb: (data) => any;
}
>;
/**
* The used Communication interface
*
* @type {ICommunicationBridge}
* @memberof nopeDispatcher
*/
public readonly communicator: ICommunicationBridge;
/**
* A Map holding the current Status of external dispatchers.
* Key = Dispatcher-ID
* Value = Last Known status of the dispatcher
*
* @protected
* @type {Map<string, IDispatcherInfo>}
* @memberof nopeDispatcher
*/
protected _externalDispatchers: Map<string, IDispatcherInfo>;
/**
* A Mapping of the Services a dispatcher is hosting.
* Key = Dispatcher-ID
* Value = Available Services
*
* @protected
* @type {Map<
* string,
* IAvailableServicesMsg
* >}
* @memberof nopeDispatcher
*/
protected _mappingOfRemoteDispatchersAndServices: Map<
string,
IAvailableServicesMsg
>;
/**
* List with Services, which are available and will
* be performed by different dispatchers.
*
* @protected
* @type {Set<string>}
* @memberof nopeDispatcher
*/
protected _externalProvidedServices: Set<string>;
/**
* Element holding the information of how-many times,
* the service has been provided.
*
* Key = Service Name
* Value = Times of being provided
*
* @example example:
* service01: 2
* service02: 1
* ...
*
* @protected
* @type {Map<string, number>}
* @memberof nopeDispatcher
*/
protected _amountOfServicesProvidedByName: Map<string, number>;
/**
* Element holding the Mapping of the Dispatcher and its instance
* generators
*
* Key = Dispatcher-ID
* Value = Available Instance Generator of that Dispatcher
*
* @protected
* @type {Map<
* string,
* IAvailableInstanceGeneratorsMsg
* >}
* @memberof nopeDispatcher
*/
protected _mappingOfRemoteDispatchersAndGenerators: Map<
string,
IAvailableInstanceGeneratorsMsg
>;
/**
* Summary of the external provided generators.
*
* @protected
* @type {Set<string>}
* @memberof nopeDispatcher
*/
protected _externalProvidedGenerators: Set<string>;
/**
* Element holding the information of how-many times,
* the instance-generator has been provided.
*
* Key = Generator-Name
* Value = Times of being provided
*
* @example example:
* service01: 2
* service02: 1
* ...
*
* @protected
* @type {Map<string, number>}
* @memberof nopeDispatcher
*/
protected _amountOfGeneratorsProvidedByName: Map<string, number>;
/**
* Proxy created in the constructor. Accessing any property returns a function
* `(options, ...args)` that forwards to `performCall(name, args, options)`.
*/
public methodInterfaceWithOptions: {
[index: string]: <T>(options: ICallOptions, ...args) => INopePromise<T>;
};
/**
* Proxy created in the constructor. Accessing any property returns a function
* `(...args)` that forwards to `performCall(name, args)` without call options.
*/
public methodInterface: { [index: string]: <T>(...args) => INopePromise<T> };
// NOTE(review): presumably Key = Dispatcher-ID, Value = topics announced by
// that dispatcher (analogous to the other "_mappingOfRemoteDispatchersAnd..."
// maps) - confirm against the code maintaining this map.
protected _mappingOfRemoteDispatchersAndPropsOrEvents: Map<
string,
IAvailableTopicsMsg
>;
// Bookkeeping sets for subscribed / published topics. They are not touched in
// the part of the class documenting them here; presumably filled in `reset()`
// and the topic handlers - TODO confirm.
protected _externalSubscribed: Set<string>;
protected _externalPublished: Set<string>;
protected _internalSubscribed: Set<string>;
protected _internalPublished: Set<string>;
protected _registeredObservables: Map<INopeObservable<any>, IPropertyOptions>;
/**
* Key = Dispatcher-ID
* Value = Descriptions of the instances hosted by that dispatcher.
*
* Used by `getInstanceInfo` and `getDispatcherForInstance` to find the
* dispatcher hosting an instance with a given identifier.
*/
protected _mappingOfRemoteDispatchersAndInstances: Map<
string,
INopeModuleDescription[]
>;
protected _externalInstancesNames: Set<string>;
protected _externalInstances: Map<string, INopeModuleDescription>;
protected _eventsToSendCurrentValueOnSubscription: Map<
string,
Set<INopeObservable<any>>
>;
protected _lastPublishedEvent: Map<string, IExternalEventMsg>;
// The property observables below are created in the constructor via
// `_generateObservable()` and initialised with an empty array.
public readonly externallySubscribedProperties: INopeObservable<string[]>;
public readonly externallyPublishedProperties: INopeObservable<string[]>;
public readonly subscribedProperties: INopeObservable<string[]>;
public readonly publishedProperties: INopeObservable<string[]>;
// NOTE(review): the four event observables are not assigned in the
// constructor shown in this file section - confirm where they are initialised.
public readonly externallySubscribedEvents: INopeObservable<string[]>;
public readonly externallyPublishedEvents: INopeObservable<string[]>;
public readonly subscribedEvents: INopeObservable<string[]>;
public readonly publishedEvents: INopeObservable<string[]>;
// Created in the constructor and initialised with an empty array.
public readonly externalProvidedServices: INopeObservable<string[]>;
// Created in the constructor without an initial content.
public readonly canceledTask: INopeObservable<ITaskCancelationMsg>;
// Flag showing whether the dispatcher is ready; initialised with `false`.
public readonly ready: INopeObservable<boolean>;
// Holds all available instances; initialised with an empty array.
public readonly availableInstances: INopeObservable<INopeModuleDescription[]>;
// Holds the information of all known dispatchers; initialised with an empty array.
public readonly externalDispatchers: INopeObservable<IDispatcherInfo[]>;
/**
* Internal Element to store the running tasks.
*
* @protected
* @memberof nopeDispatcher
*/
protected _runningInternalRequestedTasks: Map<
string,
{
resolve: (value: any) => void;
reject: (error: any) => void;
clear: () => void;
serviceName: string;
timeout?: any;
}
>;
// Taken from `options.forceEmittingUpdates` in the constructor; defaults to
// `false` if the option is not a boolean.
protected _forceEmittingUpdates: boolean;
protected _runningExternalRequestedTasks: Set<string>;
// Default timings defined in the constructor, overwritten by `options.timeouts`.
protected _timeouts: INopeTimeOptions;
// Initialised with `false` in the constructor.
private __warned: boolean;
/**
 * Creates an instance of nopeDispatcher.
 *
 * The constructor stores the communicator, generates the id of the dispatcher,
 * defines the logger and the timeouts, starts sampling the CPU-load (only if
 * running in Node.js), creates the method-proxies and the observables and
 * finally calls `reset()` and the asynchronous `_init()`.
 *
 * @param {INopeDispatcherOptions} options The Options, used by the Dispatcher.
 * @param {<T>() => INopeObservable<T>} _generateObservable A Helper, to generate Observables.
 * @memberof nopeDispatcher
 */
constructor(
  public options: INopeDispatcherOptions,
  protected _generateObservable: <T>() => INopeObservable<T>
) {
  // Alias used by the closures defined below. It is declared first, so
  // that no closure references it before its declaration.
  const _this = this;

  this.communicator = options.communicator;
  this.id = generateId();

  if (!options.logger) {
    this._logger = getNopeLogger("dispatcher " + this.id, "info");
  } else {
    this._logger = options.logger;
  }

  this._timeouts = {
    sendAliveInterval: 500,
    checkInterval: 250,
    slow: 1000,
    warn: 2000,
    dead: 5000,
    remove: 10000,
    selectorTimeout: 1000
  };

  // Define the Timeouts.
  if (options.timeouts) {
    this._timeouts = Object.assign(this._timeouts, options.timeouts);
  }

  this.__warned = false;

  if (RUNNINGINNODE) {
    // eslint-disable-next-line
    const os = require("os");

    // Sums up the total and the idle time of all cpus.
    const getLoad = () => {
      let totalTime = 0;
      let idleTime = 0;

      for (const cpu of os.cpus()) {
        for (const name in cpu.times) {
          totalTime += cpu.times[name];
        }
        idleTime += cpu.times.idle;
      }

      return {
        totalTime,
        idleTime
      };
    };

    // Initally store the load
    let oldTimes = getLoad();

    this._cpuInterval = setInterval(() => {
      // Get the current CPU Times.
      const currentTimes = getLoad();
      const totalDelta = currentTimes.totalTime - oldTimes.totalTime;

      // Only update the load if the counters advanced. Otherwise the
      // division would be 0 / 0 = NaN (e.g. if "os.cpus()" returns an
      // empty list), which would publish an invalid load.
      if (totalDelta > 0) {
        _this._cpuLoad =
          1 - (currentTimes.idleTime - oldTimes.idleTime) / totalDelta;
      }

      // Store the current CPU-Times
      oldTimes = currentTimes;
    }, 100);
  }

  // Use the default selector.
  this.options.defaultSelector = this.options.defaultSelector || "first";

  /**
   * Define the flag, which will be used to force sending updates.
   */
  this._forceEmittingUpdates =
    typeof options.forceEmittingUpdates === "boolean"
      ? options.forceEmittingUpdates
      : false;

  /**
   * Define A Proxy for accessing methods easier.
   */
  const _handlerWithOptions = {
    get(target, name) {
      return (options: ICallOptions, ...args) => {
        return _this.performCall(name, args, options);
      };
    }
  };
  const _handlerWithoutOptions = {
    get(target, name) {
      return (...args) => {
        return _this.performCall(name, args);
      };
    }
  };

  this.methodInterfaceWithOptions = new Proxy({}, _handlerWithOptions);
  this.methodInterface = new Proxy({}, _handlerWithoutOptions);

  // Define the Observables provided by the dispatcher.
  // NOTE(review): the "...Events" observables declared on the class are not
  // created here - confirm that they are initialised elsewhere.
  this.externallySubscribedProperties = this._generateObservable();
  this.externallySubscribedProperties.setContent([]);
  this.externallyPublishedProperties = this._generateObservable();
  this.externallyPublishedProperties.setContent([]);
  this.publishedProperties = this._generateObservable();
  this.publishedProperties.setContent([]);
  this.subscribedProperties = this._generateObservable();
  this.subscribedProperties.setContent([]);
  this.externalProvidedServices = this._generateObservable();
  this.externalProvidedServices.setContent([]);
  this.canceledTask = this._generateObservable();

  // Flag to show if the system is ready or not.
  this.ready = this._generateObservable();
  this.ready.setContent(false);

  // Holding all available instances.
  this.availableInstances = this._generateObservable();
  this.availableInstances.setContent([]);

  // Observable containing all Dispatcher Informations.
  this.externalDispatchers = this._generateObservable();
  this.externalDispatchers.setContent([]);

  if (this._logger) {
    this._logger.info(
      "Dispatcher online. -> Reseting and Initializing: ",
      this.id
    );
  }

  this.reset();

  // The initialization is asynchronous. Errors are only logged.
  this._init().catch((error) => {
    if (_this._logger) {
      _this._logger.error("Failed to intialize the Dispatcher", error);
    }
  });
}
// Marks instances that are currently being created by an instance generator.
// Key = identifier of the instance, Value = hash stored while the instance
// is initializing (see `provideInstanceGeneratorForExternalDispatchers`).
protected _initializingInstance: Map<string, string>
/**
 * Helper to provide an instance-generator for other dispatchers.
 *
 * A function named "generateInstance_<identifier>" is registered. If this
 * function is called, the instance is created (using `cb`), initialized with
 * the provided parameters and stored. If the instance already exists, the
 * requesting dispatcher is only added to the users of the instance. The
 * function responds with the description of the instance.
 *
 * Additionally a function "instance_dispose_<instance-identifier>" is
 * registered for every created instance. It removes the calling dispatcher
 * from the users and disposes the instance, if it isnt used any more.
 *
 * @template I
 * @param {string} identifier The type-name of the generator.
 * @param {IGenerateRemoteInstanceForOtherDispatcherCallback<I>} cb Callback creating the instance.
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
async provideInstanceGeneratorForExternalDispatchers<I extends INopeModule>(
  identifier: string,
  cb: IGenerateRemoteInstanceForOtherDispatcherCallback<I>
): Promise<void> {
  const _this = this;

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    this._logger.debug(
      "Adding instance generator for \"" +
        identifier +
        "\" to external Generators. Other Elements can now create instances of this type."
    );
  }

  const _cb = await this.registerFunction(
    async (data: IInstanceCreationMsg) => {
      // Check if an instance exists or not.
      // if not => create an instance an store it.
      if (!_this._instances.has(data.identifier)) {
        // NOTE(review): the hash is a constant placeholder. Therefore the
        // "different Parameters" check below can currently never fail.
        const hash = "hashedSettings";

        // It might happen, that an instance is requested multiple times.
        // Therefore we have to make sure, we wont create it multiple times.
        if (!_this._initializingInstance.has(data.identifier)) {
          // Mark the Instance as "initializing".
          _this._initializingInstance.set(data.identifier, hash);

          try {
            // Create an Instance
            const _instance = await cb(_this, data.identifier);

            // Make sure the Data is expressed as Array.
            if (!Array.isArray(data.params)) {
              data.params = [data.params];
            }

            // Initialize the instance with the parameters.
            await _instance.init(...data.params);

            // A Function is registered, taking care of removing
            // an instances, if it isnt needed any more. The registration
            // is awaited, so that a failure is reported to the requester
            // instead of ending up as unhandled rejection.
            await _this.registerFunction(
              async (_data: IInstanceRemovalMsg) => {
                const entry = _this._instances.get(data.identifier);

                if (entry?.usedBy) {
                  // Get the Index of the dispatcher, which is using
                  // the element
                  const idx = entry.usedBy.indexOf(_data.dispatcherId);

                  if (idx > -1) {
                    entry.usedBy.splice(idx, 1);
                  }

                  if (entry.usedBy.length == 0) {
                    // Unmark as internal instance
                    _this._internalInstances.delete(data.identifier);
                    // Remove the Instance.
                    await _instance.dispose();
                    // Delete the Entry.
                    _this._instances.delete(data.identifier);
                    // Remove the Function itself
                    _this.unregisterFunction(
                      "instance_dispose_" + data.identifier
                    );
                  }
                }
              },
              {
                deleteAfterCalling: false,
                id: "instance_dispose_" + data.identifier
              }
            );

            // Store the Instance.
            _this._instances.set(data.identifier, {
              instance: _instance,
              usedBy: [data.dispatcherId]
            });
            _this._internalInstances.add(data.identifier);

            // Update the available instances:
            _this._sendAvailableInstances();
          } finally {
            // Always remove the marker - even if creating or initializing
            // the instance failed. Otherwise following requests would wait
            // for an instance that will never be available.
            _this._initializingInstance.delete(data.identifier);
          }
        } else if (_this._initializingInstance.get(data.identifier) != hash) {
          throw Error("Providing different Parameters for the same Identifier");
        } else {
          _this._logger.warn(`Parallel request for the same Instance "${data.identifier}" => Waiting until the Instance has been initialized`);

          // Wait until the instance is available or the parallel
          // initialization has been aborted (marker removed).
          await waitFor(
            () => {
              return (
                _this._instances.has(data.identifier) ||
                !_this._initializingInstance.has(data.identifier)
              );
            },
            {
              testFirst: true,
              delay: 100
            }
          );

          if (!_this._instances.has(data.identifier)) {
            throw Error(
              "Failed to create the Instance \"" + data.identifier + "\""
            );
          }

          // The waiting dispatcher uses the instance as well. Register it,
          // so that the instance isnt disposed while it is still in use.
          _this._instances.get(data.identifier).usedBy.push(data.dispatcherId);
        }
      } else {
        // If an Element exists => Add the Element.
        _this._instances.get(data.identifier).usedBy.push(data.dispatcherId);
      }

      // Define the Response.
      const response: IInstanceDescriptionMsg = {
        description: _this._instances
          .get(data.identifier)
          .instance.toDescription(),
        type: data.type
      };

      // Send the Response
      return response;
    },
    {
      id: "generateInstance_" + identifier
    }
  );

  // Store the Generator.
  this._externalGenerators.set(identifier, _cb);
  this._updateAmountOf("generators");

  // Send an update of the available Generators
  this._sendAvailableGenerators();
}
/**
 * Removes an instance-generator, which has been provided for other
 * dispatchers. Nothing happens, if no generator with the given
 * identifier is known.
 *
 * @param {string} identifier The type-name of the generator.
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
async unprovideInstanceGeneratorForExternalDispatchers(
  identifier: string
): Promise<void> {
  // Unknown generator => nothing to do.
  if (!this._externalGenerators.has(identifier)) {
    return;
  }

  const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);
  if (debugEnabled) {
    const message =
      "Removing instance generator for \"" +
      identifier +
      "\" from external Generators. Other Elements cant create instances of this type anymore.";
    this._logger.debug(message);
  }

  // Unregister the function, that has been registered for the generator
  // and forget the generator.
  const registeredGenerator = this._externalGenerators.get(identifier);
  this.unregisterFunction(registeredGenerator);
  this._externalGenerators.delete(identifier);
  this._updateAmountOf("generators");

  // Inform the others about the generators which are still available.
  this._sendAvailableGenerators();
}
/**
 * Stores a generator, which is only used internally, under the given
 * identifier. An already stored generator with the same identifier is
 * replaced.
 *
 * @template I
 * @param {string} identifier The type-name of the generator.
 * @param {IGenerateRemoteInstanceCallback<I>} cb The generator callback.
 * @memberof nopeDispatcher
 */
public registerInternalWrapperGenerator<I extends INopeModule>(
  identifier: string,
  cb: IGenerateRemoteInstanceCallback<I>
): void {
  const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);

  if (debugEnabled) {
    const message =
      "Adding instance generator for \"" +
      identifier +
      "\" as internal Generator. This Generator wont be used externally.";
    this._logger.debug(message);
  }

  this._internalGenerators.set(identifier, cb);
}
/**
 * Removes the internal generator stored under the given identifier.
 *
 * @param {string} identifier The type-name of the generator.
 * @memberof nopeDispatcher
 */
public unregisterInternalWrapperGenerator(identifier: string): void {
  const debugEnabled = this._logger?.enabledFor((Logger as any).DEBUG);

  if (debugEnabled) {
    const message =
      "Rmoving instance generator for \"" +
      identifier +
      "\" from internal Generator. The sytem cant create elements of this type any more.";
    this._logger.debug(message);
  }

  this._internalGenerators.delete(identifier);
}
/**
* Generators which are only used internally.
* Key = Type-Name (set by `registerInternalWrapperGenerator`)
* Value = Callback, called in `generateInstance` with the dispatcher and the
* description received from the remote generator.
*/
protected _internalGenerators: Map<
string,
IGenerateRemoteInstanceCallback<INopeModule>
>;
/**
* Generators provided for other dispatchers.
* Key = Type-Name
* Value = The function returned by `registerFunction` in
* `provideInstanceGeneratorForExternalDispatchers`.
*/
protected _externalGenerators: Map<
string,
IGenerateRemoteInstanceCallback<INopeModule>
>;
/**
* The instances known by this dispatcher.
* Key = Identifier of the instance
* Value = The instance, the ids of the dispatchers using it (`usedBy`) and
* the flag `manual`, which is set by `registerInstance`.
*/
protected _instances: Map<
string,
{
instance: INopeModule;
usedBy: Array<string>;
manual?: boolean;
}
>;
// Identifiers of the instances, which have been created by a generator of
// this dispatcher (see `provideInstanceGeneratorForExternalDispatchers`).
protected _internalInstances: Set<string>;
/**
 * Function used to unregister a function. Emits the "unregister" message
 * using the communicator.
 *
 * @param {(string | ((...args) => any))} func Either the id of the function or the function itself. A function must provide the property "id".
 * @return {*} {Promise<void>}
 * @throws {Error} If neither a string nor an element containing an "id" is provided.
 * @memberof nopeDispatcher
 */
public async unregisterCallback(
  func: string | ((...args) => any)
): Promise<void> {
  // Use an untyped alias. Otherwise the type-narrowing prevents accessing
  // the dynamically assigned "id".
  const candidate: any = func;
  let identifier: string;

  if (typeof candidate === "string") {
    identifier = candidate;
  } else if (
    // "typeof" of a function is "function" and never "object". Checking
    // only for "object" rejected every registered function.
    (typeof candidate === "function" || typeof candidate === "object") &&
    candidate !== null &&
    candidate["id"] !== undefined
  ) {
    identifier = candidate["id"];
  } else {
    throw Error("Wrong type provided");
  }

  await this.communicator.emitUnregisterRpc({
    dispatcherId: this.id,
    identifier
  });
}
/**
* Helper Function, that will create a Selector Function for Generators or Service Calls.
* Therefore the "testCallback" will be used to determine whether a dispatcher is
* allowed to execute the service or not.
*
* The selector function is registered using `registerFunction` (without sending it
* to the registry) and is unregistered again, if either every provider has called
* it or the timeout has elapsed.
*
* @param {string} name Name of the service / generator. Used to look up how many providers are known.
* @param {("service" | "generator")} type Defines which counter is used to determine the amount of providers.
* @param {ValidSelectorFunction} testCallback Called with the id of the requesting dispatcher and `{ first, last, timeout }`.
* @param {{ timeout?: number }} [options={}] Overwrites the default timeout (`_timeouts.selectorTimeout`).
* @return {*} The call-options containing the id of the selector function (`selectorFuncOptions`)
* and a promise (`selectionSucessPromise`), which is never resolved. It is only rejected
* (without a reason), if the timeout elapsed and no executor has been selected.
* @memberof nopeDispatcher
*/
protected async _generateSelector(
name: string,
type: "service" | "generator",
testCallback: ValidSelectorFunction,
options: { timeout?: number } = {}
) {
const _this = this;
const _options = Object.assign(
{
timeout: this._timeouts.selectorTimeout
},
options
);
// Create a Variable, which will count how often the
// selection function has been called
let called = 0;
// Create a callback and Promise, that will be rejected
// if a timer has been elapsed.
let timerFinishedCallback: (...args) => void;
const timerFinishedPromise = new Promise(
(resolve, reject) => (timerFinishedCallback = reject)
);
// Create a callback and a Promise, that will be finished,
// if all elements have called the selection function.
let checksFinishedCallback: (...args) => void;
const checksFinishedPromise = new Promise(
(resolve) => (checksFinishedCallback = resolve)
);
// And finally we need to know, if the selection was fine.
// Therefore we create a variable.
let executorSelected = false;
let firstCall = true;
// Ids of the dispatchers, which already called the selection function.
const responses = new Set<string>();
// A Default callback, which should be called inside of a
// selection function. This function will ensure, that we
// trigger the corresponding timers to unregister the
// selection function.
const defaultCallbackHandler = () => {
const first = firstCall;
if (firstCall) {
firstCall = false;
// Now we start a timer to make sure the function will be unregistered after a specific amount of time.
// The timer is started with the first call only and it is never cleared.
// NOTE(review): if the selection function is never called, no timer is
// started and the function stays registered - confirm this is intended.
setTimeout(
timerFinishedCallback,
_options.timeout,
new Error("Timeout: Selection timedout")
);
}
// Determine how many calls are allowed before we have to remove the callback
// (defaults to 1, if the amount of providers is unknown)
const amountOfCallsBeforeUnregister =
type == "generator"
? _this._amountOfGeneratorsProvidedByName.get(name) || 1
: _this._amountOfServicesProvidedByName.get(name) || 1;
// Now we make sure, we count this call
called++;
// If we have been called at least as often as providers are known,
// we assume that we are taking care of the last
// element.
if (amountOfCallsBeforeUnregister <= called) {
checksFinishedCallback();
return {
last: true,
first
};
}
return {
last: false,
first
};
};
// The function called by the external dispatchers. It returns true for
// at most one dispatcher.
let selectorFunc = async (externalDispatcherId: string) => {
// Prevent taking care of the same dispatcher twice.
if (responses.has(externalDispatcherId)) {
return false;
}
// Mark this dispatcher, that a request has been considered
responses.add(externalDispatcherId);
// Make sure we start the timers to unregister the function etc.
// We receive an info, whether we are checking the last call.
// The timeout forwarded to the testCallback is 50 (ms) shorter than the
// timeout of the selection.
const options = Object.assign(defaultCallbackHandler(), {
timeout: _options.timeout - 50
});
if (
!executorSelected &&
(await testCallback(externalDispatcherId, options))
) {
// The value of "executorSelected" may changed during calling the
// testCallback. Therefore we check this again.
if (executorSelected) {
return false;
}
executorSelected = true;
return true;
}
// Return the result
return false;
};
// Replace the function with the registered one (the result is used to
// read its "id" below).
selectorFunc = await this.registerFunction(selectorFunc, {
preventSendingToRegistery: true
});
const selectorFuncOptions: ICallOptions & { functionId: string } = {
resultSink: this._getServiceName(selectorFunc["id"], "response"),
timeout: _options.timeout,
paramsHasNoCallback: true,
functionId: selectorFunc["id"],
dynamicCallback: true
};
let rejectSelectionSucess: () => void;
// Our final Promise, that will be used to show, that
// The Selection has failed.
const selectionSucessPromise = new Promise<void>((resolve, reject) => {
rejectSelectionSucess = reject;
});
// Now we wait until either the timer elapsed (rejection) or every
// provider has called the selection function (resolution).
Promise.race([timerFinishedPromise, checksFinishedPromise])
.catch((err) => {
// Only reached, if the timer elapsed first.
// Now based on the result of the selection we may
// reject the "selectionSucess"
if (!executorSelected) {
rejectSelectionSucess();
}
})
.finally(async () => {
// Either all Elements have called the function or the timer elapsed
// => the selection function isnt required any more.
_this.unregisterFunction(selectorFunc, {
preventSendingToRegistery: true
});
});
return {
selectorFuncOptions,
selectionSucessPromise
};
}
/**
 * Creates the test callback of a default selector and registers a
 * selector function using this callback (see `_generateSelector`).
 *
 * @protected
 * @param {string} name Name of the service / generator.
 * @param {("service" | "generator")} type Type of the element to select an executor for.
 * @param {ValidDefaultSelectors} selector Name of the default selector.
 * @return {*} The result of `_generateSelector`.
 * @memberof nopeDispatcher
 */
protected _generateAndRegisterSelectorFunction(
  name: string,
  type: "service" | "generator",
  selector: ValidDefaultSelectors
) {
  const testCallback = this._generateSelectorFunction(selector);

  return this._generateSelector(name, type, testCallback);
}
/**
 * Creates the test callback for a default selector. The callback receives
 * the id of a dispatcher and the options of the selection and resolves to
 * true, if the dispatcher is allowed to execute the request.
 *
 * - "first" (and every unknown selector): accepts every dispatcher.
 * - "dispatcher": accepts only the own dispatcher.
 * - "host": accepts dispatchers running on the same host as this dispatcher.
 * - "cpu-usage": accepts the responding dispatcher with the lowest cpu usage.
 * - "free-ram": accepts the responding dispatcher with the most free ram.
 *
 * "cpu-usage" and "free-ram" collect the responses and decide, as soon as
 * the call marked as `last` is received or `options.timeout` has elapsed
 * after the call marked as `first`. A call with `options.initalTest` is
 * always accepted.
 *
 * @protected
 * @param {ValidDefaultSelectors} selector Name of the default selector.
 * @return {*} The test callback.
 * @memberof nopeDispatcher
 */
protected _generateSelectorFunction(selector: ValidDefaultSelectors) {
  const _this = this;

  // Creates a callback, which collects a score for every responding
  // dispatcher and accepts only the dispatcher with the lowest score.
  const createLowestScoreSelector = (
    readScore: (dispatcherId: string) => number
  ) => {
    const candidates: any[] = [];
    let timeout: any = null;
    let done = false;

    // Resolves the pending requests. Only the best option receives "true".
    const finish = () => {
      if (!done) {
        const bestOption = minOfArray(candidates, "score").index;
        for (const [index, { resolve }] of candidates.entries()) {
          resolve(index === bestOption);
        }
        done = true;
      }
      if (timeout !== null) {
        clearTimeout(timeout);
        timeout = null;
      }
    };

    return async (
      externalDispatcherId: string,
      options: any
    ): Promise<boolean> => {
      if (options.initalTest) {
        return true;
      }

      // The decision has already been made. A late response can not be
      // selected any more (and must not wait forever).
      if (done) {
        return false;
      }

      const score = readScore(externalDispatcherId);

      // Every call waits, until the decision has been made.
      const promise = new Promise<boolean>((resolve) =>
        candidates.push({ resolve, score })
      );

      // If we are called the first time, we ensure, that the decision
      // is made after the specified amount of time.
      if (options.first) {
        timeout = setTimeout(finish, options.timeout);
      }
      if (options.last) {
        finish();
      }

      return await promise;
    };
  };

  switch (selector) {
    case "dispatcher":
      // Our selector compares the dispatcher - id
      return async (externalDispatcher, options) => {
        return externalDispatcher == _this.id;
      };
    case "host": {
      // Our selector compares the host-name:
      // 1. Get the current Host name of our dispatcher
      const hostName = this._genAliveMessage().host.name;
      // 2. Compare it with the host name of the requesting dispatcher.
      return async (externalDispatcher) => {
        const host = _this._externalDispatchers.get(externalDispatcher);
        return host?.host?.name == hostName;
      };
    }
    case "cpu-usage":
      return createLowestScoreSelector((dispatcherId) => {
        const usage =
          _this._externalDispatchers.get(dispatcherId)?.host?.cpu?.usage;
        // An unknown usage (-1 or unknown dispatcher) is the worst option.
        return typeof usage === "number" && usage !== -1 ? usage : Infinity;
      });
    case "free-ram":
      return createLowestScoreSelector((dispatcherId) => {
        const free =
          _this._externalDispatchers.get(dispatcherId)?.host?.ram?.free;
        // The dispatcher with the MOST free ram must be selected. Therefore
        // the negated value is used as score. An unknown value (-1 or
        // unknown dispatcher) is the worst option.
        return typeof free === "number" && free !== -1 ? -free : Infinity;
      });
    case "first":
    default:
      return async (dispatcher: string) => {
        return true;
      };
  }
}
/**
 * Function, that will extract the information of the instance and the
 * provider.
 *
 * If the instance is stored in this dispatcher, its description is created
 * using `toDescription()`. Otherwise the description of the first
 * dispatcher announcing an instance with the identifier is used. This
 * matches the dispatcher returned by `getDispatcherForInstance`.
 *
 * @author M.Karkowski
 * @param {string} identifier The identifier of instance
 * @return {*} {({ description: INopeModuleDescription; dispatcher: IDispatcherInfo } | undefined)} undefined, if the instance doesnt exist.
 * @memberof nopeDispatcher
 */
public getInstanceInfo(
  identifier: string
):
  | { description: INopeModuleDescription; dispatcher: IDispatcherInfo }
  | undefined {
  // First check if the instance exists.
  if (!this.instanceExists(identifier, false)) {
    return undefined;
  }

  // Define the return type
  const ret: {
    description: INopeModuleDescription;
    dispatcher: IDispatcherInfo;
  } = {} as any;

  if (this._instances.has(identifier)) {
    // The instance is stored in this dispatcher => use it to create the
    // description.
    ret.description = this._instances
      .get(identifier)
      .instance.toDescription();
  } else {
    // Otherwise search the instances of the remote dispatchers. The search
    // stops at the first match. Leaving only the inner loop would allow a
    // later dispatcher to overwrite the description.
    search: for (const modules of this._mappingOfRemoteDispatchersAndInstances.values()) {
      for (const mod of modules) {
        if (mod.identifier == identifier) {
          ret.description = mod;
          break search;
        }
      }
    }
  }

  // Additionally add some information about the used dispatcher.
  ret.dispatcher = this.getDispatcherForInstance(identifier);

  return ret;
}
/**
 * Determines the dispatcher hosting the instance with the given
 * identifier.
 *
 * @author M.Karkowski
 * @param {string} identifier The identifier of the instance.
 * @return {*} {(IDispatcherInfo | undefined)} The alive message of this dispatcher for an internal instance, the stored information of the first remote dispatcher announcing the instance, otherwise undefined.
 * @memberof nopeDispatcher
 */
public getDispatcherForInstance(
  identifier: string
): IDispatcherInfo | undefined {
  // Unknown instances are not hosted anywhere.
  if (!this.instanceExists(identifier, false)) {
    return undefined;
  }

  // Internal instances are hosted by this dispatcher.
  if (this._internalInstances.has(identifier)) {
    return this._genAliveMessage();
  }

  // Otherwise use the first remote dispatcher announcing the instance.
  for (const [
    dispatcherId,
    descriptions
  ] of this._mappingOfRemoteDispatchersAndInstances.entries()) {
    const hostsInstance = descriptions.some(
      (candidate) => candidate.identifier == identifier
    );
    if (hostsInstance) {
      return this._externalDispatchers.get(dispatcherId);
    }
  }

  return undefined;
}
/**
 * Function, that will create an instance and will return an interface
 * for that instance.
 *
 * If the instance is already stored in this dispatcher, the stored
 * instance is returned (after checking the type and the selector).
 * Otherwise the generator "generateInstance_<type>" is called and the
 * internal generator of the type (or "*") creates the local instance
 * based on the received description.
 *
 * The provided `options` are not modified.
 *
 * @author M.Karkowski
 * @template I
 * @param {Partial<IInstanceCreationMsg>} description At least "type" and "identifier" must be provided.
 * @param {({
 *     selector?: ValidDefaultSelectors | ValidSelectorFunction;
 *   })} [options={}]
 * @return {*} {Promise<I>}
 * @throws {Error} If the description is incomplete, the type doesnt match an existing instance, the selector rejects the host of an existing instance or no generator is available.
 * @memberof nopeDispatcher
 */
public async generateInstance<I extends INopeModule>(
  description: Partial<IInstanceCreationMsg>,
  options: {
    selector?: ValidDefaultSelectors | ValidSelectorFunction;
  } = {}
): Promise<I> {
  // Define the Default Description
  // which will lead to an error.
  const _defDescription: IInstanceCreationMsg = {
    dispatcherId: this.id,
    identifier: "error",
    params: [],
    type: "unkown"
  };

  // Assign the provided Description
  const _description = Object.assign(_defDescription, description, {
    dispatcherId: this.id
  }) as IInstanceCreationMsg;

  // Check if the description is complete
  if (_description.type === "unkown" || _description.identifier === "error") {
    throw Error(
      "Please Provide at least a \"type\" and \"identifier\" in the paremeters"
    );
  }

  const debugEnabled = () => this._logger?.enabledFor((Logger as any).DEBUG);

  // Throws, if an instance with the identifier exists, but with a
  // different type.
  const assertMatchingType = (
    details:
      | { description: INopeModuleDescription; dispatcher: IDispatcherInfo }
      | undefined
  ): void => {
    if (
      details !== undefined &&
      details.description?.type !== _description.type
    ) {
      throw Error(
        "There exists an Instance named: '" +
          _description.identifier +
          "' but it uses a different type. Requested type: '" +
          _description.type +
          "', given type: '" +
          details.description?.type +
          "'"
      );
    }
  };

  // Performs the initial test of the provided selector for a dispatcher.
  // Returns undefined, if no selector (function or name) is provided.
  const selectorAccepts = async (
    dispatcherId: string
  ): Promise<boolean | undefined> => {
    const selector = options.selector;
    if (typeof selector === "function") {
      return !!(await selector(dispatcherId, { initalTest: true }));
    }
    if (typeof selector === "string") {
      return !!(await this._generateSelectorFunction(selector)(dispatcherId, {
        initalTest: true
      }));
    }
    return undefined;
  };

  if (debugEnabled()) {
    this._logger.debug(
      "Requesting an Instance of type: \"" +
        _description.type +
        "\" with the identifier: \"" +
        _description.identifier +
        "\""
    );
  }

  if (this._instances.has(_description.identifier)) {
    if (debugEnabled()) {
      this._logger.debug(
        "Already created instance with the identifiert: \"" +
          _description.identifier +
          "\" => returning this instance"
      );
    }

    // There is already an instance, now lets check if the types matches
    const _instanceDetails = this.getInstanceInfo(_description.identifier);
    assertMatchingType(_instanceDetails);

    // If there exists an specific selector which we want to use.
    if (options.selector) {
      // The dispatcher may be unknown (e.g. for manually registered
      // instances). Then "undefined" is forwarded to the selector instead
      // of failing with a TypeError.
      const accepted = await selectorAccepts(_instanceDetails?.dispatcher?.id);
      if (accepted === false) {
        throw Error(
          "Instance is available, but selector prevents using the provider."
        );
      }
    }

    // Add the Dispatcher to the Element and return the Instance.
    const entry = this._instances.get(_description.identifier);
    entry.usedBy.push(_description.dispatcherId);
    return entry.instance as I;
  }

  try {
    // No specific internal generator for the type is present
    // => use the default generator, which is "*"
    const _type = this._internalGenerators.has(_description.type)
      ? _description.type
      : "*";

    if (!this.generatorExists(_description.type)) {
      throw Error(
        "Generator \"" + _description.type + "\" isnt present in the network!"
      );
    }

    if (!this._internalGenerators.has(_type)) {
      throw Error("No internal generator Available!");
    }

    if (debugEnabled()) {
      this._logger.debug(
        "No instance with the identifiert: \"" +
          _description.identifier +
          "\" found, but an internal generator is available. Using the internal one for creating the instance and requesting the \"real\" instance externally"
      );
    }

    // Now test if there is allready an instance with this name and type.
    // If so, we check if we have the correct type etc. Additionally we
    // try to extract its dispatcher-id and will use that as selector
    // to allow the function be called.
    const _instanceDetails = this.getInstanceInfo(_description.identifier);
    assertMatchingType(_instanceDetails);

    // Work on a copy, so that the options of the caller are not modified.
    const callOptions = Object.assign({}, options, {
      paramsHasNoCallback: true
    });

    const dispatcherToUse = _instanceDetails?.dispatcher?.id;
    if (dispatcherToUse) {
      // NOTE(review): without a provided selector the call isnt pinned to
      // the dispatcher hosting the instance - confirm this is intended.
      const accepted = await selectorAccepts(dispatcherToUse);
      if (accepted === true) {
        // Now we generate a specific method to check against the dispatcher
        callOptions.selector = async (id: string) => {
          return dispatcherToUse === id;
        };
      } else if (accepted === false) {
        throw Error(
          "Instance is available, but selector prevents using the host."
        );
      }
    }

    const result = await this.performCall<IInstanceDescriptionMsg>(
      "generateInstance_" + _description.type,
      [_description],
      callOptions
    );

    if (debugEnabled()) {
      this._logger.debug("Received a description for the instance");
    }

    // Create the Instance
    const instance = (await this._internalGenerators.get(_type)(
      this,
      result.description
    )) as I;

    // Store the Instances.
    this._instances.set(_description.identifier, {
      instance,
      usedBy: [_description.dispatcherId]
    });

    return instance;
  } catch (e) {
    if (this._logger) {
      this._logger.error(
        "During creating an Instance, the following error Occurd"
      );
      this._logger.error(e);
    }
    throw e;
  }
}
/**
 * Registers an already existing instance at the dispatcher.
 *
 * The instance is stored under its own `identifier`, starts with an empty
 * `usedBy` list and is flagged as `manual`.
 *
 * @param {I} instance The instance to register.
 * @return {Promise<I>} The very same instance which has been passed in.
 * @memberof nopeDispatcher
 */
public async registerInstance<I extends INopeModule>(
  instance: I
): Promise<I> {
  // Describe the bookkeeping entry first, then store it.
  const entry = { instance, usedBy: [], manual: true };
  this._instances.set(instance.identifier, entry);
  return instance;
}
/**
 * Releases one usage of an instance and disposes the instance once
 * nobody uses it any more.
 *
 * @param {(I | string)} instance Either the identifier of the instance or the instance itself.
 * @param {boolean} [preventSendingUpdate=false] If true, the list of available instances is not re-sent after the removal.
 * @return {Promise<boolean>} True if the instance is known to this dispatcher, otherwise false.
 * @memberof nopeDispatcher
 */
public async deleteInstance<I extends INopeModule>(
  instance: I | string,
  preventSendingUpdate = false
): Promise<boolean> {
  // Resolve the bookkeeping entry, either by name or by object.
  let entry: { instance: INopeModule; usedBy: Array<string> };
  if (typeof instance === "string") {
    entry = this._instances.get(instance);
  } else {
    const wanted: I = instance;
    entry = Array.from(this._instances.values()).find(
      (candidate) => wanted == candidate.instance
    );
  }

  // Unknown instance => nothing to do.
  if (!entry) {
    return false;
  }

  // Release one usage. While other users remain, the instance is kept.
  entry.usedBy.pop();
  if (entry.usedBy.length !== 0) {
    return true;
  }

  const removalMsg: IInstanceRemovalMsg = {
    dispatcherId: this.id,
    identifier: entry.instance.identifier
  };

  // Call the corresponding dispose function for the "real" instance.
  // All other elements are just accessors.
  await this.performCall("instance_dispose_" + entry.instance.identifier, [
    removalMsg
  ]);

  // Forget the instance locally.
  this._instances.delete(entry.instance.identifier);

  if (!preventSendingUpdate) {
    // Update the instances provided by this module.
    this._sendAvailableInstances();
  }

  // Finally dispose the local handler.
  await entry.instance.dispose();
  return true;
}
/**
 * Internal Method to handle some requests.
 *
 * The method resolves the function to call, optionally asks the selector
 * (if the request contains one) whether this dispatcher is allowed to
 * execute the task, rebuilds the argument list (including remote callbacks),
 * executes the function and emits the result (or the error) on the
 * `resultSink` of the request.
 *
 * While the task is running, the dispatcher listens on `canceledTask`. A
 * matching cancelation is forwarded to all running sub-calls and to the
 * result promise (if it provides a `cancel` method). The subscription is
 * always released once the request has been handled.
 *
 * @protected
 * @param {IRequestTaskMsg} data The provided data of the request
 * @param {*} [_function=this._definedFunctions.get(data.functionId)] The Function can be provided
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
protected async _handleExternalRequest(
  data: IRequestTaskMsg,
  _function?: (...args) => Promise<any>
): Promise<void> {
  // Subscription on the cancelation events. It is defined outside of the
  // try-block so that it can be released in `finally`, independent of
  // the outcome of the request.
  let observer: { unsubscribe: () => void } | null = null;
  const releaseObserver = (): void => {
    if (observer !== null) {
      const toRelease = observer;
      observer = null;
      toRelease.unsubscribe();
    }
  };

  try {
    // Try to get the function if not provided:
    if (typeof _function !== "function") {
      _function = this._definedFunctions.get(data.functionId)?.func;
    }

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      // If there is a Logger:
      this._logger.debug(
        "Dispatcher \"" +
          this.id +
          "\" received request: \"" +
          data.functionId +
          "\" -> task: \"" +
          data.taskId +
          "\""
      );
    }

    if (typeof _function === "function") {
      // Now we check, if we have to perform test, whether
      // we are allowed to execute the task:
      if (data?.selector?.functionId) {
        const allowedToExecute = await this.performCall<boolean>(
          data.selector.functionId,
          [this.id],
          {
            // Make sure, the selector doesn't require a selector:
            preventSelector: true,
            // Provide the rest of the data.
            ...data.selector
          }
        );

        // We are not allowed to execute the task => silently skip it.
        if (!allowedToExecute) {
          return;
        }
      }

      // Callbacks, which must be called if the task is canceled.
      const cbs: Array<(reason) => void> = [];

      observer = this.canceledTask.subscribe((cancelEvent) => {
        if (cancelEvent.taskId == data.taskId) {
          // Call every callback. Iterate over a copy, because the list
          // may be modified while sub-calls are settled.
          for (const cb of cbs.slice()) {
            cb(cancelEvent.reason);
          }
          // Afterwards we are allowed to cancel the subscription.
          releaseObserver();
        }
      });

      // Only if the Function is present extract the arguments etc.
      const args = [];

      // First extract the basic arguments
      for (const item of data.params) {
        args[item.idx] = item.data;
      }

      // Add the Callbacks. Therefore create a function which will
      // trigger the remote.
      for (const options of data.callbacks) {
        args[options.idx] = async (..._args) => {
          // Create the Task and its Promise.
          const servicePromise = this.performCall<any>(
            options.functionId,
            _args,
            options
          );

          // The main task has been canceled =>
          // we are allowed to cancel the sub-task as well.
          const cancelCallback = (reason) => {
            servicePromise.cancel(reason);
          };
          cbs.push(cancelCallback);

          try {
            // Await the Result. If a Task is canceled => the Error is thrown.
            return await servicePromise;
          } finally {
            // The sub-task is settled => its cancel callback isn't needed
            // any more, regardless of whether it succeeded or failed.
            const idx = cbs.indexOf(cancelCallback);
            if (idx !== -1) {
              cbs.splice(idx, 1);
            }
          }
        };
      }

      // Perform the Task it self.
      const _resultPromise = _function(...args);

      if (
        typeof (_resultPromise as INopePromise<any>)?.cancel === "function"
      ) {
        // The result itself can be canceled => register its cancel method.
        cbs.push((reason) =>
          (_resultPromise as INopePromise<any>).cancel(reason)
        );
      }

      // Wait for the Result to finish.
      const _result = await _resultPromise;

      // Define the Result message. `undefined` is mapped to `null`.
      const result: IResponseTaskMsg = {
        result: typeof _result !== "undefined" ? _result : null,
        taskId: data.taskId,
        type: "response"
      };

      // Use the communicator to publish the result.
      await this.communicator.emitRpcResponse(data.resultSink, result);
    }
  } catch (error) {
    if (this._logger) {
      // If there is a Logger:
      this._logger.error(
        "Dispatcher \"" +
          this.id +
          "\" failed with request: \"" +
          data.taskId +
          "\""
      );
      this._logger.error(error);
    }

    // An Error occurred => Forward the Error.
    const result: IResponseTaskMsg = {
      error: {
        error,
        // `String(...)` also works for thrown `null` / `undefined` values.
        msg: String(error)
      },
      taskId: data.taskId,
      type: "response"
    };

    // Send the Error via the communicator to the remote.
    await this.communicator.emitRpcResponse(data.resultSink, result);
  } finally {
    // Never keep the cancelation subscription of a finished request.
    releaseObserver();
  }
}
/**
 * Internal Function to handle responses. In general,
 * the dispatcher checks if there is an open task with
 * the provided id. If so => finish the promise.
 *
 * The task is removed from the list of running tasks and its timeout
 * (if defined) is cleared, before the task is resolved with the result
 * or rejected with the error of the response.
 *
 * @protected
 * @param {IResponseTaskMsg} data The Data provided to handle the Response.
 * @return {boolean} Returns a boolean, indicating whether a corresponding task was found or not.
 * @memberof nopeDispatcher
 */
protected _handleExternalResponse(data: IResponseTaskMsg): boolean {
  try {
    // Extract the Task
    const task = this._runningInternalRequestedTasks.get(data.taskId);

    if (!task) {
      // The response doesn't belong to one of our tasks.
      return false;
    }

    // Delete the Task:
    this._runningInternalRequestedTasks.delete(data.taskId);

    // Clear the timer first, so it is released even if settling fails.
    if (task.timeout) {
      clearTimeout(task.timeout);
    }

    // Based on the Result of the Remote => proceed.
    // Either forward the error or the result.
    if (data.error) {
      if (this._logger) {
        this._logger.error("Failed with task " + data.taskId);
        this._logger.error("Reason: " + data.error.msg);
        this._logger.error(data.error);
      }
      task.reject(data.error);
    } else {
      task.resolve(data.result);
    }

    return true;
  } catch (e) {
    // The logger is optional => guard the access.
    this._logger?.error("Error during handling an external response");
    this._logger?.error(e);
  }
  return false;
}
// Handle of the interval started in `_init`, which periodically calls `_checkDispachterHealth`. Cleared in `dispose`.
protected _checkInterval: any = null;
// Handle of the interval started in `_init`, which periodically calls `_emitStatus`. Cleared in `dispose`.
protected _sendInterval: any = null;
// Interval handle which is cleared in `dispose`. It is not assigned within this part of the file.
protected _cpuInterval: any = null;
/**
 * Internal Function, used to initialize the Dispatcher.
 * It subscribes to the "Messages" of the communicator.
 *
 * Steps performed:
 *  1. Wait until the communicator is connected.
 *  2. Start the health-check and the alive-message intervals (if enabled).
 *  3. Register the default ("*") wrapper generator.
 *  4. Subscribe all already defined functions.
 *  5. Subscribe to the status messages of the communicator (services,
 *     observables, generators, status, bonjour, aurevoir, instances,
 *     task cancelation, unregistering of rpcs, connection changes).
 *  6. Emit a bonjour and mark the dispatcher as ready.
 *
 * @protected
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
protected async _init(): Promise<void> {
  // Wait until the Element is connected.
  await this.communicator.connected.waitFor((value) => value, {
    testCurrent: true
  });

  // Setup Test Intervals:
  if (this._timeouts.checkInterval > 0) {
    // Define a Checker, which will test the status
    // of the external Dispatchers.
    this._checkInterval = setInterval(
      () => this._checkDispachterHealth(),
      this._timeouts.checkInterval
    );
  }

  if (this._timeouts.sendAliveInterval > 0) {
    // Define a Timer, which will emit Status updates with
    // the desired delay.
    this._sendInterval = setInterval(
      () => this._emitStatus(),
      this._timeouts.sendAliveInterval
    );
  }

  // Default generator: wraps a description into a generic module.
  this.registerInternalWrapperGenerator(
    "*",
    async (dispather, description) => {
      const mod = new NopeGenericModule(dispather, this._generateObservable);
      await mod.fromDescription(description, "overwrite");
      return mod;
    }
  );

  // Iterate over the Defined Functions and create Subscriptions
  for (const [id, item] of this._definedFunctions.entries()) {
    // Subscribe the Function
    this._subscribeToService(id, item.func);
  }

  // Subscribe to the availableServices of Remotes.
  // If there is a new Service => update the External Services
  await this.communicator.onNewServicesAvailable((data) => {
    try {
      if (data.dispatcher !== this.id) {
        this._mappingOfRemoteDispatchersAndServices.set(
          data.dispatcher,
          data
        );
        this._updateExternalServices();

        if (this._logger?.enabledFor((Logger as any).DEBUG)) {
          // If there is a Logger:
          this._logger.debug(
            this.id,
            "received new services from",
            data.dispatcher
          );
        }
      }
    } catch (e) {
      // The logger is optional => guard the access.
      this._logger?.error("Error during handling an onNewServicesAvailable");
      this._logger?.error(e);
    }
  });

  // Subscribe to new available Topics.
  await this.communicator.onNewObservablesAvailable((data) => {
    try {
      if (data.dispatcher !== this.id) {
        // Update the new Subscribed/Published Observables
        this._mappingOfRemoteDispatchersAndPropsOrEvents.set(
          data.dispatcher,
          data
        );

        // Update the new Observables of the dispatcher
        this._updateExternalObservables(data.dispatcher);

        if (this._logger?.enabledFor((Logger as any).DEBUG)) {
          // If there is a Logger:
          this._logger.debug(
            this.id,
            "received new Observables from",
            data.dispatcher
          );
        }
      }

      // Update all subscribed / published events (external and internal).
      this.subscribedProperties.setContent(
        Array.from(
          new Set([...this._externalSubscribed, ...this._internalSubscribed])
        )
      );
      this.publishedProperties.setContent(
        Array.from(
          new Set([...this._externalPublished, ...this._internalPublished])
        )
      );
    } catch (e) {
      this._logger?.error("Error during handling an onNewObservablesAvailable");
      this._logger?.error(e);
    }
  });

  // Subscribe to the available generators. In contrast to the services
  // and observables, messages of this dispatcher are stored as well.
  await this.communicator.onNewInstanceGeneratorsAvailable((data) => {
    try {
      this._mappingOfRemoteDispatchersAndGenerators.set(
        data.dispatcher,
        data
      );
      this._updateExternalGenerators();

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          this.id,
          "received new generators from",
          data.dispatcher
        );
      }
    } catch (e) {
      this._logger?.error("Error during handling an onNewInstanceGeneratorsAvailable");
      this._logger?.error(e);
    }
  });

  // Store the status of the dispatchers.
  await this.communicator.onStatusUpdate((info) => {
    this._externalDispatchers.set(info.id, info);
    this.externalDispatchers.setContent(
      Array.from(this._externalDispatchers.values())
    );
  });

  // A dispatcher went online. If it is a different dispatcher,
  // tell him what we are providing.
  await this.communicator.onBonjour((info) => {
    this._externalDispatchers.set(info.id, info);
    this.externalDispatchers.setContent(
      Array.from(this._externalDispatchers.values())
    );

    if (this.id !== info.id) {
      this._sendAvailableServices();
      this._sendAvailableObservables();
      this._sendAvailableGenerators();
      this._sendAvailableInstances();
      this._emitStatus();

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        // If there is a Logger:
        this._logger.debug(
          "Remote Dispatcher \"" + info.id + "\" went online"
        );
      }
    }
  });

  // A dispatcher went offline => remove it.
  await this.communicator.onAurevoir((dispatcher: string) =>
    this._removeDispatcher(dispatcher)
  );

  // Listen to newly created instances.
  await this.communicator.onNewInstancesAvailable((message) => {
    // Store the instances:
    this._mappingOfRemoteDispatchersAndInstances.set(
      message.dispatcher,
      message.instances
    );

    // Update the Mapping:
    this._updateExternalInstances();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      // If there is a Logger:
      this._logger.debug(
        "Remote Dispatcher \"" +
          message.dispatcher +
          "\" updated its available instances"
      );
    }
  });

  // Forward cancelations of other dispatchers.
  await this.communicator.onTaskCancelation((event) => {
    if (event.dispatcher !== this.id) {
      this.canceledTask.setContent(event);
    }
  });

  // Now we listen to requests, to unregister a function.
  this.communicator.onUnregisterRpc((msg) => {
    if (this._definedFunctions.has(msg.identifier)) {
      const item = this._definedFunctions.get(msg.identifier);
      this.unregisterFunction(msg.identifier, {
        preventSendingToRegistery: item.options.preventSendingToRegistery
      });
    }
  });

  this.communicator.connected.subscribe((connected) => {
    // On a (re-)connect => say hello.
    if (connected) {
      // The logger is optional => guard the access.
      this._logger?.debug("Sending Bonjour");

      if (RUNNINGINNODE) {
        this.emitBonjour();
      } else {
        // Outside of node the bonjour is delayed by 2000 ms.
        setTimeout(() => {
          this.emitBonjour();
        }, 2000);
      }
    }
  });

  if (this._logger) {
    this._logger.info(this.id, "initialized");
  }

  // We sleep 500 ms
  await sleep(500);
  await this.emitBonjour();
  await sleep(500);

  this.ready.setContent(true);
}
/**
 * Manually announces this dispatcher: emits the alive message as
 * "bonjour" and afterwards publishes the available services,
 * observables, generators and instances.
 *
 * @return {*} {Promise<void>}
 * @memberof nopeDispatcher
 */
public async emitBonjour(): Promise<void> {
  // Say hello with the current status of the dispatcher.
  const aliveMessage = this._genAliveMessage();
  this.communicator.emitBonjour(aliveMessage);

  // Tell the others, what we are providing.
  this._sendAvailableServices();
  this._sendAvailableObservables();
  this._sendAvailableGenerators();
  this._sendAvailableInstances();
}
/**
 * Checks the age of the last status message of every known external
 * dispatcher and adapts its status (HEALTHY, SLOW, WARNING, DEAD).
 * Dispatchers, whose last message is older than the `remove` timeout
 * are removed. If something changed, `externalDispatchers` is updated.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _checkDispachterHealth(): void {
  const now = Date.now();
  const limits = this._timeouts;
  let modified = false;

  for (const entry of this._externalDispatchers.values()) {
    // Age of the last received status message.
    const age = now - entry.timestamp;

    if (age > limits.remove) {
      // Remove the Dispatcher. But be quiet.
      // Perhaps more dispatchers will be removed
      this._removeDispatcher(entry.id, true);
      modified = true;
      continue;
    }

    // Determine the status to assign (if it has to be changed at all).
    let nextStatus: ENopeDispatcherStatus | undefined = undefined;

    if (age > limits.dead && entry.status !== ENopeDispatcherStatus.DEAD) {
      nextStatus = ENopeDispatcherStatus.DEAD;
    } else if (
      age > limits.warn &&
      age <= limits.dead &&
      entry.status !== ENopeDispatcherStatus.WARNING
    ) {
      nextStatus = ENopeDispatcherStatus.WARNING;
    } else if (
      age > limits.slow &&
      age <= limits.warn &&
      entry.status !== ENopeDispatcherStatus.SLOW
    ) {
      nextStatus = ENopeDispatcherStatus.SLOW;
    } else if (
      age <= limits.slow &&
      entry.status !== ENopeDispatcherStatus.HEALTHY
    ) {
      nextStatus = ENopeDispatcherStatus.HEALTHY;
    }

    if (nextStatus !== undefined) {
      entry.status = nextStatus;
      modified = true;
    }
  }

  if (modified) {
    // Publish the updated list of external dispatchers.
    const dispatchers = Array.from(this._externalDispatchers.values());
    this.externalDispatchers.setContent(dispatchers);
  }
}
/**
 * Removes a dispatcher and everything that has been provided by it
 * (generators, services, observables and instances). Instances, which
 * are not used any more afterwards and which were not registered
 * manually are disposed.
 *
 * @protected
 * @param {string} dispatcher The id of the dispatcher to remove.
 * @param {boolean} [quite=false] If true, `externalDispatchers` is not updated.
 * @memberof nopeDispatcher
 */
protected _removeDispatcher(dispatcher: string, quite = false): void {
  // Delete everything, that has been provided by the dispatcher.
  this._mappingOfRemoteDispatchersAndGenerators.delete(dispatcher);
  this._mappingOfRemoteDispatchersAndServices.delete(dispatcher);
  this._mappingOfRemoteDispatchersAndPropsOrEvents.delete(dispatcher);
  this._mappingOfRemoteDispatchersAndInstances.delete(dispatcher);

  // Keep the info for the log message below.
  const dispatcherInfo = this._externalDispatchers.get(dispatcher);
  const deleted = this._externalDispatchers.delete(dispatcher);

  // Iterate over the available instances and remove the dispatcher as user:
  for (const instance of this._instances.values()) {
    // Remove all occurrences of the dispatcher (in place):
    let idx = instance.usedBy.indexOf(dispatcher);
    while (idx !== -1) {
      instance.usedBy.splice(idx, 1);
      idx = instance.usedBy.indexOf(dispatcher);
    }

    // Check if the Element isn't required any more => dispose it.
    // NOTE(review): the entry stays in `_instances` after disposing, so
    // it is presumably disposed again on the next removal - confirm.
    if (instance.usedBy.length === 0 && !instance.manual) {
      const identifier = instance.instance.identifier;

      this._logger?.info(
        "Disposing instance, because it isnt used any more.",
        identifier
      );

      // `then` must be chained before `catch`. Otherwise the success
      // message is logged even if disposing failed.
      instance.instance
        .dispose()
        .then(() => {
          this._logger?.info("sucessfully disposed instance ", identifier);
        })
        .catch((e) => {
          this._logger?.error("Failed to remove the Instance.", e);
        });
    }
  }

  this._updateExternalServices();
  this._updateExternalGenerators();
  this._updateExternalObservables();
  this._updateExternalInstances();

  if (!quite) {
    this.externalDispatchers.setContent(
      Array.from(this._externalDispatchers.values())
    );
  }

  if (deleted && this._logger?.enabledFor((Logger as any).WARN)) {
    // If there is a Logger:
    this._logger.warn(
      "a dispatcher on",
      dispatcherInfo?.host?.name || "unkown",
      "went offline. ID of the Dispatcher: ",
      dispatcher
    );
  }
}
// CPU usage which is reported as `host.cpu.usage` by `_genAliveMessage`. Starts with -1; it is not updated within this part of the file.
protected _cpuLoad = -1;
/**
 * Generates the current Status Message of the Dispatcher.
 *
 * Running in node, the message contains information about the host
 * (cpu, ram, os, hostname). Otherwise the information is taken from the
 * `navigator` and unknown values are set to -1.
 *
 * @author M.Karkowski
 * @protected
 * @return {*} {IDispatcherInfo} The current status of our dispatcher.
 * @memberof nopeDispatcher
 */
protected _genAliveMessage(): IDispatcherInfo {
  if (RUNNINGINNODE) {
    // eslint-disable-next-line
    const os = require("os");
    const cpus = os.cpus();

    // The list of cpus may be empty => fall back to an unknown model.
    const rawModel = `${cpus[0]?.model ?? "unkown"}`;

    // Models like "... CPU @ 1.90GHz" contain the speed. Strip the
    // " @ ..." suffix. If there is no "@", use the model as it is
    // (otherwise the last characters of the name would be cut off).
    const atIdx = rawModel.indexOf("@");
    const model =
      atIdx === -1 ? rawModel : rawModel.slice(0, Math.max(atIdx - 1, 0));

    // Read the memory only once, to get consistent values.
    const freeMem = os.freemem();
    const totalMem = os.totalmem();

    // NOTE(review): the version differs from the one of the non-node
    // message below ("0.9.5") - confirm which one is intended.
    return {
      id: this.id,
      env: "javascript",
      version: "0.9.7",
      host: {
        cores: cpus.length,
        cpu: {
          model,
          speed: avgOfArray(cpus, "speed"),
          usage: this._cpuLoad
        },
        os: os.platform(),
        ram: {
          // Return the used Memory
          usedPerc: 1 - freeMem / totalMem,
          // The Values are given in Byte but we want MByte
          free: Math.round(freeMem / 1048576),
          total: Math.round(totalMem / 1048576)
        },
        name: os.hostname()
      },
      pid: process.pid,
      timestamp: Date.now(),
      status: ENopeDispatcherStatus.HEALTHY
    };
  }

  // Not running in node => only limited information is available.
  return {
    env: "javascript",
    version: "0.9.5",
    host: {
      cores: -1,
      cpu: {
        model: "unkown",
        speed: -1,
        usage: -1
      },
      name: navigator.appCodeName + " " + navigator.appName,
      os: navigator.platform,
      ram: {
        free: -1,
        usedPerc: -1,
        total: -1
      }
    },
    id: this.id,
    pid: this.id,
    timestamp: Date.now(),
    status: ENopeDispatcherStatus.HEALTHY
  };
}
/**
 * Function to cancel an individual Task.
 *
 * The task is removed from the list of running tasks, its timeout (if
 * defined) is cleared, its promise is rejected with the given reason and
 * the cancelation is emitted via the communicator.
 *
 * @param {string} taskId The Id of the Task. Which should be canceled.
 * @param {Error} reason The Reason, why the Task should be canceled (In general should be something meaningful)
 * @param {boolean} [quite=false] Flag, which is forwarded in the emitted cancelation message.
 * @return {boolean} Flag, that indicates, whether cancelation was successful or not.
 * @memberof nopeDispatcher
 */
public cancelTask(taskId: string, reason: Error, quite = false): boolean {
  const task = this._runningInternalRequestedTasks.get(taskId);

  if (!task) {
    // Task hasn't been found => nothing has been canceled.
    return false;
  }

  // Delete the task
  this._runningInternalRequestedTasks.delete(taskId);

  // The task is done => its timer isn't required any more.
  if (task.timeout) {
    clearTimeout(task.timeout);
  }

  // Propagate the Cancellation (internally):
  task.reject(reason);

  // Propagate the Cancellation externally.
  // Therefore use the desired Mode.
  this.communicator.emitTaskCancelation({
    dispatcher: this.id,
    reason,
    taskId,
    quite
  });

  // Indicate a successful cancelation.
  return true;
}
/**
 * Helper Function, used to close all tasks of a specific service.
 *
 * All matching tasks are removed from the list of running tasks first.
 * Afterwards their timeouts (if defined) are cleared and their promises
 * are rejected with the provided reason. In contrast to `cancelTask`
 * no cancelation message is emitted.
 *
 * @param {string} serviceName The Name of the Service.
 * @param {Error} reason The provided Reason, why cancelation is required.
 * @memberof nopeDispatcher
 */
public cancelRunningTasksOfService(serviceName: string, reason: Error) {
  // Provide a List containing all Tasks, that have to be canceled
  const _tasksToCancel: {
    reject: (error: Error) => void;
    id: string;
    timeout: any;
  }[] = [];

  // Filter all Tasks that should be canceled.
  for (const [id, task] of this._runningInternalRequestedTasks.entries()) {
    // Therefore compare the required Service of the Task
    if (task.serviceName === serviceName) {
      // if the service matches, put it to our list.
      _tasksToCancel.push({
        id,
        reject: task.reject,
        timeout: task.timeout
      });
    }
  }

  // First remove all Tasks.
  // Then cancel them to avoid side effects
  for (const item of _tasksToCancel) {
    this._runningInternalRequestedTasks.delete(item.id);
  }

  // Now reject all Tasks.
  for (const item of _tasksToCancel) {
    // The task is done => its timer isn't required any more.
    if (item.timeout) {
      clearTimeout(item.timeout);
    }
    item.reject(reason);
  }
}
/**
 * Disposes the Dispatcher: cancels all running tasks, stops the
 * intervals and emits the aurevoir message. Must be called on exit for
 * a clean exit. Otherwise the exit is defined as dirty.
 *
 * @return {Promise<void>}
 * @memberof nopeDispatcher
 */
public async dispose(): Promise<void> {
  // Cancel every task which is still running. A copy of the ids is
  // used, because canceling modifies the map.
  const openTasks = Array.from(this._runningInternalRequestedTasks.keys());
  openTasks.forEach((taskId) => {
    this.cancelTask(taskId, new Error("Client going offline"), false);
  });

  // Stop all intervals that have been started.
  const intervals = [
    this._sendInterval,
    this._checkInterval,
    this._cpuInterval
  ];
  for (const handle of intervals) {
    if (handle) {
      clearInterval(handle);
    }
  }

  // Emits the aurevoir Message.
  this.communicator.emitAurevoir(this.id);
}
/**
 * Internal function to recount how often a service or a generator
 * is provided. The matching counter map is cleared and filled again
 * based on the messages of the remote dispatchers. Additionally every
 * key of `_definedFunctions` is counted once.
 *
 * NOTE(review): the keys of `_definedFunctions` are counted for
 * "generators" as well - confirm that this is intended.
 *
 * @author M.Karkowski
 * @protected
 * @param {("services" | "generators")} type The kind of element to recount.
 * @memberof nopeDispatcher
 */
protected _updateAmountOf(type: "services" | "generators"): void {
  const countServices = type === "services";

  // Names of the members, which are used for the selected type.
  const counterName: string = countServices
    ? "_amountOfServicesProvidedByName"
    : "_amountOfGeneratorsProvidedByName";
  const sourceName: string = countServices
    ? "_mappingOfRemoteDispatchersAndServices"
    : "_mappingOfRemoteDispatchersAndGenerators";
  const listName: string = countServices ? "services" : "generators";

  // Helper to tell the System, that we have an additional provider.
  const addProvider = (name: string): void => {
    this[counterName].set(name, (this[counterName].get(name) || 0) + 1);
  };

  // Start from scratch.
  this[counterName].clear();

  // Count the elements provided by the remote dispatchers.
  for (const dispatcherInfo of this[sourceName].values()) {
    for (const name of dispatcherInfo[listName]) {
      addProvider(name);
    }
  }

  // Count the functions defined by this dispatcher.
  for (const name of this._definedFunctions.keys()) {
    addProvider(name);
  }
}
/**
 * Rebuilds the set of externally provided services based on the
 * messages of the remote dispatchers. Running tasks of services,
 * which became unavailable, are canceled.
 *
 * @protected
 * @return The services which have been added and removed by the update.
 * @memberof nopeDispatcher
 */
protected _updateExternalServices(): {
  // Contains the "new" services
  addedServices: string[];
  // contains the "unavailable services"
  removedServices: string[];
} {
  // Remember the services, which were known before the update.
  const previousServices = new Set(this._externalProvidedServices);

  // Rebuild the set of the available services.
  this._externalProvidedServices.clear();
  for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndServices.values()) {
    for (const service of dispatcherInfo.services) {
      this._externalProvidedServices.add(service);
    }
  }

  // Determine what has changed.
  const difference = determineDifference(
    previousServices,
    this._externalProvidedServices
  );

  // Publish the currently available services.
  const available = Array.from(this._externalProvidedServices);
  this.externalProvidedServices.setContent(available);

  // Services, which aren't available any more => cancel their tasks.
  if (previousServices.size > 0) {
    for (const unavailable of difference.removed) {
      const reason = new Error("Service unavailable!");
      this.cancelRunningTasksOfService(unavailable, reason);
    }
  }

  this._updateAmountOf("services");

  const addedServices = Array.from(difference.added);
  const removedServices = Array.from(difference.removed);
  return { addedServices, removedServices };
}
/**
 * Rebuilds the set of externally provided generators based on the
 * messages of the remote dispatchers and recounts the generators.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalGenerators(): void {
  const known = this._externalProvidedGenerators;

  // Forget the known generators and collect them again.
  known.clear();
  for (const message of this._mappingOfRemoteDispatchersAndGenerators.values()) {
    for (const generator of message.generators) {
      known.add(generator);
    }
  }

  this._updateAmountOf("generators");
}
/**
 * Internal Function to update the Listing of external Topics.
 * This Function creates a list containing all subscriptions
 * and publishers which are external.
 *
 * For every newly subscribed topic the last published event (if there
 * is one) is emitted again.
 *
 * @protected
 * @param {string} [externalDispatcher=""] Id of a dispatcher, whose subscriptions are always treated as new. An empty string disables this.
 * @memberof nopeDispatcher
 */
protected _updateExternalObservables(externalDispatcher = ""): void {
  // Collect all external publishers and subscribers.
  const _published = new Set<string>();
  const _subscribed = new Set<string>();

  for (const dispatcherInfo of this._mappingOfRemoteDispatchersAndPropsOrEvents.values()) {
    for (const propOrEvent of dispatcherInfo.published) {
      _published.add(propOrEvent);
    }
    for (const propOrEvent of dispatcherInfo.subscribed) {
      _subscribed.add(propOrEvent);
    }
  }

  // Send the Values of the Requested Properties
  // Therefore iterate over the new elements and
  // provide the last value

  // 1. Filter the new Subscribed Elements:
  const _newlySubscribed = new Set(
    [..._subscribed].filter((item) => !this._externalSubscribed.has(item))
  );

  if (externalDispatcher !== "") {
    // Make sure, that the subscribed elements of the
    // new element are added correctly. The dispatcher may be unknown
    // => guard the access.
    const info = this._mappingOfRemoteDispatchersAndPropsOrEvents.get(
      externalDispatcher
    );
    if (info) {
      for (const item of info.subscribed) {
        _newlySubscribed.add(item);
      }
    }
  }

  // 2. Iterate over the Elements and
  // add the lastly available value.
  // TODO: CHECK IF REQUIRED.
  for (const _topic of _newlySubscribed) {
    // The logger is optional => guard the access.
    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(`Determined the topic: "${_topic}" and checking if th`);
    }

    if (this._lastPublishedEvent.has(_topic)) {
      // Send the Element
      this.communicator.emitEvent(
        _topic,
        this._lastPublishedEvent.get(_topic)
      );
    }
  }

  // Store the new values:
  this._externalSubscribed = _subscribed;
  this._externalPublished = _published;

  // Update the Elements.
  this.externallySubscribedProperties.setContent(
    Array.from(this._externalSubscribed)
  );
  this.externallyPublishedProperties.setContent(
    Array.from(this._externalPublished)
  );
}
/**
 * Rebuilds the listing of the instances, which are provided by the
 * remote dispatchers and publishes it via `availableInstances`.
 * If an identifier is declared multiple times with different types,
 * the first declaration is kept and a warning is logged.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _updateExternalInstances(): void {
  // Forget the known instances.
  this._externalInstances.clear();
  this._externalInstancesNames.clear();

  for (const instances of this._mappingOfRemoteDispatchersAndInstances.values()) {
    for (const instance of instances) {
      const name = instance.identifier;

      if (!this._externalInstances.has(name)) {
        // First declaration of the instance => store it.
        this._externalInstancesNames.add(name);
        this._externalInstances.set(name, instance);
        continue;
      }

      const knownType = this._externalInstances.get(name)?.type;
      if (knownType !== instance.type) {
        // A mismatch occurred.
        // That should lead to a fatal error ?
        this._logger.warn(
          "An Instance with the name \"" +
            name +
            "\" has already been declared",
          knownType,
          "!=",
          instance.type
        );
      }
    }
  }

  // Publish the instance mapping.
  const available = Array.from(this._externalInstances.values());
  this.availableInstances.setContent(available);
}
/**
 * Tests whether a service with the given id is known, either as
 * locally defined function or as externally provided service.
 *
 * @param {string} id The Id of the Service
 * @return {boolean} True if the service is known locally or remotely.
 * @memberof nopeDispatcher
 */
public serviceExists(id: string): boolean {
  // Locally defined functions are checked first.
  if (this._definedFunctions.has(id)) {
    return true;
  }
  return this._externalProvidedServices.has(id);
}
/**
 * Function to test if a subscription exists,
 * for the given topic.
 *
 * @param {string} topic The topic to test.
 * @param {boolean} [externalOnly=true] If true, only external subscriptions are considered. Otherwise internal subscriptions are considered as well.
 * @return {boolean} The result of the test. True if a matching subscription exists.
 * @memberof nopeDispatcher
 */
public subscriptionForPropertyExists(topic: string, externalOnly = true): boolean {
  if (externalOnly) {
    return this._externalSubscribed.has(topic);
  }
  // The result must be returned, otherwise `undefined` is delivered.
  return (
    this._internalSubscribed.has(topic) ||
    this._externalSubscribed.has(topic)
  );
}
/**
 * Function to test if a publisher exists,
 * for the given topic.
 *
 * @param {string} topic The topic to test.
 * @param {boolean} [externalOnly=true] If true, only external publishers are considered. Otherwise internal publishers are considered as well.
 * @return {boolean} The result of the test. True if a matching publisher exists.
 * @memberof nopeDispatcher
 */
public publisherForPropertyExists(topic: string, externalOnly = true): boolean {
  if (externalOnly) {
    return this._externalPublished.has(topic);
  }
  // The result must be returned, otherwise `undefined` is delivered.
  return (
    this._internalPublished.has(topic) || this._externalPublished.has(topic)
  );
}
/**
 * Function to test if an instance with the given identifier exists.
 *
 * @param {string} identifier The identifier of the instance.
 * @param {boolean} [externalOnly=true] If true, only externally provided instances are considered. Otherwise the instances of this dispatcher are considered as well.
 * @return {boolean} The result of the test. True if a matching instance exists.
 * @memberof nopeDispatcher
 */
public instanceExists(identifier: string, externalOnly = true): boolean {
  if (externalOnly) {
    return this._externalInstances.has(identifier);
  }
  // The result must be returned, otherwise `undefined` is delivered.
  return (
    this._externalInstances.has(identifier) ||
    this._instances.has(identifier)
  );
}
/**
 * Tests whether an externally provided generator is known for the
 * given type identifier.
 *
 * @param {string} typeIdentifier Identifier of the type.
 * @return {boolean} True if an external generator exists.
 * @memberof nopeDispatcher
 */
public generatorExists(typeIdentifier: string): boolean {
  const knownGenerators = this._externalProvidedGenerators;
  return knownGenerators.has(typeIdentifier);
}
/**
 * Adapts the name of a request / response. The id is prefixed with
 * `<type>/`, unless it already starts with that prefix.
 * Only used internally.
 *
 * @protected
 * @param {string} id the original ID
 * @param {("request" | "response")} type the kind of name, which is used as prefix.
 * @return {string} the adapted ID.
 * @memberof nopeDispatcher
 */
protected _getServiceName(id: string, type: "request" | "response"): string {
  const prefix = `${type}/`;
  if (id.startsWith(prefix)) {
    // Already adapted.
    return id;
  }
  return `${prefix}${id}`;
}
/**
 * Internal helper to listen for requests of a function. The listener
 * is only created, if there is no callback registered for the request
 * name yet.
 *
 * @protected
 * @param {string} id the Id of the function
 * @param {(...args) => Promise<any>} _cb The Callback of the Function
 * @return {void}
 * @memberof nopeDispatcher
 */
protected _subscribeToService(
  id: string,
  _cb: (...args) => Promise<any>
): void {
  const requestName = this._getServiceName(id, "request");

  // There is already a listener => nothing to do.
  if (this._communicatorCallbacks.has(requestName)) {
    return;
  }

  // Handler, which forwards task requests to the provided function.
  const cb = (data: IRequestTaskMsg) => {
    if (data.type === "requestOfTask") {
      this._handleExternalRequest(data, _cb);
    }
  };

  // Remember the handler.
  this._communicatorCallbacks.set(requestName, {
    registeredId: requestName,
    type: "request",
    cb
  });

  // Listen for the requests.
  this.communicator.onRpcRequest(requestName, cb);

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    // If there is a Logger:
    this._logger.debug(
      "Dispatcher \"" + this.id + "\" listening on: \"" + requestName + "\""
    );
  }
}
/**
 * Internal helper to listen for the responses of a task. The listener
 * is only created, if there is no callback registered for the response
 * name yet.
 *
 * @protected
 * @param {string} id The id, which is used to create the name of the response.
 * @param {boolean} deleteAfterCalling If true, the subscription is removed after a response has been handled successfully.
 * @memberof nopeDispatcher
 */
protected _subscribeToResult(id: string, deleteAfterCalling: boolean): void {
  const responseName = this._getServiceName(id, "response");

  // There is already a listener => nothing to do.
  if (this._communicatorCallbacks.has(responseName)) {
    return;
  }

  // Handler, which forwards the responses.
  const cb = (data: IResponseTaskMsg) => {
    if (data.type !== "response") {
      return;
    }
    const handled = this._handleExternalResponse(data);
    if (handled && deleteAfterCalling) {
      this._removeRpcSubscription(responseName);
    }
  };

  // Remember the handler.
  this._communicatorCallbacks.set(responseName, {
    registeredId: responseName,
    type: "response",
    cb
  });

  // Listen for the responses.
  this.communicator.onRpcResponse(responseName, cb);
}
/**
 * Creates an Event listener (if required)
 *
 * If there is no observable linked with the event yet, a new one is
 * created. As long as the observable has no subscriptions, a callback is
 * registered at the communicator, which forwards the received data to
 * the observable.
 *
 * @protected
 * @param {string} event The Event to Listen.
 * @return The observable linked with the event and the flag `newSubscription`, which is true if the event was not linked with an observable before this call.
 * @memberof nopeDispatcher
 */
protected _subscribeToEvent(event: string): {
  newSubscription: boolean,
  observable: INopeObservable<IExternalEventMsg>
} {
  // We must know, whether this element is new or not. This must be
  // determined before the element is stored below.
  const isNew = !this._externalTopicLinkedWithObservable.has(event);

  const item = this._externalTopicLinkedWithObservable.get(event) || {
    observable: this._generateObservable<IExternalEventMsg>(),
    cb: () => {
      // Default callback
    },
    newSubscription: isNew
  };

  if (!item.observable.hasSubscriptions) {
    // Forward the data of the communicator to the observable.
    const cb = (data: IExternalEventMsg) => {
      item.observable.setContent(data, { sender: this.id });
    };
    this.communicator.onEvent(event, cb);
    item.cb = cb;
  }

  // Set the Items.
  this._externalTopicLinkedWithObservable.set(event, item);

  // The flag is taken from this call and not from the stored item,
  // which may have been created by an earlier call.
  return {
    newSubscription: isNew,
    observable: item.observable
  } as {
    newSubscription: boolean,
    observable: INopeObservable<IExternalEventMsg>
  };
}
/**
 * Registers a function at the dispatcher, so that it can be called by
 * requests. The returned function carries the properties `id` and
 * `unregister`.
 *
 * @param {(...args) => Promise<any>} func The function which should be called if a request is mapped to the Function.
 * @param {{
 *   // Flag to enable unregistering the function after calling.
 *   deleteAfterCalling?: boolean,
 *   // Instead of generating a uuid an id could be provided
 *   id?: string;
 *   // Flag to enable / disable sending to registery
 *   preventSendingToRegistery?: boolean;
 * }} [options={}] Options to enhance the registered ID and enabling unregistering the Element after calling it.
 * @return {*} {(...args) => Promise<any>} The registered Function (a wrapper, if `deleteAfterCalling` is set).
 * @memberof nopeDispatcher
 */
public registerFunction(
  func: (...args) => Promise<any>,
  options: {
    /** Flag to enable unregistering the function after calling. */
    deleteAfterCalling?: boolean;
    /** Instead of generating a uuid an id could be provided */
    id?: string;
    /** Flag to enable / disable sending to registery */
    preventSendingToRegistery?: boolean;
  } = {}
): (...args) => Promise<any> {
  // Use the provided id or generate one.
  const functionId = options.id || generateId();

  // Warn once, if a synchronous function has been provided.
  if (!this.__warned && !isAsyncFunction(func)) {
    this._logger.warn("!!! You have provided synchronous functions. They may break NoPE. Use them with care !!!");
    this.__warned = true;
  }

  // If required, wrap the function so that it is unregistered before
  // the original function is called.
  const registered: (...args) => Promise<any> = options.deleteAfterCalling
    ? async (...args) => {
      this.unregisterFunction(functionId, {
        preventSendingToRegistery: options.preventSendingToRegistery
      });
      return await func(...args);
    }
    : func;

  // Attach the id and a helper to unregister the function.
  registered["id"] = functionId;
  registered["unregister"] = () => this.unregisterFunction(functionId);

  // Store the function.
  this._definedFunctions.set(registered["id"], {
    options: {
      preventSendingToRegistery: options.preventSendingToRegistery || false
    },
    func: registered
  });

  // Listen for requests of the function.
  this._subscribeToService(functionId, registered);

  if (!options.preventSendingToRegistery) {
    this._updateAmountOf("services");

    // Publish the Available Services.
    this._sendAvailableServices();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      // If there is a Logger:
      this._logger.debug(
        "Dispatcher \"" + this.id + "\" registered: \"" + functionId + "\""
      );
    }
  }

  return registered;
}
/**
 * Unregisters a function from the dispatcher.
 *
 * The id is taken from `func` directly (if it is a string) or from its `id`
 * property; if that property is missing the id "0" is used.
 *
 * @param func The registered function or its id.
 * @param options Set `preventSendingToRegistery` to skip announcing the updated list of services.
 * @return {boolean} `true` if a function with that id was registered and has been removed, otherwise `false`.
 * @memberof nopeDispatcher
 */
public unregisterFunction(
  func: ((...args) => void) | string,
  options: {
    /** Flag to enable / disable sending to registery */
    preventSendingToRegistery?: boolean;
  } = {}
): boolean {
  const _id = typeof func === "string" ? func : (func["id"] as string) || "0";

  // Detach the listener of the communicator.
  this._removeRpcSubscription(_id);

  // Remove the function BEFORE announcing the services. The announcement is
  // built from the keys of `_definedFunctions`; deleting afterwards would
  // still advertise the service which has just been removed.
  const removed = this._definedFunctions.delete(_id);

  if (!options.preventSendingToRegistery) {
    this._updateAmountOf("services");
    // Publish the available services.
    this._sendAvailableServices();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        "Dispatcher \"" + this.id + "\" unregistered: \"" + _id + "\""
      );
    }
  }

  return removed;
}
/**
 * Links an observable with the dispatcher.
 *
 * Depending on `options.mode` the observable
 *  - receives the events of the topic `options.topic` (or `options.topic.subscribe`)
 *    which are provided by the communicator ("subscribe") and / or
 *  - forwards its own changes to the topic `options.topic` (or `options.topic.publish`)
 *    ("publish").
 *
 * The `dispose` method of the observable is wrapped, so that disposing the
 * observable releases the subscriptions and the bookkeeping created in here.
 *
 * @param observable The observable to link.
 * @param options Topic(s), mode, optional pipe and flags controlling the announcement / initial value.
 * @return The observable which has been handed in.
 * @memberof nopeDispatcher
 */
public registerObservable<T, K, S = T, G = T>(
  observable: INopeObservable<T, S, G>,
  options: IPropertyOptions
): INopeObservable<T, S, G> {
  // Reference to itself
  const _this = this;

  // Extract the topics, the pipe and the scope.
  const _subTopic =
    typeof options.topic === "string"
      ? options.topic
      : options.topic.subscribe || null;
  const _pubTopic =
    typeof options.topic === "string"
      ? options.topic
      : options.topic.publish || null;
  // NOTE(review): the pipe is only used if `options.pipe` itself is a function.
  // If an object ({ pipe, scope }) is provided only its scope is read, which
  // stays unused without a pipe - confirm whether `options.pipe.pipe` is meant.
  const _pipe = typeof options.pipe === "function" ? options.pipe : null;
  // `typeof null` is "object" as well => guard against null.
  const _scope =
    typeof options.pipe === "object" && options.pipe !== null
      ? options.pipe.scope || null
      : null;

  /**
   * Wraps the current `dispose` of the observable: `teardown` is executed
   * first, afterwards the previously assigned `dispose` is called.
   */
  const prependToDispose = (teardown: () => void): void => {
    const originalDispose = observable.dispose;
    observable.dispose = () => {
      teardown();
      // Call the original dispose function.
      originalDispose.apply(observable);
    };
  };

  this._registeredObservables.set(observable, options);

  // A flag, indicating whether a topic is new or not.
  let newElement = false;

  // Test if the observable should receive external events.
  if (
    options.mode == "subscribe" ||
    (Array.isArray(options.mode) && options.mode.includes("subscribe"))
  ) {
    newElement =
      newElement || !this._externalTopicLinkedWithObservable.has(_subTopic);

    // The external source, which is linked to the communicator.
    const _externalSource = this._subscribeToEvent(_subTopic);

    // Forwards an external message to the observable. Messages which were
    // sent by this dispatcher are skipped (forced or not), because the
    // observable already contains their content.
    const forwardToObservable = (data: IExternalEventMsg) => {
      if (data.sender != _this.id) {
        if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
          _this._logger.debug("Forwarding data on \"" + _subTopic + "\" to registered observables");
        }
        // Problem: If running internally (no external event emitter -> no update will be forward)
        observable.setContent(data.data, {
          sender: _this.id,
          timestamp: data.timestamp,
          forced: data.forced || false
        });
      }
    };

    let stopListening: () => void;
    if (_pipe) {
      const observer = _externalSource.enhancedSubscription(
        forwardToObservable,
        {
          scope: _scope,
          pipe: _pipe
        }
      );
      stopListening = () => observer.unsubscribe();
    } else {
      const observer = _externalSource.subscribe({
        next: forwardToObservable,
        complete() {
          observable.observable.complete();
        },
        error(err) {
          observable.observable.error(err);
        }
      });
      stopListening = () => observer.unsubscribe();
    }

    // Overwrite the original dispose function.
    prependToDispose(() => {
      // Kill the observer.
      stopListening();
      // Unsubscribe the event.
      _this._unsubscribeObservable(_subTopic);
    });

    // Only if required => initialize the observable with the lastly
    // published / received value of the topic.
    if (!options.preventSendingUpdateOnNewSubscription) {
      // Only the lastly published event is used. The current content of the
      // external source is intentionally not used as fallback.
      const data = this._lastPublishedEvent.get(_subTopic);
      if (typeof data !== "undefined" && data !== null) {
        if (!observable.setContent(data.data, {
          sender: _this.id,
          timestamp: data.timestamp
        })) {
          // `setContent` reported that nothing was set => publish explicitly.
          observable.forcePublish();
        }
      }
    }
  }

  // Test if the observable should forward its changes.
  if (
    options.mode == "publish" ||
    (Array.isArray(options.mode) && options.mode.includes("publish"))
  ) {
    newElement =
      newElement || !this._externalTopicLinkedWithObservable.has(_pubTopic);

    // The external source, which is linked to the communicator.
    const _externalSource = this._subscribeToEvent(_pubTopic);

    // Callback which forwards the changes of the observable to the network
    // and to the other internal listeners of the topic.
    const cb: IObservableCallback<any> = (data, meta) => {
      // Updates which were written by the dispatcher itself (external
      // events) must not be sent back.
      if (_this.id !== meta.sender) {
        const msg: IExternalEventMsg = {
          data: data,
          topic: _pubTopic,
          sender: _this.id,
          type: "event",
          timestamp: meta.timestamp,
        };
        if (meta.forced) {
          msg.forced = true;
        }

        // Use the communicator only if the topic is subscribed externally,
        // emitting updates is enforced or the update itself is forced.
        if (
          _this.subscriptionForPropertyExists(_pubTopic) ||
          _this._forceEmittingUpdates ||
          meta.forced
        ) {
          _this.communicator.emitEvent(_pubTopic, msg);
        }

        // Store the lastly published message if the topic provides its
        // current value for new subscriptions.
        const storeLastValue =
          _this._eventsToSendCurrentValueOnSubscription.has(_pubTopic);
        if (storeLastValue) {
          if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
            _this._logger.debug(`Storing data for "${_pubTopic}"`);
          }
          _this._lastPublishedEvent.set(_pubTopic, msg);
        }

        // Forward the data internally using the "external source". The
        // ORIGINAL sender is used here, which prevents endless loops:
        // observables check whether an update has been sent by themselves.
        if (storeLastValue || _externalSource.observerLength > 0) {
          if (_this._logger?.enabledFor((Logger as any).DEBUG)) {
            _this._logger.debug(
              storeLastValue
                ? `Forwarding data for "${_pubTopic}"`
                : `Forwarding data for "${_pubTopic}" because there is someone listening`
            );
          }
          _externalSource.setContent({
            data: data,
            topic: _pubTopic,
            sender: meta.sender,
            type: "event",
            timestamp: meta.timestamp
          });
        }
      }
    };

    // Register the observable as forwarded to the network.
    const _set01 =
      this._internalObservablesForwardDataToNetwork.get(_pubTopic) ||
      new Set();
    _set01.add(observable);
    this._internalObservablesForwardDataToNetwork.set(_pubTopic, _set01);

    // If desired, mark the topic as "provides its current value on new
    // subscriptions" (see the usage of "_lastPublishedEvent" above).
    if (!options.preventSendingUpdateOnNewSubscription) {
      const _set02 =
        this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
        new Set();
      _set02.add(observable);
      this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, _set02);
    }

    let stopForwarding: () => void;
    if (_pipe) {
      const observer = observable.enhancedSubscription(cb, {
        scope: _scope,
        pipe: _pipe
      });
      stopForwarding = () => observer.unsubscribe();
    } else {
      const observer = observable.subscribe(cb);
      stopForwarding = () => observer.unsubscribe();
    }

    // Overwrite the original dispose function.
    prependToDispose(() => {
      // Kill the observer.
      stopForwarding();
      // Release the external source of the PUBLISH topic (the one acquired
      // above). Using the subscribe topic would leak the source if both
      // topics differ.
      _this._unsubscribeObservable(_pubTopic);

      // Unregister the internally subscribed element.
      const forwarding =
        _this._internalObservablesForwardDataToNetwork.get(_pubTopic) ||
        new Set();
      forwarding.delete(observable);
      if (forwarding.size > 0) {
        _this._internalObservablesForwardDataToNetwork.set(
          _pubTopic,
          forwarding
        );
      } else {
        _this._internalObservablesForwardDataToNetwork.delete(_pubTopic);
        // Optionally send an update.
        if (!options.preventSendingToRegistery) {
          _this._sendAvailableObservables();
        }
      }

      // Remove the "provides its current value" mark.
      const providers =
        _this._eventsToSendCurrentValueOnSubscription.get(_pubTopic) ||
        new Set();
      providers.delete(observable);
      if (providers.size > 0) {
        _this._eventsToSendCurrentValueOnSubscription.set(_pubTopic, providers);
      } else {
        // The topic isn't provided any more => delete the flag and the value.
        _this._eventsToSendCurrentValueOnSubscription.delete(_pubTopic);
        _this._lastPublishedEvent.delete(_pubTopic);
      }
    });
  }

  if (newElement) {
    this._updateListsOfInternallySubscribedAndPublishedValues();
  }
  if (!options.preventSendingToRegistery && newElement) {
    // Publish the available observables.
    this._sendAvailableObservables();
  }

  // Return the observable.
  return observable;
}
/**
 * Removes an observable from the list of registered observables.
 *
 * @param observable The observable or its id. If an observable without id is provided, the id "0" is used.
 * @param options Set `preventSendingToRegistery` to skip announcing the available observables.
 * @return {boolean} `true` if a matching observable was registered and has been removed.
 * @memberof nopeDispatcher
 */
public unregisterObservable(
  observable: INopeObservable<any> | string,
  options: {
    /** Flag to enable / disable sending to registery */
    preventSendingToRegistery?: boolean;
  } = {}
): boolean {
  let targetId: string;
  if (typeof observable === "string") {
    targetId = observable;
  } else {
    targetId = (observable.id as string) || "0";
  }

  // Search for the registered observable with that id. The complete map is
  // scanned, so the last match is the one which gets removed.
  let match: INopeObservable<any> = null;
  this._registeredObservables.forEach((_options, candidate) => {
    if (candidate.id == targetId) {
      match = candidate;
    }
  });

  const announce = !options.preventSendingToRegistery;
  if (announce) {
    // Publish the available observables.
    this._sendAvailableObservables();

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug(
        "Dispatcher \"" + this.id + "\" unregistered: \"" + targetId + "\""
      );
    }
  }

  return this._registeredObservables.delete(match);
}
/**
* External event sources of the dispatcher. The "key" represents the topic.
* Each entry holds the observable used as source for that topic and the
* callback "cb", which is handed to `communicator.offEvent` when the topic
* is unsubscribed (see `_unsubscribeObservable`).
*/
protected _externalTopicLinkedWithObservable: Map<
string,
{
observable: INopeObservable<IExternalEventMsg>;
cb: (...arg) => void;
}
>;
/**
* Internal observables, which forward their data to the network.
* The "key" represents the topic, the value contains all observables
* registered in publish mode for that topic.
*/
protected _internalObservablesForwardDataToNetwork: Map<
string,
Set<INopeObservable<any>>
>;
/**
 * Releases the external source of a topic: removes the listener from the
 * communicator, disposes the linked observable and drops the entry.
 * Nothing happens if no source is known for the topic.
 *
 * @protected
 * @param {string} path The topic to release.
 * @memberof nopeDispatcher
 */
protected _unsubscribeObservable(path: string) {
  const linked = this._externalTopicLinkedWithObservable.get(path);

  // Unknown topic => nothing to release.
  if (!linked) {
    return;
  }

  // Detach the listener of the communicator.
  this.communicator.offEvent(path, linked.cb);
  // Dispose the observable, which served as source.
  linked.observable.dispose();
  // Forget the topic.
  this._externalTopicLinkedWithObservable.delete(path);
}
/**
 * Helper to subscribe directly to an event. A new observable is generated,
 * registered in "subscribe" mode for the event and subscribed with the
 * provided callback.
 *
 * @param event The event (topic) to subscribe to.
 * @param callback The callback which receives the updates.
 * @param options Optional pipe, subscription options and the flag to suppress the announcement.
 * @return The observer. Its `unsubscribe` disposes the generated observable.
 */
public async subscribeToEvent<G = any, K = any>(
  event: string,
  callback: IObservableCallback<G>,
  options: {
    pipe?: {
      pipe?: IPipe<IExternalEventMsg, K>;
      scope?: { [index: string]: any };
    };
    preventSendingToRegistery?: boolean;
    subscriptionOptions?: INopeSubscriptionOptions;
  } = {}
): Promise<INopeObserver> {
  // A dedicated observable receives the event.
  const eventObservable = this._generateObservable<G>();

  // Link the observable with the event.
  const registration = {
    mode: "subscribe",
    topic: event,
    preventSendingToRegistery: options.preventSendingToRegistery,
    pipe: options.pipe,
    schema: {}
  };
  this.registerObservable(eventObservable, registration);

  // Subscribe the callback on the linked observable.
  const observer = eventObservable.subscribe(
    callback,
    options.subscriptionOptions
  );

  // Unsubscribing tears down the complete observable (including its link
  // to the dispatcher).
  observer.unsubscribe = () => {
    eventObservable.dispose();
  };

  return observer;
}
/**
 * Manually emits an event using the communicator.
 *
 * The event is emitted if it is `forced`, or if a subscription for the
 * event exists and the `sender` differs from this dispatcher. The emitted
 * message always uses the id of this dispatcher as sender.
 *
 * @param _eventName The name (topic) of the event.
 * @param data The content of the event.
 * @param sender The original sender; used to skip events of this dispatcher.
 * @param timestamp The timestamp forwarded with the message.
 * @param forced Emit the event although no subscription exists.
 * @param args Not used.
 */
public async emit<T>(
  _eventName: string,
  data: T,
  sender?: string,
  timestamp?: number,
  forced = false,
  ...args
): Promise<void> {
  const shouldEmit =
    forced ||
    (this.subscriptionForPropertyExists(_eventName) && this.id !== sender);

  // Neither forced nor subscribed by a different sender => skip.
  if (!shouldEmit) {
    return;
  }

  await this.communicator.emitEvent(_eventName, {
    forced,
    data,
    topic: _eventName,
    sender: this.id,
    type: "event",
    timestamp
  });
}
/**
 * Removes the listener which the dispatcher registered at the communicator
 * for the given id (either a request- or a response-listener).
 *
 * @param _id The id used to store the callback.
 */
protected _removeRpcSubscription(_id: string): void {
  // Without a stored callback there is nothing to remove.
  if (!this._communicatorCallbacks.has(_id)) {
    return;
  }

  const entry = this._communicatorCallbacks.get(_id);

  if (entry.type === "request") {
    // Unregister the RPC-Request-Listener
    this.communicator.offRpcRequest(entry.registeredId, entry.cb);
  } else if (entry.type === "response") {
    // Unregister the RPC-Response-Listener
    this.communicator.offRpcResponse(entry.registeredId, entry.cb);
  }

  // Forget the callback.
  this._communicatorCallbacks.delete(_id);
}
/**
 * Announces the ids of all functions defined on this dispatcher
 * using the communicator.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableServices(): void {
  const services = Array.from(this._definedFunctions.keys());
  const message: IAvailableServicesMsg = { dispatcher: this.id, services };

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    this._logger.debug("sending available services");
  }

  this.communicator.emitNewServicesAvailable(message);
}
/**
 * Announces the topics of this dispatcher: the topics which are forwarded
 * to the network ("published") and the topics which are linked with an
 * external source ("subscribed").
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableObservables(): void {
  const published = Array.from(
    this._internalObservablesForwardDataToNetwork.keys()
  );
  const subscribed = Array.from(
    this._externalTopicLinkedWithObservable.keys()
  );
  const message: IAvailableTopicsMsg = {
    dispatcher: this.id,
    published,
    subscribed
  };

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    this._logger.debug(
      "sending available properties. (subscribing and publishing events)"
    );
  }

  this.communicator.emitNewObservablesAvailable(message);
}
/**
 * Rebuilds the sets of internally subscribed and published topics based
 * on the options of all registered observables.
 *
 * @param considerHidden If `true` (default), observables registered with `preventSendingToRegistery` are included as well.
 */
protected _updateListsOfInternallySubscribedAndPublishedValues(
  considerHidden = true
): void {
  // Start with empty lists.
  this._internalSubscribed.clear();
  this._internalPublished.clear();

  this._registeredObservables.forEach((options) => {
    // Skip hidden observables if they should not be considered.
    if (options.preventSendingToRegistery && !considerHidden) {
      return;
    }

    // A plain string is used as subscribe- and publish-topic.
    let subTopic;
    let pubTopic;
    if (typeof options.topic === "string") {
      subTopic = options.topic;
      pubTopic = options.topic;
    } else {
      subTopic = options.topic.subscribe || null;
      pubTopic = options.topic.publish || null;
    }

    if (subTopic !== null) {
      this._internalSubscribed.add(subTopic);
    }
    if (pubTopic !== null) {
      this._internalPublished.add(pubTopic);
    }
  });
}
/**
 * Emits a status update containing the alive-message of this dispatcher.
 */
protected _emitStatus(): void {
  const { communicator } = this;
  communicator.emitStatusUpdate(this._genAliveMessage());
}
/**
 * Announces the names of the generators stored in `_externalGenerators`
 * using the communicator.
 */
protected _sendAvailableGenerators(): void {
  const generators = Array.from(this._externalGenerators.keys());
  const message: IAvailableInstanceGeneratorsMsg = {
    dispatcher: this.id,
    generators
  };

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    this._logger.debug("sending available instance generators");
  }

  this.communicator.emitNewInstanceGeneratorsAvailable(message);
}
/**
 * Announces the descriptions of all instances listed in
 * `_internalInstances` using the communicator.
 *
 * @protected
 * @memberof nopeDispatcher
 */
protected _sendAvailableInstances(): void {
  // Generate the module description of every internal instance.
  const instances = [];
  for (const identifier of this._internalInstances) {
    instances.push(this._instances.get(identifier).instance.toDescription());
  }

  this.communicator.emitNewInstancesAvailable({
    dispatcher: this.id,
    instances
  });
}
/**
 * Requests the execution of a service. A task is created, emitted as
 * rpc-request using the communicator and the returned promise is stored as
 * running task under the generated task id.
 *
 * Parameters of the type "function" are registered as callbacks on this
 * dispatcher (unless `paramsHasNoCallback` is set) and are unregistered
 * again as soon as the task is cleared.
 *
 * @author M.Karkowski
 * @template T
 * @param {string} serviceName The name / id of the service.
 * @param {any[]} params The parameters of the call.
 * @param options Options for the call:
 *  - `selector`: a different selector (name or function) for choosing the executing dispatcher.
 *  - `preventSelector`: never attach a selector to the request.
 *  - `quite`: do not log the "No Service Provider known" error; also forwarded to `cancelTask` on a timeout.
 *  - `timeout`: if > 0, the task is canceled after that many milliseconds.
 * @return {INopePromise<T>} The promise of the call, enhanced with `taskId` and `cancel`.
 *  It rejects if no provider is known for the service (and `dynamicCallback` is not set)
 *  or if preparing / emitting the request fails.
 * @memberof nopeDispatcher
 */
public performCall<T>(
  serviceName: string,
  params: any[],
  options: Partial<ICallOptions> & {
    selector?: ValidDefaultSelectors | ValidSelectorFunction;
    quite?: boolean;
    preventSelector?: boolean;
  } = {}
): INopePromise<T> {
  // Get a Call Id
  const _taskId = generateId();
  // Ids of the callbacks which have been registered for this call.
  const _registeredIdx: Array<string> = [];

  // Merge the provided options with the defaults.
  const _options = Object.assign(
    {
      deletableCallbacks: [],
      paramsHasNoCallback: false,
      dynamicCallback: false,
      resultSink: this._getServiceName(serviceName, "response"),
      selector: this.options.defaultSelector
    },
    options
  ) as ICallOptions;

  this._subscribeToResult(serviceName, _options.dynamicCallback);

  // Releases everything which has been created for the task.
  const clear = () => {
    // Delete all callbacks.
    for (const id of _registeredIdx) {
      this.unregisterFunction(id);
    }

    // Remove the task:
    if (this._runningInternalRequestedTasks.has(_taskId)) {
      const task = this._runningInternalRequestedTasks.get(_taskId);
      // Remove the timeout.
      if (task.timeout) {
        clearTimeout(task.timeout);
      }
      // Remove the task itself
      this._runningInternalRequestedTasks.delete(_taskId);
    }

    if (this._logger?.enabledFor((Logger as any).DEBUG)) {
      this._logger.debug("Clearing Callbacks from " + _taskId);
    }
  };

  // Cancels the task, if the selector did not select a dispatcher.
  const cancelBecauseNothingSelected = () => {
    this.cancelTask(
      _taskId,
      new Error("No dispatcher has been selected for executing the task!"),
      false
    );
  };

  if (this._logger?.enabledFor((Logger as any).DEBUG)) {
    this._logger.debug(
      "Dispatcher \"" +
        this.id +
        "\" requesting externally Function \"" +
        serviceName +
        "\" with task: \"" +
        _taskId +
        "\""
    );
  }

  // Define a Callback-Function, which will expect the Task.
  const ret = new NopePromise<T>(async (resolve, reject) => {
    try {
      const requestedTask: any = {
        resolve,
        reject,
        clear,
        serviceName,
        timeout: null
      };

      // Register the handlers.
      this._runningInternalRequestedTasks.set(_taskId, requestedTask);

      // Define a Task-Request
      const taskRequest: IRequestTaskMsg = {
        functionId: serviceName,
        params: [],
        callbacks: [],
        taskId: _taskId,
        type: "requestOfTask",
        resultSink: _options.resultSink
      };

      if (!options.paramsHasNoCallback) {
        // The parameters may contain callbacks. Parsable parameters are
        // assigned to `params`, functions are registered on this
        // dispatcher and described in `callbacks`.
        for (const [idx, contentOfParameter] of params.entries()) {
          if (typeof contentOfParameter !== "function") {
            taskRequest.params.push({
              idx,
              data: contentOfParameter
            });
          } else {
            const deleteAfterCalling =
              _options.deletableCallbacks.includes(idx);
            const _func = this.registerFunction(contentOfParameter, {
              deleteAfterCalling,
              preventSendingToRegistery: true
            });
            _registeredIdx.push(_func["id"]);

            // Describe the callback.
            taskRequest.callbacks.push({
              functionId: _func["id"],
              idx,
              deleteAfterCalling,
              dynamicCallback: true,
              deletableCallbacks: [],
              resultSink: this._getServiceName(_func["id"], "response")
            });
          }
        }
      } else {
        // No callbacks => forward all parameters as they are.
        for (const [idx, contentOfParameter] of params.entries()) {
          taskRequest.params.push({
            idx,
            data: contentOfParameter
          });
        }
      }

      if (!_options.dynamicCallback && !this.serviceExists(serviceName)) {
        // Create an Error:
        const error = new Error(
          "No Service Provider known for \"" + serviceName + "\""
        );
        if (!options.quite && this._logger) {
          this._logger.error(
            "No Service Provider known for \"" + serviceName + "\""
          );
          this._logger.error(error);
        }
        throw error;
      }

      // A selector is required if selectors are enforced or if the service
      // is provided more than once. The comparison must only be applied to
      // the amount: "(forceUsingSelectors || amount) > 1" evaluates
      // "true > 1" (=> false) if selectors are enforced.
      const useSelector =
        !options.preventSelector &&
        (this.options.forceUsingSelectors ||
          this._amountOfServicesProvidedByName.get(serviceName) > 1);

      if (useSelector) {
        if (typeof options?.selector === "function") {
          const selector = await this._generateSelector(
            serviceName,
            "service",
            options.selector
          );
          // Cancel the task, if the selection fails.
          selector.selectionSucessPromise.catch(cancelBecauseNothingSelected);
          // Assign the Selector:
          taskRequest.selector = selector.selectorFuncOptions;
        } else {
          const selector = await this._generateAndRegisterSelectorFunction(
            serviceName,
            "service",
            typeof options?.selector === "string" ? options.selector : this.options.defaultSelector
          );
          // Cancel the task, if the selection fails.
          selector.selectionSucessPromise.catch(cancelBecauseNothingSelected);
          // Assign the Selector:
          taskRequest.selector = selector.selectorFuncOptions;
        }
      }

      // Send the Message to the specific element:
      await this.communicator.emitRpcRequest(
        this._getServiceName(taskRequest.functionId, "request"),
        taskRequest
      );

      if (this._logger?.enabledFor((Logger as any).DEBUG)) {
        this._logger.debug(
          "Dispatcher \"" +
            this.id +
            "\" putting task \"" +
            _taskId +
            "\" on: \"" +
            this._getServiceName(taskRequest.functionId, "request") +
            "\""
        );
      }

      // If there is a timeout => cancel the task after the allowed time.
      if (options.timeout > 0) {
        requestedTask.timeout = setTimeout(() => {
          this.cancelTask(
            _taskId,
            new Error(
              "TIMEOUT. The Service allowed execution time of " +
                options.timeout.toString() +
                "[ms] has been excided"
            ),
            options.quite || false
          );
        }, options.timeout);
      }
    } catch (e) {
      // Clear all Elements of the Function:
      clear();
      // Forward the error.
      reject(e);
    }
  });

  ret.taskId = _taskId;
  ret.cancel = (reason) => {
    this.cancelTask(_taskId, reason);
  };

  return ret;
}
/**
 * Removes all entries of the running tasks. The map is created, if it
 * does not exist yet.
 *
 * @memberof nopeDispatcher
 */
public clearTasks(): void {
  if (!this._runningInternalRequestedTasks) {
    // Nothing stored so far => start with an empty map.
    this._runningInternalRequestedTasks = new Map();
    return;
  }

  this._runningInternalRequestedTasks.clear();
}
/**
 * Unregisters every function defined on the dispatcher and resets the
 * stored communicator callbacks. The map of functions is created, if it
 * does not exist yet.
 *
 * @memberof nopeDispatcher
 */
public unregisterAll(): void {
  if (!this._definedFunctions) {
    this._definedFunctions = new Map();
  } else {
    // Unregister the functions one by one.
    this._definedFunctions.forEach((_definition, id) => {
      this.unregisterFunction(id);
    });
    this._definedFunctions.clear();
  }

  // Reset the Callbacks.
  this._communicatorCallbacks = new Map();
}
/**
 * Resets the dispatcher: the internal containers are replaced with empty
 * ones, existing instances are deleted, the running tasks are cleared and
 * all functions are unregistered.
 *
 * @memberof nopeDispatcher
 */
public reset(): void {
  // Information about remote dispatchers and generators.
  this._remotlyCalledFunctions = new Set();
  this._mappingOfRemoteDispatchersAndServices = new Map();
  this._mappingOfRemoteDispatchersAndPropsOrEvents = new Map();
  this._mappingOfRemoteDispatchersAndGenerators = new Map();
  this._mappingOfRemoteDispatchersAndInstances = new Map();
  this._internalGenerators = new Map();
  this._externalGenerators = new Map();

  // Existing instances are deleted. The deletion is not awaited; a failure
  // is only logged.
  if (this._instances) {
    for (const name of this._instances.keys()) {
      this.deleteInstance(name, true).catch((e) => {
        if (this._logger) {
          this._logger.error("Failed Removing Instance \"" + name + "\"");
          this._logger.error(e);
        }
      });
    }
  }

  // Containers for the instances and the events.
  this._instances = new Map();
  this._externalInstances = new Map();
  this._internalInstances = new Set();
  this._initializingInstance = new Map();
  this._externalInstancesNames = new Set();
  this._externalDispatchers = new Map();
  this._lastPublishedEvent = new Map();
  this._eventsToSendCurrentValueOnSubscription = new Map();

  // If connected => announce the (now empty) list of instances.
  const connected = this.communicator.connected.getContent();
  if (connected) {
    this._sendAvailableInstances();
  }

  // Overview of services, topics and generators.
  this._externalProvidedServices = new Set();
  this._amountOfServicesProvidedByName = new Map();
  this._externalPublished = new Set();
  this._externalSubscribed = new Set();
  this._internalPublished = new Set();
  this._internalSubscribed = new Set();
  this._externalProvidedGenerators = new Set();
  this._amountOfGeneratorsProvidedByName = new Map();

  // Registered observables and their links.
  this._registeredObservables = new Map();
  this._internalObservablesForwardDataToNetwork = new Map();
  this._externalTopicLinkedWithObservable = new Map();

  // Finally remove the tasks and the functions.
  this.clearTasks();
  this.unregisterAll();
}
}