nope/lib/communication/bridge.ts

271 lines
7.3 KiB
TypeScript
Raw Normal View History

/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
*/
2021-03-19 18:17:39 +00:00
2021-03-19 13:01:51 +00:00
import { EventEmitter } from "events";
import { ILogger } from "js-logger";
2021-03-19 13:01:51 +00:00
import { generateId } from "../helpers/idMethods";
import {
DEBUG,
defineNopeLogger,
ValidLoggerDefinition,
WARN,
} from "../logger/index.browser";
import { NopeObservable } from "../observables/nopeObservable";
2021-03-19 13:01:51 +00:00
import {
Eventname,
EventnameToEventType,
2021-03-19 13:01:51 +00:00
ICommunicationBridge,
2021-03-22 18:02:24 +00:00
ICommunicationInterface,
2022-01-10 06:52:05 +00:00
INopeObservable,
} from "../types/nope";
2021-03-19 18:17:39 +00:00
/**
 * Bundles several communication layers behind one `ICommunicationBridge`.
 *
 * Callbacks registered through {@link Bridge.on} are attached to an internal
 * `EventEmitter`. Every connected layer gets exactly one handler per event
 * name; that handler either re-emits the received data on the internal emitter
 * or, for layers added with `forwardData = true`, passes it to `_emit`, which
 * also sends it to all other connected layers.
 */
export class Bridge implements ICommunicationBridge {
  /**
   * Aggregated connection state. Its getter is replaced in the constructor so
   * that it only reports `true` if every layer added with
   * `considerConnection = true` reports being connected.
   */
  public connected: INopeObservable<boolean>;
  // Not read inside this class.
  public considerConnection = true;
  // Not read inside this class.
  public allowsServiceRedundancy = false;
  // Not assigned or read inside this class.
  public ownDispatcherId: string;
  /** Identifier of the bridge (constructor argument). */
  public id: string;

  /**
   * `true` while no registered layer has `receivesOwnMessages` set. In that
   * case `_emit` additionally delivers the data on the internal emitter.
   */
  protected _useInternalEmitter: boolean;
  /** Result of `defineNopeLogger`; may be falsy, so every use is guarded. */
  protected _logger: ILogger;
  /** Emitter the locally registered callbacks are attached to. */
  protected _internalEmitter: EventEmitter;
  /** Registered layers together with their options, keyed by `layer.id`. */
  protected _layers: Map<
    string,
    {
      layer: ICommunicationInterface;
      considerConnection: boolean;
      forwardData: boolean;
    }
  >;
  /** All callbacks registered through `_on`, grouped by event name. */
  protected _callbacks: Map<Eventname, Array<(...args) => any>>;
  /**
   * Event names for which a handler has already been registered on a layer,
   * keyed by `layer.id`. Keeps `_subscribeToCallback` idempotent.
   */
  protected _subscribedEvents: Map<string, Set<Eventname>>;

  /**
   * Creates an instance of Bridge.
   *
   * @param id Identifier of the bridge. Defaults to `generateId()`.
   * @param logger Logger definition handed to `defineNopeLogger` together
   *   with the logger name `nope.bridge`. Defaults to `false`
   *   (presumably "no logger" - confirm against `defineNopeLogger`).
   */
  constructor(id = generateId(), logger: ValidLoggerDefinition = false) {
    this._internalEmitter = new EventEmitter();
    this._callbacks = new Map();
    this._layers = new Map();
    this._subscribedEvents = new Map();

    this.id = id;
    this._logger = defineNopeLogger(logger, `nope.bridge`);
    this._useInternalEmitter = true;

    this.connected = new NopeObservable();
    this.connected.setContent(false);

    // Custom getter for the connected flag: it is true if every layer that
    // has to be considered is connected (and therefore also true as long as
    // no such layer exists).
    this.connected.getter = () => {
      for (const entry of this._layers.values()) {
        if (entry.considerConnection && !entry.layer.connected.getContent()) {
          return false;
        }
      }
      return true;
    };
  }

