/**
 * @author Martin Karkowski
 * @email m.karkowski@zema.de
 * @create date 2020-11-06 08:52:30
 * @modify date 2020-11-11 16:22:10
 * @desc AMQP (RabbitMQ) based communication layer built on top of amqplib.
 */

import * as amqp from 'amqplib';
import { getNopeLogger } from "../logger/getLogger";
import {
  IAvailableInstanceGeneratorsMsg,
  IAvailableServicesMsg,
  IAvailableTopicsMsg,
  ICommunicationInterface,
  IExternalEventMsg,
  IRequestTaskMsg,
  IResponseTaskMsg,
  ITaskCancelationMsg
} from '../types/nope/nopeCommunication.interface';

/** Options used when a queue is created in order to publish to it. */
export type QueuePublishOptions = {
  subscribeOptions?: amqp.Options.AssertQueue,
  consumeOptions?: amqp.Options.Consume,
  deleteOptions?: amqp.Options.DeleteQueue,
  prefetch?: number
}

/** Options used when a queue is created in order to consume from it. */
export type QueueSubscribeOptions = {
  subscribeOptions?: amqp.Options.AssertQueue,
  consumeOptions?: amqp.Options.Consume,
  deleteOptions?: amqp.Options.DeleteQueue,
  prefetch?: number
}

/** Options used for exchange based (publish / subscribe) communication. */
export type SubscriptionOptions = {
  subscribeOptions?: amqp.Options.AssertExchange,
  exchangeType?: 'direct' | 'topic' | 'fanout' | 'headers',
  consumeOptions?: amqp.Options.Consume,
  deleteOptions?: amqp.Options.DeleteQueue,
  prefetch?: number
}

/** Signature of the callbacks registered via `AmqpInterface.on`. */
type AmqpCallback = (data: any, msg: amqp.Message) => void;

/**
 * Bundles one amqplib channel with the operations this file performs on it
 * for a single event name.
 */
interface IConnection {
  channel: amqp.Channel,
  delete: (event?: string, deleteOptions?: amqp.Options.DeleteQueue) => Promise<void>;
  subscribe: (event?: string, consumeOptions?: amqp.Options.Consume, prefetch?: number) => Promise<void>;
  unsubscribe: () => Promise<void>;
  isSubscribed(): boolean;
  send: (...arg: any[]) => boolean
}

/**
 * Thin wrapper around amqplib which manages one channel per event name.
 * An event is either backed by a named queue (mode `'queue'`) or by an
 * exclusive, server-named queue bound to the exchange `'event'`
 * (modes `'subscribe'` / `'publish'`).
 *
 * @export
 * @class AmqpInterface
 */
export class AmqpInterface {

  protected _logger = getNopeLogger('AMQP-Interface', 'info');

  /** The broker connection. Only assigned once `_connected` has resolved. */
  protected _socket: amqp.Connection;

  /**
   * Settles once the connection attempt started in the constructor has
   * finished. Rejects with the connect error if the attempt failed.
   */
  protected _connected: Promise<void>;

  /** Connection objects, keyed by event name. */
  protected _connections: Map<string, IConnection>;

  /**
   * Map holding a set of functions for user defined events.
   *
   * @protected
   * @memberof AmqpInterface
   */
  protected _userDefinedCallbacks: Map<string, Set<AmqpCallback>>;

  /**
   * Starts connecting to the broker. The constructor does not wait for the
   * connection; every method that needs it awaits `_connected` first.
   *
   * @param uri Broker address. `amqp://` is prepended unless the value
   *            already starts with `amqp://` or `amqps://`.
   */
  constructor(public uri: string) {
    // Adapt the URI if required. A TLS scheme must be left untouched as well.
    this.uri = /^amqps?:\/\//.test(uri) ? uri : 'amqp://' + uri;

    this._connections = new Map();
    this._userDefinedCallbacks = new Map();

    // Promise.resolve normalises whatever thenable amqplib returns.
    this._connected = Promise.resolve(amqp.connect(this.uri)).then(connection => {
      this._socket = connection;
      this._logger.info('Connection established with ' + this.uri);
    });

    // Log a failed attempt here so it never surfaces as an unhandled
    // rejection. Callers awaiting `_connected` still receive the error.
    this._connected.catch(e => {
      this._logger.error('failed connecting to ' + this.uri);
      this._logger.error(e);
    });
  }

