Adding comments

This commit is contained in:
Martin Karkowski 2021-05-16 10:01:58 +02:00
parent 7813d6feae
commit 21c19beb86

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-06 08:52:30
* @modify date 2021-02-05 09:52:01
* @modify date 2021-05-16 10:01:20
* @desc [description]
*/
@ -74,7 +74,17 @@ interface IConnection {
export class AmqpInterface {
connected: INopeObservable<boolean>;
async off(event: string, cb: (data: any, msg: amqp.Message) => void) {
/**
* Unsubscribe from an Event
*
* @param {string} event
* @param {(data: any, msg: amqp.Message) => void} cb
* @memberof AmqpInterface
*/
async off(
event: string,
cb: (data: any, msg: amqp.Message) => void
): Promise<void> {
// Store the Function
const _set = this._userDefinedCallbacks.get(event) || new Set();
// Delete the callback.
@ -101,8 +111,22 @@ export class AmqpInterface {
}
}
/**
* Contains the AMQP-Connection
*
* @protected
* @type {amqp.Connection}
* @memberof AmqpInterface
*/
protected _socket: amqp.Connection;
/**
* Additional Connections.
*
* @protected
* @type {Map<string, IConnection>}
* @memberof AmqpInterface
*/
protected _connections: Map<string, IConnection>;
/**
@ -116,6 +140,11 @@ export class AmqpInterface {
Set<(data: any, msg: amqp.Message) => void>
>;
/**
* Creates an instance of AmqpInterface.
* @param {string} uri
* @memberof AmqpInterface
*/
constructor(public uri: string) {
// Adapt the URI if required:
this.uri = uri.startsWith("amqp://") ? uri : "amqp://" + uri;
@ -134,10 +163,25 @@ export class AmqpInterface {
this._userDefinedCallbacks = new Map();
}
/**
* Internal Function, to create a Conncetion
*
* @protected
* @param {string} event
* @param {QueuePublishOptions} [options={}]
* @return {*} {Promise<{
* connection: IConnection;
* created: boolean;
* }>}
* @memberof AmqpInterface
*/
protected async _createConnection(
event: string,
options: QueuePublishOptions = {}
) {
): Promise<{
connection: IConnection;
created: boolean;
}> {
if (this._connections.has(event)) {
return {
connection: this._connections.get(event),
@ -197,7 +241,9 @@ export class AmqpInterface {
async delete(_event: string = event, _opts = options.deleteOptions) {
try {
await _channel.deleteQueue(_event, _opts);
} catch (e) {}
} catch (e) {
_this._logger.error("AMQP Failed to delete the Queue");
}
}
};
@ -395,11 +441,17 @@ export class AmqpInterface {
*/
export class AmqpLayer
extends AmqpInterface
implements ICommunicationInterface {
allowServiceRedundancy: boolean;
id: string;
implements ICommunicationInterface
{
protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
protected _tasks = new Map<string, () => void>();
public allowServiceRedundancy: boolean;
public id: string;
public considerConnection = true;
considerConnection = true;
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
await this.emit("publish", "statusUdpate", status);
}
@ -468,8 +520,6 @@ export class AmqpLayer
await this.emit("publish", "bonjour", msg);
}
protected _tasks = new Map<string, () => void>();
async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
await this.emit("queue", name, request, this.generateQueueOptions(name));
}
@ -498,6 +548,17 @@ export class AmqpLayer
await this.off(name, cb);
}
/**
* Using AMQP, we use a custom Handler. We will listen on the
* queues, but only get new information, if the current task is
* completed. This results in distributing the tasks among the
* network.
*
* @param {string} name
* @param {(req: IRequestTaskMsg) => void} cb
* @return {*} {Promise<void>}
* @memberof AmqpLayer
*/
async onRpcRequest(
name: string,
cb: (req: IRequestTaskMsg) => void
@ -549,8 +610,6 @@ export class AmqpLayer
await this.on("subscribe", "newServicesAvailable", cb);
}
protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
public generateQueueOptions: (name: string) => QueuePublishOptions = (
name
) => {