This commit is contained in:
Martin Karkowski 2021-05-21 09:34:08 +02:00
commit 2dd1ed83a8

View File

@ -2,7 +2,7 @@
* @author Martin Karkowski * @author Martin Karkowski
* @email m.karkowski@zema.de * @email m.karkowski@zema.de
* @create date 2020-11-06 08:52:30 * @create date 2020-11-06 08:52:30
* @modify date 2021-02-05 09:52:01 * @modify date 2021-05-16 10:01:20
* @desc [description] * @desc [description]
*/ */
@ -74,7 +74,17 @@ interface IConnection {
export class AmqpInterface { export class AmqpInterface {
connected: INopeObservable<boolean>; connected: INopeObservable<boolean>;
async off(event: string, cb: (data: any, msg: amqp.Message) => void) { /**
* Unsubscribe from an Event
*
* @param {string} event
* @param {(data: any, msg: amqp.Message) => void} cb
* @memberof AmqpInterface
*/
async off(
event: string,
cb: (data: any, msg: amqp.Message) => void
): Promise<void> {
// Store the Function // Store the Function
const _set = this._userDefinedCallbacks.get(event) || new Set(); const _set = this._userDefinedCallbacks.get(event) || new Set();
// Delete the callback. // Delete the callback.
@ -101,8 +111,22 @@ export class AmqpInterface {
} }
} }
/**
* Contains the AMQP-Connection
*
* @protected
* @type {amqp.Connection}
* @memberof AmqpInterface
*/
protected _socket: amqp.Connection; protected _socket: amqp.Connection;
/**
* Additional Connections.
*
* @protected
* @type {Map<string, IConnection>}
* @memberof AmqpInterface
*/
protected _connections: Map<string, IConnection>; protected _connections: Map<string, IConnection>;
/** /**
@ -116,6 +140,11 @@ export class AmqpInterface {
Set<(data: any, msg: amqp.Message) => void> Set<(data: any, msg: amqp.Message) => void>
>; >;
/**
* Creates an instance of AmqpInterface.
* @param {string} uri
* @memberof AmqpInterface
*/
constructor(public uri: string) { constructor(public uri: string) {
// Adapt the URI if required: // Adapt the URI if required:
this.uri = uri.startsWith("amqp://") ? uri : "amqp://" + uri; this.uri = uri.startsWith("amqp://") ? uri : "amqp://" + uri;
@ -134,10 +163,25 @@ export class AmqpInterface {
this._userDefinedCallbacks = new Map(); this._userDefinedCallbacks = new Map();
} }
/**
* Internal Function, to create a Conncetion
*
* @protected
* @param {string} event
* @param {QueuePublishOptions} [options={}]
* @return {*} {Promise<{
* connection: IConnection;
* created: boolean;
* }>}
* @memberof AmqpInterface
*/
protected async _createConnection( protected async _createConnection(
event: string, event: string,
options: QueuePublishOptions = {} options: QueuePublishOptions = {}
) { ): Promise<{
connection: IConnection;
created: boolean;
}> {
if (this._connections.has(event)) { if (this._connections.has(event)) {
return { return {
connection: this._connections.get(event), connection: this._connections.get(event),
@ -197,7 +241,9 @@ export class AmqpInterface {
async delete(_event: string = event, _opts = options.deleteOptions) { async delete(_event: string = event, _opts = options.deleteOptions) {
try { try {
await _channel.deleteQueue(_event, _opts); await _channel.deleteQueue(_event, _opts);
} catch (e) {} } catch (e) {
_this._logger.error("AMQP Failed to delete the Queue");
}
} }
}; };
@ -395,11 +441,17 @@ export class AmqpInterface {
*/ */
export class AmqpLayer export class AmqpLayer
extends AmqpInterface extends AmqpInterface
implements ICommunicationInterface { implements ICommunicationInterface
allowServiceRedundancy: boolean; {
id: string; protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
protected _tasks = new Map<string, () => void>();
public allowServiceRedundancy: boolean;
public id: string;
public considerConnection = true;
considerConnection = true;
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> { async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
await this.emit("publish", "statusUdpate", status); await this.emit("publish", "statusUdpate", status);
} }
@ -468,8 +520,6 @@ export class AmqpLayer
await this.emit("publish", "bonjour", msg); await this.emit("publish", "bonjour", msg);
} }
protected _tasks = new Map<string, () => void>();
async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> { async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
await this.emit("queue", name, request, this.generateQueueOptions(name)); await this.emit("queue", name, request, this.generateQueueOptions(name));
} }
@ -498,6 +548,17 @@ export class AmqpLayer
await this.off(name, cb); await this.off(name, cb);
} }
/**
* Using AMQP, we use a custom Handler. We will listen on the
* queues, but only get new information, if the current task is
* completed. This results in distributing the tasks among the
* network.
*
* @param {string} name
* @param {(req: IRequestTaskMsg) => void} cb
* @return {*} {Promise<void>}
* @memberof AmqpLayer
*/
async onRpcRequest( async onRpcRequest(
name: string, name: string,
cb: (req: IRequestTaskMsg) => void cb: (req: IRequestTaskMsg) => void
@ -549,8 +610,6 @@ export class AmqpLayer
await this.on("subscribe", "newServicesAvailable", cb); await this.on("subscribe", "newServicesAvailable", cb);
} }
protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
public generateQueueOptions: (name: string) => QueuePublishOptions = ( public generateQueueOptions: (name: string) => QueuePublishOptions = (
name name
) => { ) => {