  /**
   * Returns the connection object of an event and creates it (including a
   * new channel) if it does not exist yet.
   *
   * @param event Name of the event; used as queue name by the default `send`.
   * @param options Provides the default consume and delete options.
   * @returns The connection and a flag telling whether it was created by this call.
   * @throws If connecting to the broker failed or no channel could be created.
   */
  protected async _createConnection(
    event: string,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<{ connection: IConnection, created: boolean }> {
    const _existing = this._connections.get(event);
    if (_existing) {
      return { connection: _existing, created: false };
    }

    // The socket is assigned asynchronously by the constructor.
    await this._connected;

    const _this = this;
    const _channel = await this._socket.createChannel();
    let _sub: amqp.Replies.Consume | null = null;

    // Define a Connection Object.
    const _connection: IConnection = {
      channel: _channel,
      send(data) {
        return _channel.sendToQueue(
          event,
          Buffer.from(JSON.stringify(data)),
          { contentEncoding: 'application/json' }
        );
      },
      isSubscribed() {
        return _sub !== null;
      },
      async subscribe(_event: string = event, _opts = options.consumeOptions) {
        if (_sub !== null) {
          return;
        }
        _sub = await _channel.consume(_event, (msg) => {
          // amqplib passes null when the broker cancelled the consumer.
          if (msg === null) {
            _sub = null;
            _this._logger.info('Consumer of "' + _event + '" has been cancelled by the broker');
            return;
          }
          // Transform the Data:
          const data = JSON.parse(msg.content.toString('utf-8'));
          for (const _func of _this._userDefinedCallbacks.get(event) || []) {
            _func(data, msg);
          }
        }, _opts);
      },
      async unsubscribe() {
        if (_sub !== null) {
          // Delete the Subscription
          await _channel.cancel(_sub.consumerTag);
          _sub = null;
        }
      },
      async delete(_event: string = event, _opts = options.deleteOptions) {
        // Deleting stays best effort (it never throws), but a failure is
        // reported instead of being dropped silently.
        try {
          await _channel.deleteQueue(_event, _opts);
        } catch (e) {
          _this._logger.error('failed deleting queue "' + _event + '"');
          _this._logger.error(e);
        }
      }
    };

    this._connections.set(event, _connection);

    return { connection: _connection, created: true };
  }

  /**
   * Asserts a queue named like the event and configures the prefetch count
   * of its channel. Does nothing besides returning the stored connection
   * if the event already has one.
   *
   * @param event Name of the event / queue.
   * @param options `subscribeOptions` are forwarded to `assertQueue`. A
   *                missing `prefetch` results in a prefetch count of 1;
   *                a value <= 0 leaves the prefetch untouched.
   * @returns The connection object of the event.
   */
  public async createQueue(event: string, options: QueuePublishOptions = {}): Promise<IConnection> {
    // Get the Connection
    const _res = await this._createConnection(event, options);

    if (_res.created) {
      const _channel = _res.connection.channel;

      // Create a Queue
      await _channel.assertQueue(event, options.subscribeOptions);

      // Enable Prefetch
      const _prefetch = options.prefetch;
      if (typeof _prefetch === 'undefined') {
        await _channel.prefetch(1);
      } else if (_prefetch > 0) {
        await _channel.prefetch(_prefetch);
      }
    }

    return _res.connection;
  }

  /**
   * Function to create a Subscription in the RabbitMQ-Broker: an exclusive,
   * server-named queue which is bound to the exchange `'event'` using the
   * event name as routing key. `send` of the returned connection publishes
   * to that exchange instead of sending to a queue.
   *
   * NOTE(review): consumers are created with `noAck: false` by default, but
   * nothing in this file acknowledges messages of subscriptions - confirm
   * that this is intended.
   *
   * @param event Name of the event, used as routing key.
   * @param options Options of the subscription. The object is not modified.
   * @returns The connection object of the event.
   * @memberof AmqpInterface
   */
  public async createSubscription(event: string, options: SubscriptionOptions = {}): Promise<IConnection> {
    const _exchangeName = 'event';

    // Work on a copy, so the options object of the caller stays untouched.
    const _options: SubscriptionOptions = Object.assign({}, options, {
      consumeOptions: Object.assign({ noAck: false }, options.consumeOptions || {})
    });

    // Get the Connection
    const _res = await this._createConnection(event, _options);

    if (_res.created) {
      const _connection = _res.connection;
      const _channel = _connection.channel;

      // Create a Queue. It will contain the subscribed Messages.
      const _queue = await _channel.assertQueue('', { exclusive: true });

      // Create an Exchange.
      const _exchange = await _channel.assertExchange(
        _exchangeName,
        _options.exchangeType || 'topic',
        _options.subscribeOptions
      );

      // Bind the Queue to the Exchange. Awaited, so a consumer is never
      // started before the binding exists and failures reach the caller.
      await _channel.bindQueue(_queue.queue, _exchange.exchange, event);

      // Enable Prefetch
      if (typeof _options.prefetch === 'number' && _options.prefetch > 0) {
        await _channel.prefetch(_options.prefetch);
      }

      // Update the Send Message.
      _connection.send = (data) => {
        return _channel.publish(
          _exchangeName,
          event,
          Buffer.from(JSON.stringify(data)),
          { contentEncoding: 'application/json' }
        );
      };

      // Store references to the original Methods
      const _delete = _connection.delete;
      const _subscribe = _connection.subscribe;

      // Only the dynamic queue is deleted. The exchange is shared by every
      // subscription (all of them assert the same exchange name), so
      // removing it here would break the remaining ones. Pass `autoDelete`
      // via `subscribeOptions` if the broker should clean it up.
      _connection.delete = async () => {
        await _delete(_queue.queue);
      };

      // Always consume from the dynamic queue, never from a queue named
      // like the event.
      _connection.subscribe = async (_event?: string, consumeOptions?: amqp.Options.Consume) => {
        await _subscribe(_queue.queue, consumeOptions);
      };
    }

    return _res.connection;
  }

  /**
   * Registers a callback for an event. The first callback of an event
   * creates the queue / subscription and starts consuming.
   *
   * @param mode `'queue'` uses `createQueue`, `'subscribe'` uses `createSubscription`.
   * @param event Name of the event.
   * @param cb Called with the JSON-parsed payload and the raw message.
   * @param options Options forwarded to `createQueue` / `createSubscription`.
   */
  async on(
    mode: 'queue' | 'subscribe',
    event: string,
    cb: (data: any, msg: amqp.Message) => void,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    // Store the Function
    let _set = this._userDefinedCallbacks.get(event);
    if (!_set) {
      _set = new Set();
      this._userDefinedCallbacks.set(event, _set);
    }
    _set.add(cb);

    // Only the first callback has to establish the subscription.
    if (_set.size !== 1) {
      return;
    }

    let _connection: IConnection;
    switch (mode) {
      case 'queue':
        _connection = await this.createQueue(event, options as QueueSubscribeOptions);
        await _connection.subscribe();
        break;
      case 'subscribe':
        _connection = await this.createSubscription(event, options as SubscriptionOptions);
        await _connection.subscribe();
        break;
    }
  }

  /**
   * Removes a callback of an event. Once no callback is left, the consumer
   * is cancelled, the queue is deleted, the channel is closed and the
   * connection is forgotten, so a later `on` / `emit` starts from scratch.
   * Failures during the teardown are logged and do not reject.
   *
   * @param event Name of the event.
   * @param cb The callback which has been passed to `on`.
   */
  async off(event: string, cb: (data: any, msg: amqp.Message) => void): Promise<void> {
    const _set = this._userDefinedCallbacks.get(event);
    if (_set) {
      // Delete the callback.
      _set.delete(cb);
      if (_set.size > 0) {
        // There are still listeners => keep the subscription.
        return;
      }
      this._userDefinedCallbacks.delete(event);
    }

    const _connection = this._connections.get(event);
    if (!_connection) {
      return;
    }

    // Forget the connection first. Otherwise it would be reused although
    // its queue is about to be deleted.
    this._connections.delete(event);

    await _connection.unsubscribe().catch(e => {
      this._logger.error('failed unsubscribing');
      this._logger.error(e);
    });
    await _connection.delete().catch(e => {
      this._logger.error('failed deleting');
      this._logger.error(e);
    });
    try {
      await _connection.channel.close();
    } catch (e) {
      this._logger.error('failed closing channel');
      this._logger.error(e);
    }
  }

  /**
   * Sends data for an event. A missing connection is created on the fly.
   *
   * NOTE(review): for mode `'publish'` this goes through
   * `createSubscription`, which also asserts and binds an exclusive queue
   * on the publishing side - confirm that this is intended.
   *
   * @param mode `'queue'` sends to the queue, `'publish'` to the exchange.
   * @param event Name of the event.
   * @param data JSON-serializable payload.
   * @param options Only used if the connection has to be created.
   * @throws If the mode is unknown and no connection exists for the event.
   */
  public async emit(
    mode: 'queue' | 'publish',
    event: string,
    data: any,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    let _connection = this._connections.get(event);

    if (!_connection) {
      // No Queue or Subscription is available =>
      // Create the corresponding queue / subscription
      switch (mode) {
        case 'queue':
          _connection = await this.createQueue(event, options as QueueSubscribeOptions);
          break;
        case 'publish':
          _connection = await this.createSubscription(event, options as SubscriptionOptions);
          break;
        default:
          throw new Error('Unknown emit mode: ' + String(mode));
      }
    }

    _connection.send(data);
  }

  /**
   * Closes the connection to the broker. Waits for a connection attempt
   * which is still pending; does nothing if that attempt failed.
   */
  async close(): Promise<void> {
    // A failed attempt has already been logged by the constructor.
    await this._connected.catch(() => undefined);
    if (this._socket) {
      await this._socket.close();
    }
  }
}

/**
 * A Communication Layer for the Dispatchers.
 * In this Layer AMQP (Rabbit-MQ) is used.
 * Functions are matched to Queues.
 *
 * @export
 * @class AmqpLayer
 * @implements {ICommunicationInterface}
 */
export class AmqpLayer extends AmqpInterface implements ICommunicationInterface {