  /**
   * Registers a callback for the given event.
   *
   * @param eventname Name of the event to listen to.
   * @param cb Callback invoked with the data of the event.
   */
  async on<T extends keyof EventnameToEventType>(
    eventname: T,
    cb: (data: EventnameToEventType[T]) => void
  ): Promise<void> {
    this._on(eventname, cb);
  }

  /**
   * Emits an event on every connected layer (and on the internal emitter
   * while `_useInternalEmitter` is set).
   *
   * @param eventname Name of the event.
   * @param data Payload of the event.
   */
  async emit<T extends keyof EventnameToEventType>(
    eventname: T,
    data: EventnameToEventType[T]
  ): Promise<void> {
    this._emit(eventname, null, data);
  }

  /**
   * Intentionally a no-op in this class; both arguments are ignored.
   *
   * @param type Kind of the listeners.
   * @param listeners Names of the listeners.
   */
  detailListeners(
    type: "event" | "rpc" | "data" | "response",
    listeners: string[]
  ): void {}

  /**
   * `true` if every registered layer has `receivesOwnMessages` set. Also
   * `true` while no layer is registered.
   */
  get receivesOwnMessages(): boolean {
    for (const entry of this._layers.values()) {
      if (!entry.layer.receivesOwnMessages) {
        return false;
      }
    }
    return true;
  }

  /**
   * Disposes all registered layers one after another, in insertion order.
   * A rejection of a layer's `dispose` aborts the iteration and is passed on
   * to the caller.
   */
  async dispose(): Promise<void> {
    for (const entry of this._layers.values()) {
      await entry.layer.dispose();
    }
  }

  /**
   * Recomputes `_useInternalEmitter`: the internal emitter is only used as
   * long as none of the registered layers has `receivesOwnMessages` set.
   */
  protected _checkInternalEmitter(): void {
    this._useInternalEmitter = true;
    for (const entry of this._layers.values()) {
      if (entry.layer.receivesOwnMessages) {
        this._useInternalEmitter = false;
        break;
      }
    }
  }

  /**
   * Subscribes the bridge to an event of a layer.
   *
   * The call is idempotent per layer id and event name: the handler does not
   * depend on the individual callbacks, so registering it more than once
   * would deliver (and forward) every message multiple times.
   *
   * @param layer The layer on which the event is subscribed.
   * @param event The name of the event.
   * @param forwardData If `true`, received data is passed to `_emit` with
   *   `layer` excluded; otherwise it is only emitted on the internal emitter.
   */
  protected _subscribeToCallback(
    layer: ICommunicationInterface,
    event: Eventname,
    forwardData: boolean
  ): void {
    const known =
      this._subscribedEvents.get(layer.id) ?? new Set<Eventname>();
    this._subscribedEvents.set(layer.id, known);

    if (known.has(event)) {
      // The layer already delivers this event to the bridge.
      return;
    }
    known.add(event);

    layer
      .on(event, (data) => {
        if (forwardData) {
          this._emit(event, layer, data);
        } else {
          this._internalEmitter.emit(event, data);
        }
      })
      .catch((error) => {
        // The subscription did not succeed; forget it, so that a later
        // attempt for the same event is not skipped.
        known.delete(event);
        if (this._logger) {
          this._logger.error(`failed subscribing to event "${event}"`);
          this._logger.error(error);
        }
      });
  }

  /**
   * Registers `cb` on the internal emitter, stores it in `_callbacks` and
   * makes sure that every currently connected layer is subscribed to the
   * event. Layers that are not connected yet are subscribed by
   * `addCommunicationLayer` once they are.
   *
   * @param event The name of the event.
   * @param cb The callback to register.
   */
  protected _on(event: Eventname, cb): void {
    const logReceived =
      this._logger?.enabledFor(DEBUG) && event !== "StatusChanged";

    // Raise the listener limit by the number of listeners added below.
    // A limit of 0 means "unlimited" and must not be turned into a real limit.
    const limit = this._internalEmitter.getMaxListeners();
    if (limit > 0) {
      this._internalEmitter.setMaxListeners(limit + (logReceived ? 2 : 1));
    }

    if (logReceived) {
      this._logger.debug("subscribe to", event);
      // If logging is enabled, additionally log the received data.
      this._internalEmitter.on(event, (data) => {
        this._logger.debug("received", event, data);
      });
    }

    this._internalEmitter.on(event, cb);

    // Store the callback.
    const stored = this._callbacks.get(event);
    if (stored === undefined) {
      this._callbacks.set(event, [cb]);
    } else {
      stored.push(cb);
    }

    // Subscribe the event on the connected layers.
    for (const entry of this._layers.values()) {
      if (entry.layer.connected.getContent()) {
        this._subscribeToCallback(entry.layer, event, entry.forwardData);
      }
    }
  }

