/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-06 08:52:30
 * @modify date 2020-12-03 16:13:58
 * @desc [description]
 */

import * as amqp from "amqplib";
import { getNopeLogger } from "../logger/getLogger";
import { NopeObservable } from "../observables/nopeObservable";
import {
  IAvailableInstanceGeneratorsMsg,
  IAvailableInstancesMsg,
  IAvailableServicesMsg,
  IAvailableTopicsMsg,
  ICommunicationInterface,
  IExternalEventMsg,
  IRequestTaskMsg,
  IResponseTaskMsg,
  ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
import { INopeObservable } from "../types/nope/nopeObservable.interface";

/**
 * Options used when working with a plain queue.
 *
 * - `subscribeOptions`: forwarded to `assertQueue`.
 * - `consumeOptions`: forwarded to `consume`.
 * - `deleteOptions`: forwarded to `deleteQueue`.
 * - `prefetch`: value handed to `channel.prefetch`.
 */
export type QueuePublishOptions = {
  subscribeOptions?: amqp.Options.AssertQueue;
  consumeOptions?: amqp.Options.Consume;
  deleteOptions?: amqp.Options.DeleteQueue;
  prefetch?: number;
};

/**
 * Options used when consuming from a plain queue. Structurally identical to
 * {@link QueuePublishOptions}, hence declared as an alias.
 */
export type QueueSubscribeOptions = QueuePublishOptions;

/**
 * Options used when working with an exchange based subscription.
 *
 * - `subscribeOptions`: forwarded to `assertExchange`.
 * - `exchangeType`: type of the asserted exchange.
 * - `consumeOptions`: forwarded to `consume`.
 * - `deleteOptions`: forwarded to `deleteQueue`.
 * - `prefetch`: value handed to `channel.prefetch`.
 */
export type SubscriptionOptions = {
  subscribeOptions?: amqp.Options.AssertExchange;
  exchangeType?: "direct" | "topic" | "fanout" | "headers";
  consumeOptions?: amqp.Options.Consume;
  deleteOptions?: amqp.Options.DeleteQueue;
  prefetch?: number;
};

/**
 * Bundle of a channel and the operations that can be performed on the queue
 * (or subscription) living on that channel.
 */
interface IConnection {
  /** The channel all operations of this connection are executed on. */
  channel: amqp.Channel;
  /** Listener which is attached to the "close" event of the channel. */
  connectionLost: () => void;
  /** Deletes the queue (defaults are chosen by the implementation). */
  delete: (
    event?: string,
    deleteOptions?: amqp.Options.DeleteQueue
  ) => Promise<void>;
  /** Starts consuming messages. */
  subscribe: (
    event?: string,
    consumeOptions?: amqp.Options.Consume,
    prefetch?: number
  ) => Promise<void>;
  /** Stops consuming messages. */
  unsubscribe: () => Promise<void>;
  /** Returns `true` while a consumer is registered. */
  isSubscribed(): boolean;
  /** Sends the given payload; returns the result of the underlying channel call. */
  send: (...arg: unknown[]) => boolean;
}

/**
 * A Communication Layer for the Dispatchers.
 * In this Layer AMQP (Rabbit-MQ) is used.
 * Functions are matched to Queues.
*
 * @export
 * @class AmqpInterface
 */
export class AmqpInterface {
  /** Holds `true` once the broker connection is established, `false` otherwise. */
  connected: INopeObservable<boolean>;

  /** The connection to the broker. Undefined until `amqp.connect` resolved. */
  protected _socket: amqp.Connection;

  /** One connection (channel + helpers) per event name. */
  protected _connections: Map<string, IConnection>;

  /**
   * Map holding a set of function for user defined events.
   *
   * @protected
   * @memberof AmqpInterface
   */
  protected _userDefinedCallbacks: Map<
    string,
    Set<(data: any, msg: amqp.Message) => void>
  >;

  protected _logger = getNopeLogger("AMQP-Interface", "info");

  /**
   * Settles as soon as the connection attempt started in the constructor has
   * finished (successfully or not). It never rejects; failures are logged.
   */
  protected _connecting: Promise<void>;

  /**
   * Creates the interface and starts connecting to the broker.
   *
   * @param {string} uri Address of the broker. If no `amqp://` or `amqps://`
   * scheme is given, `amqp://` is prepended.
   */
  constructor(public uri: string) {
    // Adapt the URI if required. Secure URIs ("amqps://") must be kept as well.
    this.uri = /^amqps?:\/\//.test(uri) ? uri : "amqp://" + uri;

    this.connected = new NopeObservable<boolean>();
    this.connected.setContent(false);

    this._connections = new Map();
    this._userDefinedCallbacks = new Map();

    this._connecting = amqp
      .connect(this.uri)
      .then((connection) => {
        this._socket = connection;
        this._logger.info("Connection established with " + this.uri);
        this.connected.setContent(true);
      })
      .catch((e) => {
        // Without this handler a failed connect ends as unhandled rejection.
        this._logger.error("failed connecting to " + this.uri);
        this._logger.error(e);
      });
  }

  /**
   * Removes a callback of the given event. If no callback is left, the
   * consumer is cancelled, the queue is deleted and the connection of the
   * event is discarded, so that a later `on` / `emit` creates a fresh one.
   *
   * @param {string} event Name of the event.
   * @param cb The callback to remove.
   */
  async off(
    event: string,
    cb: (data: any, msg: amqp.Message) => void
  ): Promise<void> {
    const _callbacks = this._userDefinedCallbacks.get(event);

    if (_callbacks) {
      _callbacks.delete(cb);

      if (_callbacks.size > 0) {
        // There are still listeners => keep the subscription alive.
        return;
      }

      this._userDefinedCallbacks.delete(event);
    }

    const _connection = this._connections.get(event);
    if (!_connection?.unsubscribe) {
      return;
    }

    try {
      await _connection.unsubscribe();
    } catch (e) {
      this._logger.error("failed unsubscribing");
      this._logger.error(e);
    }

    try {
      await _connection.delete();
    } catch (e) {
      this._logger.error("failed deleting");
      this._logger.error(e);
    }

    // The queue is gone => the stored connection must not be reused.
    await this._discardConnection(event, _connection);
  }

  /**
   * Helper, which removes a connection from the internal map and closes its
   * channel. The "close" listener is detached first, so that closing the
   * channel on purpose is not reported as lost connection.
   *
   * @param {string} event Name of the event the connection belongs to.
   * @param {IConnection} connection The connection to discard.
   */
  protected async _discardConnection(
    event: string,
    connection: IConnection
  ): Promise<void> {
    if (this._connections.get(event) === connection) {
      this._connections.delete(event);
    }

    connection.channel.removeListener("close", connection.connectionLost);

    try {
      await connection.channel.close();
    } catch (e) {
      // The channel may already be closed (e.g. after a channel error).
      this._logger.debug("failed closing the channel of " + event);
      this._logger.debug(e);
    }
  }

  /**
   * Returns the connection of the event. If there is none, a new channel is
   * opened and a connection object working on the queue named `event` is
   * created and stored.
   *
   * @param {string} event Name of the event / queue.
   * @param {QueuePublishOptions} [options={}] Defaults for consuming / deleting.
   * @returns The connection and a flag showing whether it has been created.
   * @throws If the connection to the broker could not be established.
   */
  protected async _createConnection(
    event: string,
    options: QueuePublishOptions = {}
  ) {
    const _existing = this._connections.get(event);
    if (_existing) {
      return { connection: _existing, created: false };
    }

    // Wait for the connection attempt started in the constructor.
    await this._connecting;
    if (!this._socket) {
      throw new Error("No connection to " + this.uri + " available");
    }

    const _channel = await this._socket.createChannel();

    const connectionLost = () => {
      this.connected.setContent(false);
    };
    _channel.on("close", connectionLost);

    let _sub: amqp.Replies.Consume | null = null;

    // Define a Connection Object.
    const _connection: IConnection = {
      channel: _channel,
      connectionLost,
      send: (data) => {
        return _channel.sendToQueue(event, Buffer.from(JSON.stringify(data)), {
          contentEncoding: "application/json"
        });
      },
      isSubscribed: () => {
        return _sub !== null;
      },
      subscribe: async (
        _event: string = event,
        _opts = options.consumeOptions
      ) => {
        if (_sub !== null) {
          return;
        }

        _sub = await _channel.consume(
          _event,
          (msg) => {
            if (msg === null) {
              // amqplib delivers null if the broker cancelled the consumer.
              _sub = null;
              this._logger.debug("consumer of " + event + " has been cancelled");
              return;
            }

            // Transform the Data:
            let data: unknown;
            try {
              data = JSON.parse(msg.content.toString("utf-8"));
            } catch (e) {
              this._logger.error("failed parsing a message of " + event);
              this._logger.error(e);
              return;
            }

            for (const _func of this._userDefinedCallbacks.get(event) || []) {
              _func(data, msg);
            }
          },
          _opts
        );
      },
      unsubscribe: async () => {
        if (_sub !== null) {
          // Delete the Subscription
          await _channel.cancel(_sub.consumerTag);
          _sub = null;
        }
      },
      delete: async (_event: string = event, _opts = options.deleteOptions) => {
        try {
          await _channel.deleteQueue(_event, _opts);
        } catch (e) {
          // Deleting is best effort, but the failure should be traceable.
          this._logger.debug("failed deleting the queue " + _event);
          this._logger.debug(e);
        }
      }
    };

    this._connections.set(event, _connection);

    return { connection: _connection, created: true };
  }

  /**
   * Function to create a Queue in the RabbitMQ-Broker. If no prefetch is
   * provided, a prefetch of 1 is used; a prefetch <= 0 leaves the channel
   * untouched.
   *
   * @param {string} event Name of the queue.
   * @param {QueuePublishOptions} [options={}]
   * @returns {Promise<IConnection>} The connection working on the queue.
   * @memberof AmqpInterface
   */
  public async createQueue(
    event: string,
    options: QueuePublishOptions = {}
  ): Promise<IConnection> {
    const { connection, created } = await this._createConnection(
      event,
      options
    );

    if (!created) {
      return connection;
    }

    try {
      // Create a Queue
      await connection.channel.assertQueue(event, options.subscribeOptions);

      // Enable Prefetch
      const _prefetch = options.prefetch === undefined ? 1 : options.prefetch;
      if (_prefetch > 0) {
        await connection.channel.prefetch(_prefetch);
      }
    } catch (e) {
      // Do not keep a half initialized connection.
      await this._discardConnection(event, connection);
      throw e;
    }

    return connection;
  }

  /**
   * Function to create an Subscription in the RabbitMQ-Broker. An exclusive,
   * server named queue is bound to the exchange "event" using `event` as
   * routing key. The provided options are not modified.
   *
   * NOTE(review): deleting such a connection also deletes the exchange
   * "event", which is shared by every subscription created here - confirm
   * that this is intended.
   *
   * @param {string} event
   * @param {SubscriptionOptions} [options={}]
   * @returns {Promise<IConnection>} The connection working on the subscription.
   * @memberof AmqpInterface
   */
  public async createSubscription(
    event: string,
    options: SubscriptionOptions = {}
  ): Promise<IConnection> {
    // Work on a copy, the options of the caller must not be mutated.
    const _options: SubscriptionOptions = {
      ...options,
      consumeOptions: { noAck: false, ...options.consumeOptions }
    };

    const { connection, created } = await this._createConnection(
      event,
      _options
    );

    if (!created) {
      return connection;
    }

    try {
      const _channel = connection.channel;

      // Create a Queue. It will contain the subscribed Messages.
      const _queue = await _channel.assertQueue("", { exclusive: true });

      // Create an Exchange.
      const _exchange = await _channel.assertExchange(
        "event",
        _options.exchangeType || "topic",
        _options.subscribeOptions
      );

      // Bind the Queue to the Exchange.
      await _channel.bindQueue(_queue.queue, _exchange.exchange, event);

      // Enable Prefetch
      if (_options.prefetch !== undefined && _options.prefetch > 0) {
        await _channel.prefetch(_options.prefetch);
      }

      // Update the Send Message: publish on the exchange instead of a queue.
      connection.send = (data) => {
        return _channel.publish(
          "event",
          event,
          Buffer.from(JSON.stringify(data)),
          { contentEncoding: "application/json" }
        );
      };

      // Delete additionally the exchange and the dynamic queue.
      const _deleteQueue = connection.delete;
      connection.delete = async () => {
        await _deleteQueue(_queue.queue);
        await _channel.deleteExchange(_exchange.exchange);
      };

      // Consume from the dynamic queue instead of a queue named `event`.
      const _subscribe = connection.subscribe;
      connection.subscribe = async () => {
        await _subscribe(_queue.queue);
      };
    } catch (e) {
      // Do not keep a half initialized connection.
      await this._discardConnection(event, connection);
      throw e;
    }

    return connection;
  }

  /**
   * Registers a callback for the event. The first callback of an event
   * creates the queue / subscription and starts consuming. If that fails,
   * the callback is removed again and the error is rethrown.
   *
   * @param mode "queue" to consume a queue, "subscribe" to use the exchange.
   * @param {string} event Name of the event.
   * @param cb Callback receiving the parsed payload and the raw message.
   * @param [options={}] Options forwarded to the creation function.
   */
  async on(
    mode: "queue" | "subscribe",
    event: string,
    cb: (data: any, msg: amqp.Message) => void,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    const _callbacks =
      this._userDefinedCallbacks.get(event) ||
      new Set<(data: any, msg: amqp.Message) => void>();

    _callbacks.add(cb);
    this._userDefinedCallbacks.set(event, _callbacks);

    // Only the first callback has to establish the subscription.
    if (_callbacks.size !== 1) {
      return;
    }

    try {
      let _connection: IConnection;

      switch (mode) {
        case "queue":
          _connection = await this.createQueue(
            event,
            options as QueueSubscribeOptions
          );
          break;
        case "subscribe":
          _connection = await this.createSubscription(
            event,
            options as SubscriptionOptions
          );
          break;
        default:
          throw new Error("Unknown mode: " + String(mode));
      }

      await _connection.subscribe();
    } catch (e) {
      // Otherwise later calls would assume an active subscription.
      _callbacks.delete(cb);
      if (_callbacks.size === 0) {
        this._userDefinedCallbacks.delete(event);
      }
      throw e;
    }
  }

  /**
   * Sends data using the connection of the event. If there is no connection
   * yet, the corresponding queue / subscription is created first.
   *
   * @param mode "queue" to send to a queue, "publish" to use the exchange.
   * @param {string} event Name of the event.
   * @param data The payload. It is serialized as JSON.
   * @param [options={}] Options forwarded to the creation function.
   */
  public async emit(
    mode: "queue" | "publish",
    event: string,
    data: any,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    let _connection = this._connections.get(event);

    if (!_connection) {
      // No Queue or Subscription is available =>
      // Create the corresponding queue / subscription
      switch (mode) {
        case "queue":
          _connection = await this.createQueue(
            event,
            options as QueueSubscribeOptions
          );
          break;
        case "publish":
          _connection = await this.createSubscription(
            event,
            options as SubscriptionOptions
          );
          break;
        default:
          // Recalling emit here would never terminate.
          throw new Error("Unknown mode: " + String(mode));
      }
    }

    _connection.send(data);
  }

  /**
   * Closes the connection to the broker. A pending connection attempt is
   * awaited first; failures while closing are logged.
   */
  public close(): void {
    void this._connecting
      .then(() => (this._socket ? this._socket.close() : undefined))
      .catch((e) => {
        this._logger.error("failed closing the connection");
        this._logger.error(e);
      });
  }
}

/**
 * A Communication Layer for the Dispatchers.
 * In this Layer AMQP (Rabbit-MQ) is used.
 * Functions are matched to Queues.
*
 * @export
 * @class AmqpLayer
 * @implements {ICommunicationInterface}
 *
 * Events are exchanged via `on("subscribe", ...)` / `emit("publish", ...)`,
 * rpc requests via `on("queue", ...)` / `emit("queue", ...)`.
 *
 * NOTE(review): the callbacks registered with "subscribe" never acknowledge
 * a message and `generateSubscriptionOptions` is not used by any method
 * visible here - confirm the intended `noAck` setting for subscriptions.
 */
export class AmqpLayer
  extends AmqpInterface
  implements ICommunicationInterface {
  subscriptionMode?: "individual" | "generic" = "individual";
  resultSharing?: "individual" | "generic" = "individual";

  /** Pending acknowledgements, stored by the id of the task. */
  protected _tasks = new Map<string, () => void>();

  /**
   * For every rpc name: the user callbacks and the wrapper which has been
   * registered for them. Required to remove the right function again.
   */
  protected _rpcRequestCallbacks = new Map<
    string,
    Map<
      (req: IRequestTaskMsg) => void,
      (req: IRequestTaskMsg, msg: amqp.Message) => void
    >
  >();

  protected _logger = getNopeLogger("AMQP-Event-Layer", "info");

  /** Provides the options used for the queue of the rpc with the given name. */
  public generateQueueOptions: (name: string) => QueuePublishOptions = (
    name
  ): QueuePublishOptions => {
    return {
      consumeOptions: {
        noAck: false
      },
      subscribeOptions: {
        durable: false
      }
    };
  };

  /** Provides options for a subscription. Returns an empty object. */
  public generateSubscriptionOptions: (name: string) => SubscriptionOptions = (
    name
  ) => {
    return {};
  };

  // ---------------------------------------------------------------------
  // Dispatcher life cycle
  // ---------------------------------------------------------------------

  /** Listens for "bonjour" messages. */
  async onBonjour(cb: (dispatcher: string) => void): Promise<void> {
    await this.on("subscribe", "bonjour", cb);
  }

  /** Publishes a "bonjour" message. */
  async emitBonjour(dispatcher: string): Promise<void> {
    await this.emit("publish", "bonjour", dispatcher);
  }

  /** Listens for "aurevoir" messages. */
  async onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
    await this.on("subscribe", "aurevoir", cb);
  }

  /** Publishes an "aurevoir" message. */
  async emitAurevoir(dispatcher: string): Promise<void> {
    await this.emit("publish", "aurevoir", dispatcher);
  }

  // ---------------------------------------------------------------------
  // Announcements
  // ---------------------------------------------------------------------

  /** Publishes the message on "newTopicsAvailable". */
  async emitNewObersvablesAvailable(
    topics: IAvailableTopicsMsg
  ): Promise<void> {
    await this.emit("publish", "newTopicsAvailable", topics);
  }

  /** Listens on "newTopicsAvailable". */
  async onNewObservablesAvailable(
    cb: (topics: IAvailableTopicsMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "newTopicsAvailable", cb);
  }

  /** Publishes the message on "newInstanceGeneratorsAvailable". */
  async emitNewInstanceGeneratorsAvailable(
    generators: IAvailableInstanceGeneratorsMsg
  ): Promise<void> {
    await this.emit("publish", "newInstanceGeneratorsAvailable", generators);
  }

  /** Listens on "newInstanceGeneratorsAvailable". */
  async onNewInstanceGeneratorsAvailable(
    cb: (generators: IAvailableInstanceGeneratorsMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "newInstanceGeneratorsAvailable", cb);
  }

  /** Publishes the message on "newServicesAvailable". */
  async emitNewServicesAvailable(
    services: IAvailableServicesMsg
  ): Promise<void> {
    await this.emit("publish", "newServicesAvailable", services);
  }

  /** Listens on "newServicesAvailable". */
  async onNewServicesAvailable(
    cb: (services: IAvailableServicesMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "newServicesAvailable", cb);
  }

  /** Publishes the message on "newInstancesAvailable". */
  async emitNewInstancesAvailable(
    instances: IAvailableInstancesMsg
  ): Promise<void> {
    await this.emit("publish", "newInstancesAvailable", instances);
  }

  /** Listens on "newInstancesAvailable". */
  async onNewInstancesAvailable(
    cb: (instances: IAvailableInstancesMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "newInstancesAvailable", cb);
  }

  // ---------------------------------------------------------------------
  // Events
  // ---------------------------------------------------------------------

  /** Listens on the given event. The name is prefixed with "event.". */
  async onEvent(
    event: string,
    cb: (data: IExternalEventMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "event." + event, cb);
  }

  /** Removes a callback registered with `onEvent`. */
  async offEvent(
    event: string,
    cb: (data: IExternalEventMsg) => void
  ): Promise<void> {
    await this.off("event." + event, cb);
  }

  /** Publishes the data on the given event. The name is prefixed with "event.". */
  async emitEvent(event: string, data: IExternalEventMsg): Promise<void> {
    await this.emit("publish", "event." + event, data);
  }

  // ---------------------------------------------------------------------
  // Tasks / RPC
  // ---------------------------------------------------------------------

  /** Listens on "taskCancelation". */
  async onTaskCancelation(
    cb: (msg: ITaskCancelationMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "taskCancelation", cb);
  }

  /** Publishes the message on "taskCancelation". */
  async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
    await this.emit("publish", "taskCancelation", msg);
  }

  /** Sends the request to the queue `name`. */
  async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
    await this.emit("queue", name, request, this.generateQueueOptions(name));
  }

  /**
   * Publishes the result. If the request of the task has been received by
   * this layer, the request message is acknowledged before.
   */
  async emitRpcResult(name: string, result: IResponseTaskMsg): Promise<void> {
    const _acknowledge = this._tasks.get(result.taskId);

    if (_acknowledge) {
      // Acknowledge, that the Task was successful
      _acknowledge();
      this._tasks.delete(result.taskId);
    }

    await this.emit("publish", name, result);
  }

  /** Listens for results published on `name`. */
  async onRpcResponse(
    name: string,
    cb: (result: IResponseTaskMsg) => void
  ): Promise<void> {
    await this.on("subscribe", name, cb);
  }

  /** Removes a callback registered with `onRpcResponse`. */
  async offRpcResponse(
    name: string,
    cb: (result: IResponseTaskMsg) => void
  ): Promise<void> {
    await this.off(name, cb);
  }

  /**
   * Listens for requests on the queue `name`. For every request a function
   * acknowledging the message is stored under the id of the task; it is
   * called by `emitRpcResult`. Afterwards the callback is performed.
   *
   * The callback is wrapped. The wrapper is remembered, so `offRpcRequest`
   * is able to remove it again.
   */
  async onRpcRequest(
    name: string,
    cb: (req: IRequestTaskMsg) => void
  ): Promise<void> {
    let _wrappers = this._rpcRequestCallbacks.get(name);
    if (!_wrappers) {
      _wrappers = new Map();
      this._rpcRequestCallbacks.set(name, _wrappers);
    }

    // Reuse the wrapper if the callback has already been registered.
    let _wrapped = _wrappers.get(cb);
    if (!_wrapped) {
      _wrapped = (req: IRequestTaskMsg, msg: amqp.Message): void => {
        this._logger.debug("Getting new Request " + req.taskId);

        // Define a callback for finished Tasks.
        this._tasks.set(req.taskId, () => {
          // Select the corresponding channel. The connection may be gone.
          const _channel = this._connections.get(name)?.channel;

          if (_channel) {
            this._logger.debug(
              "Sending Acknowledgement of Request " + req.taskId
            );
            // Acknowledge the Message.
            _channel.ack(msg);
          }
        });

        // Perform the Original Callback.
        cb(req);
      };
      _wrappers.set(cb, _wrapped);
    }

    await this.on("queue", name, _wrapped, this.generateQueueOptions(name));
  }

  /**
   * Removes a callback registered with `onRpcRequest`. The wrapper created
   * for the callback is removed; an unknown callback is forwarded as it is.
   */
  async offRpcRequest(
    name: string,
    cb: (data: IRequestTaskMsg) => void
  ): Promise<void> {
    const _wrappers = this._rpcRequestCallbacks.get(name);
    const _wrapped = _wrappers?.get(cb);

    if (_wrappers && _wrapped) {
      _wrappers.delete(cb);
      if (_wrappers.size === 0) {
        this._rpcRequestCallbacks.delete(name);
      }
    }

    await this.off(name, _wrapped || cb);
  }
}