nope/lib/communication/amqpLayer.ts

589 lines
16 KiB
TypeScript
Raw Normal View History

2020-11-06 08:10:30 +00:00
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2020-11-06 08:52:30
* @modify date 2021-02-05 09:52:01
2020-11-06 08:10:30 +00:00
* @desc [description]
*/
2020-11-23 06:09:31 +00:00
import * as amqp from "amqplib";
import { getNopeLogger } from "../logger/getLogger";
2020-11-23 06:09:31 +00:00
import { NopeObservable } from "../observables/nopeObservable";
import {
IAvailableInstanceGeneratorsMsg,
IAvailableInstancesMsg,
IAvailableServicesMsg,
IAvailableTopicsMsg,
ICommunicationInterface,
IExternalEventMsg,
IRequestTaskMsg,
IResponseTaskMsg,
ITaskCancelationMsg
} from "../types/nope/nopeCommunication.interface";
2021-01-08 10:04:41 +00:00
import { IDispatcherInfo } from "../types/nope/nopeDispatcher.interface";
2020-11-23 06:09:31 +00:00
import { INopeObservable } from "../types/nope/nopeObservable.interface";
2020-09-12 20:23:55 +00:00
export type QueuePublishOptions = {
subscribeOptions?: amqp.Options.AssertQueue;
consumeOptions?: amqp.Options.Consume;
deleteOptions?: amqp.Options.DeleteQueue;
prefetch?: number;
};
2020-09-15 05:58:54 +00:00
export type QueueSubscribeOptions = {
subscribeOptions?: amqp.Options.AssertQueue;
consumeOptions?: amqp.Options.Consume;
deleteOptions?: amqp.Options.DeleteQueue;
prefetch?: number;
};
2020-09-15 05:58:54 +00:00
export type SubscriptionOptions = {
subscribeOptions?: amqp.Options.AssertExchange;
exchangeType?: "direct" | "topic" | "fanout" | "headers";
consumeOptions?: amqp.Options.Consume;
deleteOptions?: amqp.Options.DeleteQueue;
prefetch?: number;
};
2020-09-12 20:23:55 +00:00
interface IConnection {
channel: amqp.Channel;
connectionLost: () => void;
delete: (
event?: string,
deleteOptions?: amqp.Options.DeleteQueue
) => Promise<void>;
subscribe: (
event?: string,
consumeOptions?: amqp.Options.Consume,
prefetch?: number
) => Promise<void>;
unsubscribe: () => Promise<void>;
isSubscribed(): boolean;
send: (...arg) => boolean;
}
2020-09-12 20:23:55 +00:00
/**
* A Communication Layer for the Dispatchers.
* In this Layer AMQP (Rabbit-MQ) is used.
* Functions are matched to Queues.
*
* @export
* @class AmqpLayer
* @implements {ICommunicationInterface}
*/
2020-09-15 05:58:54 +00:00
export class AmqpInterface {
connected: INopeObservable<boolean>;
async off(event: string, cb: (data: any, msg: amqp.Message) => void) {
// Store the Function
const _set = this._userDefinedCallbacks.get(event) || new Set();
// Delete the callback.
_set.delete(cb);
// Based on the Numbers of Elements which are included in here
// the Queue is subscribed.
if (_set.size > 0) {
this._userDefinedCallbacks.set(event, _set);
} else {
const _connection = this._connections.get(event);
if (_connection?.unsubscribe) {
2020-09-12 20:23:55 +00:00
const _this = this;
await _connection.unsubscribe().catch((e) => {
_this._logger.error("failed unsubscribing");
_this._logger.error(e);
2020-09-12 20:23:55 +00:00
});
await _connection.delete().catch((e) => {
_this._logger.error("failed deleting");
_this._logger.error(e);
});
}
}
}
protected _socket: amqp.Connection;
protected _connections: Map<string, IConnection>;
/**
* Map holding a set of function for user defined events.
*
* @protected
* @memberof AmqpInterface
*/
protected _userDefinedCallbacks: Map<
string,
Set<(data: any, msg: amqp.Message) => void>
>;
constructor(public uri: string) {
// Adapt the URI if required:
this.uri = uri.startsWith("amqp://") ? uri : "amqp://" + uri;
this.connected = new NopeObservable();
this.connected.setContent(false);
const _this = this;
amqp.connect(this.uri).then((connection) => {
_this._socket = connection;
_this._logger.info("Connection established with " + _this.uri);
_this.connected.setContent(true);
});
this._connections = new Map();
this._userDefinedCallbacks = new Map();
}
protected async _createConnection(
event: string,
options: QueuePublishOptions = {}
) {
if (this._connections.has(event)) {
return {
connection: this._connections.get(event),
created: false
};
2020-09-12 20:23:55 +00:00
}
const _this = this;
const _channel = await this._socket.createChannel();
const connectionLost = () => {
_this.connected.setContent(false);
};
_channel.on("close", connectionLost);
let _sub: amqp.Replies.Consume = null;
// Define a Connection Object.
const _connection: IConnection = {
channel: _channel,
connectionLost,
2020-09-15 05:58:54 +00:00
send(data) {
return _channel.sendToQueue(event, Buffer.from(JSON.stringify(data)), {
contentEncoding: "application/json"
});
},
isSubscribed() {
return _sub !== null;
},
async subscribe(_event: string = event, _opts = options.consumeOptions) {
if (_sub === null) {
_sub = await _channel.consume(
_event,
(msg) => {
// Transform the Data:
const data = JSON.parse(msg.content.toString("utf-8"));
for (const _func of _this._userDefinedCallbacks.get(event) ||
[]) {
_func(data, msg);
}
},
options.consumeOptions
);
}
},
async unsubscribe() {
if (_sub !== null) {
// Delte the Subscription
await _channel.cancel(_sub.consumerTag);
_sub = null;
}
},
async delete(_event: string = event, _opts = options.deleteOptions) {
try {
await _channel.deleteQueue(_event, _opts);
} catch (e) {}
}
};
this._connections.set(event, _connection);
return {
connection: _connection,
created: true
};
}
public async createQueue(
event: string,
options: QueuePublishOptions = {}
): Promise<IConnection> {
// Get the Connection
const _res = await this._createConnection(event, options);
if (_res.created) {
// Extract the Connection
const _connection: IConnection = _res.connection;
// Create a Queue
const _queue = await _connection.channel.assertQueue(
event,
options.subscribeOptions
);
// Enable Prefetch
if (options.prefetch > 0) {
await _connection.channel.prefetch(options.prefetch);
} else if (typeof options.prefetch === "undefined") {
await _connection.channel.prefetch(1);
}
2020-09-12 20:23:55 +00:00
}
return _res.connection;
}
/**
* Function to create an Subscription in the RabbitMQ-Broker
*
* @param {string} event
* @param {SubscriptionOptions} [options={}]
* @memberof AmqpInterface
*/
public async createSubscription(
event: string,
options: SubscriptionOptions = {}
): Promise<IConnection> {
options.consumeOptions = Object.assign(
{
noAck: false
},
options.consumeOptions || {}
);
// Get the Connection
const _res = await this._createConnection(event, options);
if (_res.created) {
// Extract the Connection
const _connection: IConnection = _res.connection;
// Create a Queue
// It will contain the subscribed Messages.
const _queue = await _connection.channel.assertQueue("", {
exclusive: true
});
// Create an Exchange.
const _exchange = await _connection.channel.assertExchange(
"event",
options.exchangeType || "topic",
options.subscribeOptions
);
// Bind the Queue to the Exchange.
_connection.channel.bindQueue(_queue.queue, _exchange.exchange, event);
// Enable Prefetch
if (options.prefetch > 0) {
await _connection.channel.prefetch(options.prefetch);
}
// Update the Send Message.
_connection.send = (data) => {
return _connection.channel.publish(
"event",
event,
Buffer.from(JSON.stringify(data)),
{
contentEncoding: "application/json"
}
);
};
// Store a reference to the original Method
const _delete = _connection.delete;
// Delte additionally the exchange and dynamic queue
_connection.delete = async () => {
// Delte the Queue
await _delete(_queue.queue);
// Delte the Exchange.
await _connection.channel.deleteExchange(_exchange.exchange);
};
const _subscribe = _connection.subscribe;
_connection.subscribe = async () => {
await _subscribe(_queue.queue);
};
2020-09-12 20:23:55 +00:00
}
return _res.connection;
}
protected _logger = getNopeLogger("AMQP-Interface", "info");
async on(
mode: "queue" | "subscribe",
event: string,
cb: (data: any, msg: amqp.Message) => void,
options: QueuePublishOptions | SubscriptionOptions = {}
): Promise<void> {
// Store the Function
const _set = this._userDefinedCallbacks.get(event) || new Set();
// Add the callback.
_set.add(cb);
// Store the Set.
this._userDefinedCallbacks.set(event, _set);
// Based on the Numbers of Elements which are included in here
// the Queue is subscribed.
if (_set.size === 1) {
let _connection: IConnection;
switch (mode) {
case "queue":
_connection = await this.createQueue(
event,
options as QueueSubscribeOptions
);
await _connection.subscribe();
break;
case "subscribe":
_connection = await this.createSubscription(
event,
options as SubscriptionOptions
);
await _connection.subscribe();
break;
}
2020-09-12 20:23:55 +00:00
}
}
public async emit(
mode: "queue" | "publish",
event: string,
data: any,
options: QueuePublishOptions | SubscriptionOptions = {}
): Promise<void> {
const _connection: IConnection = this._connections.get(event);
if (_connection) {
_connection.send(data);
} else {
// No Queue or Subscription is available =>
// Create the corresponding queue / subscription
switch (mode) {
case "queue":
await this.createQueue(event, options as QueueSubscribeOptions);
break;
case "publish":
await this.createSubscription(event, options as SubscriptionOptions);
break;
}
// Recall the Method.
await this.emit(mode, event, data, options);
2020-09-15 05:58:54 +00:00
}
}
public close(): void {
this._socket.close();
}
2020-09-15 05:58:54 +00:00
}
/**
* A Communication Layer for the Dispatchers.
* In this Layer AMQP (Rabbit-MQ) is used.
* Functions are matched to Queues.
*
* @export
* @class AmqpLayer
* @implements {ICommunicationInterface}
*/
export class AmqpLayer
extends AmqpInterface
implements ICommunicationInterface {
considerConnection = true;
2021-01-08 10:04:41 +00:00
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
await this.emit("publish", "statusUdpate", status);
}
async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
await this.on("subscribe", "statusUdpate", cb);
}
async onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
await this.on("subscribe", "aurevoir", cb);
}
async emitAurevoir(dispatcher: string): Promise<void> {
await this.emit("publish", "aurevoir", dispatcher);
}
async onTaskCancelation(
cb: (msg: ITaskCancelationMsg) => void
): Promise<void> {
await this.on("subscribe", "taskCancelation", cb);
}
async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
await this.emit("publish", "taskCancelation", msg);
}
async emitNewObersvablesAvailable(
topics: IAvailableTopicsMsg
): Promise<void> {
await this.emit("publish", "newTopicsAvailable", topics);
}
async onNewObservablesAvailable(
cb: (topics: IAvailableTopicsMsg) => void
): Promise<void> {
await this.on("subscribe", "newTopicsAvailable", cb);
}
async onEvent(
event: string,
cb: (data: IExternalEventMsg) => void
): Promise<void> {
await this.on("subscribe", "event." + event, cb);
}
async offEvent(
event: string,
cb: (data: IExternalEventMsg) => void
): Promise<void> {
await this.off("event." + event, cb);
}
async emitEvent(event: string, data: IExternalEventMsg): Promise<void> {
await this.emit("publish", "event." + event, data);
}
async emitNewInstanceGeneratorsAvailable(
generators: IAvailableInstanceGeneratorsMsg
): Promise<void> {
await this.emit("publish", "newInstanceGeneratorsAvailable", generators);
}
async onNewInstanceGeneratorsAvailable(
cb: (generators: IAvailableInstanceGeneratorsMsg) => void
): Promise<void> {
await this.on("subscribe", "newInstanceGeneratorsAvailable", cb);
}
2021-01-08 10:04:41 +00:00
async onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> {
await this.on("subscribe", "bonjour", cb);
}
2021-01-08 10:04:41 +00:00
async emitBonjour(msg: IDispatcherInfo): Promise<void> {
await this.emit("publish", "bonjour", msg);
}
protected _tasks = new Map<string, () => void>();
async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
await this.emit("queue", name, request, this.generateQueueOptions(name));
}
2021-01-08 10:04:41 +00:00
async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void> {
// Ackknowlegde, that the Task was sucessfull
if (this._tasks.has(result.taskId)) {
// Send the Acknoledgement
this._tasks.get(result.taskId)();
this._tasks.delete(result.taskId);
2020-09-15 05:58:54 +00:00
}
await this.emit("publish", name, result);
}
async onRpcResponse(
name: string,
cb: (result: IResponseTaskMsg) => void
): Promise<void> {
await this.on("subscribe", name, cb);
}
async offRpcResponse(
name: string,
cb: (result: IResponseTaskMsg) => void
): Promise<void> {
await this.off(name, cb);
}
async onRpcRequest(
name: string,
cb: (req: IRequestTaskMsg) => void
): Promise<void> {
const _this = this;
await this.on(
"queue",
name,
(req: IRequestTaskMsg, msg) => {
_this._logger.debug("Getting new Request " + req.taskId);
// Define a callback for finished Tasks.
_this._tasks.set(req.taskId, () => {
// Select the corresponding channel
const _channel = _this._connections.get(name).channel;
// Test if the Channel exists
if (_channel) {
_this._logger.debug(
"Sending Acknowledgement of Request " + req.taskId
);
// Ackknowlegde the Message.
_channel.ack(msg);
}
});
2020-09-15 05:58:54 +00:00
// Perform the Original Callback.
cb(req);
},
this.generateQueueOptions(name)
);
}
async offRpcRequest(
name: string,
cb: (data: IRequestTaskMsg) => void
): Promise<void> {
await this.off(name, cb);
}
async emitNewServicesAvailable(
services: IAvailableServicesMsg
): Promise<void> {
await this.emit("publish", "newServicesAvailable", services);
}
async onNewServicesAvailable(
cb: (services: IAvailableServicesMsg) => void
): Promise<void> {
await this.on("subscribe", "newServicesAvailable", cb);
}
protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
public generateQueueOptions: (name: string) => QueuePublishOptions = (
name
) => {
return {
consumeOptions: {
noAck: false
},
subscribeOptions: {
durable: false
}
} as QueuePublishOptions;
};
async onNewInstancesAvailable(
cb: (instances: IAvailableInstancesMsg) => void
): Promise<void> {
await this.on("subscribe", "newInstancesAvailable", cb);
}
async emitNewInstancesAvailable(
instances: IAvailableInstancesMsg
): Promise<void> {
await this.emit("publish", "newInstancesAvailable", instances);
}
public generateSubscriptionOptions: (name: string) => SubscriptionOptions = (
name
) => {
return {};
};
public async dispose(){
// Dispose the Connection Flag.
this.connected.dispose();
await this._socket.close();
}
}