diff --git a/lib/communication/layers/amqpLayer.ts b/lib/communication/layers/amqpLayer.ts index 0c79388..bafd0cd 100644 --- a/lib/communication/layers/amqpLayer.ts +++ b/lib/communication/layers/amqpLayer.ts @@ -2,7 +2,7 @@ * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2020-11-06 08:52:30 - * @modify date 2021-02-05 09:52:01 + * @modify date 2021-05-16 10:01:20 * @desc [description] */ @@ -74,7 +74,17 @@ interface IConnection { export class AmqpInterface { connected: INopeObservable; - async off(event: string, cb: (data: any, msg: amqp.Message) => void) { + /** + * Unsubscribe from an Event + * + * @param {string} event + * @param {(data: any, msg: amqp.Message) => void} cb + * @memberof AmqpInterface + */ + async off( + event: string, + cb: (data: any, msg: amqp.Message) => void + ): Promise { // Store the Function const _set = this._userDefinedCallbacks.get(event) || new Set(); // Delete the callback. @@ -101,8 +111,22 @@ export class AmqpInterface { } } + /** + * Contains the AMQP-Connection + * + * @protected + * @type {amqp.Connection} + * @memberof AmqpInterface + */ protected _socket: amqp.Connection; + /** + * Additional Connections. + * + * @protected + * @type {Map} + * @memberof AmqpInterface + */ protected _connections: Map; /** @@ -116,6 +140,11 @@ export class AmqpInterface { Set<(data: any, msg: amqp.Message) => void> >; + /** + * Creates an instance of AmqpInterface. + * @param {string} uri + * @memberof AmqpInterface + */ constructor(public uri: string) { // Adapt the URI if required: this.uri = uri.startsWith("amqp://") ? uri : "amqp://" + uri; @@ -134,10 +163,25 @@ export class AmqpInterface { this._userDefinedCallbacks = new Map(); } + /** + * Internal Function, to create a Conncetion + * + * @protected + * @param {string} event + * @param {QueuePublishOptions} [options={}] + * @return {*} {Promise<{ + * connection: IConnection; + * created: boolean; + * }>} + * @memberof AmqpInterface + */ protected async _createConnection( event: string, options: QueuePublishOptions = {} - ) { + ): Promise<{ + connection: IConnection; + created: boolean; + }> { if (this._connections.has(event)) { return { connection: this._connections.get(event), @@ -197,7 +241,9 @@ export class AmqpInterface { async delete(_event: string = event, _opts = options.deleteOptions) { try { await _channel.deleteQueue(_event, _opts); - } catch (e) {} + } catch (e) { + _this._logger.error("AMQP Failed to delete the Queue"); + } } }; @@ -395,11 +441,17 @@ export class AmqpInterface { */ export class AmqpLayer extends AmqpInterface - implements ICommunicationInterface { - allowServiceRedundancy: boolean; - id: string; + implements ICommunicationInterface +{ + protected _logger = getNopeLogger("AMQP-Event-Layer", "info"); + protected _tasks = new Map void>(); + + public allowServiceRedundancy: boolean; + + public id: string; + + public considerConnection = true; - considerConnection = true; async emitStatusUpdate(status: IDispatcherInfo): Promise { await this.emit("publish", "statusUdpate", status); } @@ -468,8 +520,6 @@ export class AmqpLayer await this.emit("publish", "bonjour", msg); } - protected _tasks = new Map void>(); - async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise { await this.emit("queue", name, request, this.generateQueueOptions(name)); } @@ -498,6 +548,17 @@ export class AmqpLayer await this.off(name, cb); } + /** + * Using AMQP, we use a custom Handler. We will listen on the + * queues, but only get new information, if the current task is + * completed. This results in distributing the tasks among the + * network. + * + * @param {string} name + * @param {(req: IRequestTaskMsg) => void} cb + * @return {*} {Promise} + * @memberof AmqpLayer + */ async onRpcRequest( name: string, cb: (req: IRequestTaskMsg) => void @@ -549,8 +610,6 @@ export class AmqpLayer await this.on("subscribe", "newServicesAvailable", cb); } - protected _logger = getNopeLogger("AMQP-Event-Layer", "info"); - public generateQueueOptions: (name: string) => QueuePublishOptions = ( name ) => {