  subscriptionMode?: "individual" | "generic" = 'individual';
  resultSharing?: "individual" | "generic" = 'individual';

  protected _logger = getNopeLogger('AMQP-Event-Layer', 'info');

  /**
   * Acknowledge functions of received, not yet answered requests, keyed by
   * task id. (The key type was not recoverable from the file; string is
   * assumed - TODO confirm against IRequestTaskMsg.)
   */
  protected _tasks = new Map<string, () => void>();

  /**
   * Maps, per queue name, the callbacks passed to `onRpcRequest` to the
   * wrapper functions which actually got registered. Required to be able
   * to remove them again in `offRpcRequest`.
   */
  protected _rpcRequestCallbacks = new Map<
    string,
    Map<(req: IRequestTaskMsg) => void, (req: IRequestTaskMsg, msg: amqp.Message) => void>
  >();

  /** Provides the options of the queue used for the requests of `name`. */
  public generateQueueOptions: (name: string) => QueuePublishOptions = (name) => {
    return {
      consumeOptions: {
        noAck: false,
      },
      subscribeOptions: {
        durable: false,
      }
    };
  }

  /** Provides the options of a subscription. Not used within this file. */
  public generateSubscriptionOptions: (name: string) => SubscriptionOptions = (name) => {
    return {};
  }