  /**
   * Emits the data on the internal emitter (if `_useInternalEmitter` or
   * `force` is set) and on every connected layer except `toExclude`.
   * Rejections of a layer's `emit` are logged and not passed on.
   *
   * @param event The name of the event.
   * @param toExclude Layer that must not receive the data (e.g. the layer the
   *   data originates from), or `null`.
   * @param dataToSend The payload.
   * @param force Emit on the internal emitter regardless of
   *   `_useInternalEmitter`.
   */
  protected _emit(
    event: Eventname,
    toExclude: ICommunicationInterface = null,
    dataToSend: any,
    force = false
  ): void {
    // Guard with the same level that is used for the message itself.
    if (this._logger?.enabledFor(DEBUG) && event !== "StatusChanged") {
      this._logger.debug("emitting", event, dataToSend);
    }

    if (this._useInternalEmitter || force) {
      this._internalEmitter.emit(event, dataToSend);
    }

    for (const entry of this._layers.values()) {
      if (entry.layer === toExclude || !entry.layer.connected.getContent()) {
        continue;
      }
      entry.layer.emit(event, dataToSend).catch((error) => {
        if (this._logger) {
          this._logger.error(`failed to emit the event "${event}"`);
          this._logger.error(error);
        }
      });
    }
  }

  /**
   * Adds a layer to the bridge. Nothing happens if a layer with the same id
   * is already registered. The returned promise resolves after the layer
   * reported being connected and has been subscribed to all known events.
   *
   * @param layer The layer to add.
   * @param forwardData If `true`, data received on this layer is also sent
   *   to all other connected layers.
   * @param considerConnection If `true`, the layer's connection state is
   *   taken into account by `connected`.
   */
  public async addCommunicationLayer(
    layer: ICommunicationInterface,
    forwardData = false,
    considerConnection = false
  ): Promise<void> {
    if (this._layers.has(layer.id)) {
      return;
    }

    this._layers.set(layer.id, {
      layer,
      considerConnection,
      forwardData,
    });

    // Every change of the layer's connection state may change the aggregated
    // state, so publish it again.
    layer.connected.subscribe(() => {
      this.connected.forcePublish();
    });

    // Wait until the layer is connected.
    await layer.connected.waitFor((value) => value);

    if (this._layers.get(layer.id)?.layer !== layer) {
      // The layer has been removed while it was connecting.
      return;
    }

    // Subscribe the layer once per known event. Going through
    // `_subscribeToCallback` (instead of attaching the stored callbacks
    // directly) keeps `forwardData` effective and handles rejections.
    for (const event of this._callbacks.keys()) {
      this._subscribeToCallback(layer, event, forwardData);
    }

    this._checkInternalEmitter();
  }

  /**
   * Removes a layer from the bridge, so that it is no longer used for
   * emitting and no longer considered for the aggregated state.
   *
   * NOTE(review): handlers registered on the layer are not removed here - no
   * unsubscribe method of `ICommunicationInterface` is visible in this file.
   *
   * @param layer The layer to remove.
   */
  public async removeCommunicationLayer(
    layer: ICommunicationInterface
  ): Promise<void> {
    if (this._layers.delete(layer.id)) {
      // Adding the layer again subscribes it again.
      this._subscribedEvents.delete(layer.id);
      this._checkInternalEmitter();
    }
  }
}