  async onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
    await this.on('subscribe', 'aurevoir', cb);
  }

  async emitAurevoir(dispatcher: string): Promise<void> {
    await this.emit('publish', 'aurevoir', dispatcher);
  }

  async onTaskCancelation(cb: (msg: ITaskCancelationMsg) => void): Promise<void> {
    await this.on('subscribe', 'taskCancelation', cb);
  }

  async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
    await this.emit('publish', 'taskCancelation', msg);
  }

  async emitNewTopicsAvailable(topics: IAvailableTopicsMsg): Promise<void> {
    await this.emit('publish', 'newTopicsAvailable', topics);
  }

  async onNewTopicsAvailable(cb: (topics: IAvailableTopicsMsg) => void): Promise<void> {
    await this.on('subscribe', 'newTopicsAvailable', cb);
  }

  /** External events are namespaced with the prefix `event.`. */
  async onEvent(event: string, cb: (data: IExternalEventMsg) => void): Promise<void> {
    await this.on('subscribe', 'event.' + event, cb);
  }

  async offEvent(event: string, cb: (data: IExternalEventMsg) => void): Promise<void> {
    await this.off('event.' + event, cb);
  }

  async emitEvent(event: string, data: IExternalEventMsg): Promise<void> {
    await this.emit('publish', 'event.' + event, data);
  }

  async emitNewInstanceGeneratorsAvailable(generators: IAvailableInstanceGeneratorsMsg): Promise<void> {
    await this.emit('publish', 'newInstanceGeneratorsAvailable', generators);
  }

  async onNewInstanceGeneratorsAvailable(cb: (generators: IAvailableInstanceGeneratorsMsg) => void): Promise<void> {
    await this.on('subscribe', 'newInstanceGeneratorsAvailable', cb);
  }

  async onBonjour(cb: (dispatcher: string) => void): Promise<void> {
    await this.on('subscribe', 'bonjour', cb);
  }

  async emitBonjour(dispatcher: string): Promise<void> {
    await this.emit('publish', 'bonjour', dispatcher);
  }

  async emitNewServicesAvailable(services: IAvailableServicesMsg): Promise<void> {
    await this.emit('publish', 'newServicesAvailable', services);
  }

  async onNewServicesAvailable(cb: (services: IAvailableServicesMsg) => void): Promise<void> {
    await this.on('subscribe', 'newServicesAvailable', cb);
  }

  /** Sends a request to the queue `name`. */
  async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
    await this.emit('queue', name, request, this.generateQueueOptions(name));
  }

  /**
   * Publishes the result of a task. If the request of the task has been
   * received via `onRpcRequest`, its message is acknowledged beforehand.
   */
  async emitRpcResult(name: string, result: IResponseTaskMsg): Promise<void> {
    // Acknowledge, that the Task was successful
    const _acknowledge = this._tasks.get(result.taskId);
    if (_acknowledge) {
      _acknowledge();
      this._tasks.delete(result.taskId);
    }

    await this.emit('publish', name, result);
  }

  async onRpcResult(name: string, cb: (result: IResponseTaskMsg) => void): Promise<void> {
    await this.on('subscribe', name, cb);
  }

  async offRpcResponse(name: string, cb: (result: IResponseTaskMsg) => void): Promise<void> {
    await this.off(name, cb);
  }

  /**
   * Consumes requests from the queue `name`. The message of a request is
   * not acknowledged on receipt, but once `emitRpcResult` is called with
   * the same task id.
   *
   * @param name Name of the queue.
   * @param cb Called with every received request. Registering the same
   *           function twice for one queue has no additional effect.
   */
  async onRpcRequest(name: string, cb: (req: IRequestTaskMsg) => void): Promise<void> {
    let _wrappers = this._rpcRequestCallbacks.get(name);
    if (!_wrappers) {
      _wrappers = new Map();
      this._rpcRequestCallbacks.set(name, _wrappers);
    }

    // `on` receives a wrapper instead of `cb`. It is remembered, because
    // `off` can only remove the very function that has been registered.
    let _wrapped = _wrappers.get(cb);
    if (!_wrapped) {
      _wrapped = (req: IRequestTaskMsg, msg: amqp.Message) => {
        this._logger.debug('Getting new Request ' + req.taskId);

        // The channel which delivered the message; a delivery can only be
        // acknowledged on that channel.
        const _channel = this._connections.get(name)?.channel;

        // Define a callback for finished Tasks.
        this._tasks.set(req.taskId, () => {
          // Skip the acknowledgement if the connection has been removed
          // or replaced in the meantime.
          if (_channel && this._connections.get(name)?.channel === _channel) {
            this._logger.debug('Sending Acknowledgement of Request ' + req.taskId);
            // Acknowledge the Message.
            _channel.ack(msg);
          }
        });

        // Perform the Original Callback.
        cb(req);
      };
      _wrappers.set(cb, _wrapped);
    }

    await this.on('queue', name, _wrapped, this.generateQueueOptions(name));
  }

  /**
   * Removes a callback registered via `onRpcRequest`.
   *
   * NOTE(review): removing the last callback makes `off` delete the queue
   * `name` using the configured delete options - verify that this is
   * desired if other processes consume from the same queue.
   */
  async offRpcRequest(name: string, cb: (data: IRequestTaskMsg) => void): Promise<void> {
    const _wrappers = this._rpcRequestCallbacks.get(name);
    const _wrapped = _wrappers?.get(cb);

    if (_wrappers && _wrapped) {
      _wrappers.delete(cb);
      if (_wrappers.size === 0) {
        this._rpcRequestCallbacks.delete(name);
      }
    }

    // Fall back to `cb` itself if it has never been wrapped.
    await this.off(name, _wrapped ?? cb);
  }
}