2020-11-06 08:10:30 +00:00
|
|
|
/**
|
|
|
|
* @author Martin Karkowski
|
|
|
|
* @email m.karkowski@zema.de
|
|
|
|
* @create date 2020-11-06 08:52:30
|
2021-05-16 08:01:58 +00:00
|
|
|
* @modify date 2021-05-16 10:01:20
|
2020-11-06 08:10:30 +00:00
|
|
|
* @desc [description]
|
|
|
|
*/
|
|
|
|
|
2020-11-23 06:09:31 +00:00
|
|
|
import * as amqp from "amqplib";
|
2021-03-22 19:24:15 +00:00
|
|
|
import { getNopeLogger } from "../../logger/getLogger";
|
|
|
|
import { NopeObservable } from "../../observables/nopeObservable";
|
2020-12-04 18:10:33 +00:00
|
|
|
import {
|
|
|
|
IAvailableInstanceGeneratorsMsg,
|
|
|
|
IAvailableInstancesMsg,
|
|
|
|
IAvailableServicesMsg,
|
|
|
|
IAvailableTopicsMsg,
|
|
|
|
ICommunicationInterface,
|
|
|
|
IExternalEventMsg,
|
|
|
|
IRequestTaskMsg,
|
|
|
|
IResponseTaskMsg,
|
|
|
|
ITaskCancelationMsg
|
2021-03-22 19:24:15 +00:00
|
|
|
} from "../../types/nope/nopeCommunication.interface";
|
|
|
|
import { IDispatcherInfo } from "../../types/nope/nopeDispatcher.interface";
|
|
|
|
import { INopeObservable } from "../../types/nope/nopeObservable.interface";
|
2020-09-12 20:23:55 +00:00
|
|
|
|
2020-09-28 16:38:35 +00:00
|
|
|
/**
 * Options used when an event is backed by a named queue.
 */
export type QueuePublishOptions = {
  /**
   * Channel prefetch count. `createQueue` applies it when it is greater
   * than 0 and falls back to a prefetch of 1 when it is omitted.
   */
  prefetch?: number;
  /** Passed to `channel.assertQueue` when the queue is created. */
  subscribeOptions?: amqp.Options.AssertQueue;
  /** Default options for `channel.consume` when the queue is subscribed. */
  consumeOptions?: amqp.Options.Consume;
  /** Default options for `channel.deleteQueue` when the queue is removed. */
  deleteOptions?: amqp.Options.DeleteQueue;
};
|
2020-09-15 05:58:54 +00:00
|
|
|
|
2020-09-28 16:38:35 +00:00
|
|
|
/**
 * Options used when subscribing to a named queue.
 *
 * The shape is identical to the options used for publishing to a queue;
 * within this file the type is only used to cast the options handed to
 * `createQueue`.
 */
export type QueueSubscribeOptions = {
  /** Channel prefetch count (see `createQueue` for the default handling). */
  prefetch?: number;
  /** Passed to `channel.assertQueue` when the queue is created. */
  subscribeOptions?: amqp.Options.AssertQueue;
  /** Default options for `channel.consume` when the queue is subscribed. */
  consumeOptions?: amqp.Options.Consume;
  /** Default options for `channel.deleteQueue` when the queue is removed. */
  deleteOptions?: amqp.Options.DeleteQueue;
};
|
2020-09-28 16:38:35 +00:00
|
|
|
|
2020-09-15 05:58:54 +00:00
|
|
|
/**
 * Options used when an event is backed by an exchange subscription
 * (see `createSubscription`).
 */
export type SubscriptionOptions = {
  /** Channel prefetch count; only applied when it is greater than 0. */
  prefetch?: number;
  /**
   * Requested exchange type.
   * NOTE(review): `createSubscription` in this file always asserts a
   * "topic" exchange and does not read this field - confirm intent.
   */
  exchangeType?: "direct" | "topic" | "fanout" | "headers";
  /**
   * Options for asserting the exchange.
   * NOTE(review): `createSubscription` in this file asserts the exchange
   * with a fixed `{ durable: false }` and does not forward this field.
   */
  subscribeOptions?: amqp.Options.AssertExchange;
  /** Consume options; `createSubscription` defaults `noAck` to `false`. */
  consumeOptions?: amqp.Options.Consume;
  /** Default options for deleting the queue that backs the subscription. */
  deleteOptions?: amqp.Options.DeleteQueue;
};
|
2020-09-12 20:23:55 +00:00
|
|
|
|
2020-09-28 16:38:35 +00:00
|
|
|
/**
 * Handle describing everything that belongs to a single event:
 * the AMQP channel and the operations which can be performed on it.
 */
interface IConnection {
  /** The channel exclusively used for this event. */
  channel: amqp.Channel;

  /** Listener which is attached to the "close" event of the channel. */
  connectionLost: () => void;

  /** Returns `true` while a consumer is registered on the channel. */
  isSubscribed(): boolean;

  /** Sends the given payload; resolves with the result of the write. */
  send: (...arg: any[]) => Promise<boolean>;

  /**
   * Starts consuming messages.
   * All parameters are optional; implementations provide defaults.
   */
  subscribe: (
    event?: string,
    consumeOptions?: amqp.Options.Consume,
    prefetch?: number
  ) => Promise<void>;

  /** Stops consuming messages. */
  unsubscribe: () => Promise<void>;

  /**
   * Removes the broker-side resources of the event.
   * All parameters are optional; implementations provide defaults.
   */
  delete: (
    event?: string,
    deleteOptions?: amqp.Options.DeleteQueue
  ) => Promise<void>;
}
|
|
|
|
|
2020-09-12 20:23:55 +00:00
|
|
|
/**
 * Low-level access to an AMQP (RabbitMQ) broker.
 *
 * Every event name owns a dedicated channel (see `_createConnection`).
 * An event is either backed by a named queue (`createQueue`) or by an
 * exclusive, broker-named queue that is bound to the shared "nope" topic
 * exchange (`createSubscription`). Callbacks registered with `on` receive
 * the JSON-decoded message body together with the raw AMQP message.
 *
 * @export
 * @class AmqpInterface
 */
export class AmqpInterface {
  /** `true` once the broker connection is established, `false` otherwise. */
  connected: INopeObservable<boolean>;

  /**
   * Contains the AMQP-Connection. Stays `undefined` until the connection
   * attempt started in the constructor has succeeded.
   *
   * @protected
   * @type {amqp.Connection}
   * @memberof AmqpInterface
   */
  protected _socket: amqp.Connection;

  /**
   * Resolves with the AMQP-Connection as soon as it is established and
   * rejects if the connection attempt fails. Awaited before channels are
   * created, so that calls made right after construction do not fail.
   *
   * @protected
   * @memberof AmqpInterface
   */
  protected _socketReady: Promise<amqp.Connection>;

  /**
   * Additional Connections (one channel per event name).
   *
   * @protected
   * @type {Map<string, IConnection>}
   * @memberof AmqpInterface
   */
  protected _connections: Map<string, IConnection>;

  /**
   * Map holding a set of function for user defined events.
   *
   * @protected
   * @memberof AmqpInterface
   */
  protected _userDefinedCallbacks: Map<
    string,
    Set<(data: any, msg: amqp.Message) => void>
  >;

  protected _logger = getNopeLogger("AMQP-Interface", "info");

  /**
   * Creates an instance of AmqpInterface and starts connecting.
   *
   * @param {string} uri Broker address. "amqp://" is prepended when the
   *                     uri carries neither an "amqp://" nor an
   *                     "amqps://" scheme.
   * @memberof AmqpInterface
   */
  constructor(public uri: string) {
    // Only add the default scheme if none is present. TLS uris
    // ("amqps://") must not be prefixed a second time.
    this.uri = /^amqps?:\/\//.test(uri) ? uri : "amqp://" + uri;

    this.connected = new NopeObservable();
    this.connected.setContent(false);
    this._connections = new Map();
    this._userDefinedCallbacks = new Map();

    this._socketReady = Promise.resolve(amqp.connect(this.uri)).then(
      (connection) => {
        this._socket = connection;
        this._logger.info("Connection established with " + this.uri);

        // An "error" event without a listener is thrown as uncaught
        // exception by the EventEmitter.
        connection.on("error", (e) => {
          this._logger.error("AMQP connection error");
          this._logger.error(e);
        });

        // Use a small Timeout to wait until we are connected.
        setTimeout(() => {
          this.connected.setContent(true);
        }, 500);

        return connection;
      }
    );

    // Log a failed connection attempt instead of raising an unhandled
    // rejection. Callers awaiting `_socketReady` still see the failure.
    this._socketReady.catch((e) => {
      this._logger.error("Failed to connect to " + this.uri);
      this._logger.error(e);
    });
  }

  /**
   * Helper, which forgets a connection and closes its channel.
   *
   * The "close" listener is removed first: closing the channel of a single
   * event on purpose must not mark the whole interface as disconnected.
   *
   * @protected
   * @param {string} event The event the connection belongs to.
   * @param {IConnection} connection The connection to release.
   * @return {*}  {Promise<void>}
   * @memberof AmqpInterface
   */
  protected async _discardConnection(
    event: string,
    connection: IConnection
  ): Promise<void> {
    if (this._connections.get(event) === connection) {
      this._connections.delete(event);
    }

    connection.channel.removeListener("close", connection.connectionLost);

    try {
      await connection.channel.close();
    } catch (e) {
      // The channel may already be closed (e.g. after a channel error).
      this._logger.debug("Channel of '" + event + "' was already closed");
    }
  }

  /**
   * Helper, which waits until the write buffer of the channel is drained.
   * Rejects if the channel is closed before that happens.
   *
   * @protected
   * @param {amqp.Channel} channel
   * @return {*}  {Promise<void>}
   * @memberof AmqpInterface
   */
  protected _waitForDrain(channel: amqp.Channel): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener("close", onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener("drain", onDrain);
        reject(new Error("AMQP channel closed before the buffer drained"));
      };

      channel.once("drain", onDrain);
      channel.once("close", onClose);
    });
  }

  /**
   * Internal Function, to create a Connection.
   * Returns the already existing connection of the event if there is one.
   *
   * @protected
   * @param {string} event
   * @param {QueuePublishOptions} [options={}] Provides the defaults for
   *        consuming (`consumeOptions`) and deleting (`deleteOptions`).
   * @return {*}  {Promise<{
   *     connection: IConnection;
   *     created: boolean;
   *   }>} `created` is `true` if a new channel has been opened.
   * @memberof AmqpInterface
   */
  protected async _createConnection(
    event: string,
    options: QueuePublishOptions = {}
  ): Promise<{
    connection: IConnection;
    created: boolean;
  }> {
    const existing = this._connections.get(event);
    if (existing) {
      return {
        connection: existing,
        created: false
      };
    }

    // Wait for the broker connection; it is established asynchronously.
    const socket = await this._socketReady;
    const channel = await socket.createChannel();

    const connectionLost = () => {
      this.connected.setContent(false);
    };
    channel.on("close", connectionLost);
    // Channel errors are emitted as "error" event; log them instead of
    // crashing with an unhandled "error" event.
    channel.on("error", (e) => {
      this._logger.error("AMQP channel error on '" + event + "'");
      this._logger.error(e);
    });

    // Reply of the active consumer, `null` while not subscribed.
    let consumer: amqp.Replies.Consume | null = null;

    const connection: IConnection = {
      channel,
      connectionLost,

      send: async (data) => {
        return channel.sendToQueue(event, Buffer.from(JSON.stringify(data)), {
          contentEncoding: "application/json"
        });
      },

      isSubscribed: () => {
        return consumer !== null;
      },

      subscribe: async (
        _event: string = event,
        _opts = options.consumeOptions
      ) => {
        if (consumer !== null) {
          return;
        }

        consumer = await channel.consume(
          _event,
          (msg) => {
            // amqplib delivers `null` if the broker cancels the consumer.
            if (msg === null) {
              this._logger.error("Consumer of '" + event + "' was cancelled");
              consumer = null;
              return;
            }

            // Transform the Data:
            let data: any;
            try {
              data = JSON.parse(msg.content.toString("utf-8"));
            } catch (e) {
              this._logger.error("Received malformed JSON on '" + event + "'");
              this._logger.error(e);
              return;
            }

            const callbacks = this._userDefinedCallbacks.get(event) ?? [];
            for (const callback of callbacks) {
              // A failing callback must not prevent the remaining ones.
              try {
                callback(data, msg);
              } catch (e) {
                this._logger.error("Callback of '" + event + "' failed");
                this._logger.error(e);
              }
            }
          },
          _opts
        );
      },

      unsubscribe: async () => {
        if (consumer !== null) {
          // Delete the Subscription
          await channel.cancel(consumer.consumerTag);
          consumer = null;
        }
      },

      delete: async (_event: string = event, _opts = options.deleteOptions) => {
        try {
          await channel.deleteQueue(_event, _opts);
        } catch (e) {
          this._logger.error("AMQP Failed to delete the Queue");
          this._logger.error(e);
        }
      }
    };

    this._connections.set(event, connection);

    return {
      connection,
      created: true
    };
  }

  /**
   * Function to create a Queue in the RabbitMQ-Broker.
   * If the event is already known, its connection is returned unchanged.
   *
   * @param {string} event Name of the event and of the queue.
   * @param {QueuePublishOptions} [options={}]
   * @return {*}  {Promise<IConnection>}
   * @memberof AmqpInterface
   */
  public async createQueue(
    event: string,
    options: QueuePublishOptions = {}
  ): Promise<IConnection> {
    const { connection, created } = await this._createConnection(
      event,
      options
    );

    if (!created) {
      return connection;
    }

    try {
      // Create a Queue
      await connection.channel.assertQueue(event, options.subscribeOptions);

      // Enable Prefetch. Without an explicit value one message is fetched
      // at a time; values <= 0 leave the prefetch untouched.
      const prefetch = options.prefetch === undefined ? 1 : options.prefetch;
      if (prefetch > 0) {
        await connection.channel.prefetch(prefetch);
      }
    } catch (e) {
      // Do not keep a half initialized connection.
      await this._discardConnection(event, connection);
      throw e;
    }

    return connection;
  }

  /**
   * Function to create an Subscription in the RabbitMQ-Broker.
   *
   * An exclusive, broker-named queue is bound to the "nope" topic exchange
   * using the event as routing key. Sending publishes to that exchange.
   * If the event is already known, its connection is returned unchanged.
   *
   * @param {string} event
   * @param {SubscriptionOptions} [options={}] The object is not modified.
   * @return {*}  {Promise<IConnection>}
   * @memberof AmqpInterface
   */
  public async createSubscription(
    event: string,
    options: SubscriptionOptions = {}
  ): Promise<IConnection> {
    // Messages have to be acknowledged unless the caller decides otherwise.
    const consumeOptions: amqp.Options.Consume = {
      noAck: false,
      ...(options.consumeOptions ?? {})
    };

    const { connection, created } = await this._createConnection(event, {
      consumeOptions,
      deleteOptions: options.deleteOptions
    });

    if (!created) {
      return connection;
    }

    const channel = connection.channel;
    const exchangeName = "nope";

    let queueName: string;

    try {
      // Create a Queue. It will contain the subscribed Messages.
      const queue = await channel.assertQueue("", {
        exclusive: true
      });
      queueName = queue.queue;

      // Create an Exchange.
      await channel.assertExchange(exchangeName, "topic", {
        durable: false
      });

      // Bind the Queue to the Exchange.
      await channel.bindQueue(queueName, exchangeName, event);

      // Enable Prefetch
      if (options.prefetch > 0) {
        await channel.prefetch(options.prefetch);
      }
    } catch (e) {
      // Do not keep a half initialized connection.
      await this._discardConnection(event, connection);
      throw e;
    }

    // Update the Send Message.
    connection.send = async (data) => {
      // `publish` returns `false` if the write buffer is full. The message
      // is buffered nevertheless, so it must not be published again.
      // Instead wait for the buffer to drain before continuing.
      const flushed = channel.publish(
        exchangeName,
        event,
        Buffer.from(JSON.stringify(data)),
        {
          contentEncoding: "application/json"
        }
      );

      if (!flushed) {
        await this._waitForDrain(channel);
      }

      return true;
    };

    // Store a reference to the original Methods
    const deleteQueue = connection.delete;
    const subscribeQueue = connection.subscribe;

    // Only the dynamic queue is deleted. The exchange is shared by all
    // events, deleting it would break every other subscription.
    connection.delete = async () => {
      await deleteQueue(queueName);
    };

    // Consume from the dynamic queue instead of a queue named like the event.
    connection.subscribe = async () => {
      await subscribeQueue(queueName);
    };

    return connection;
  }

  /**
   * Subscribe to an Event.
   *
   * The broker-side queue / subscription is created and consumed when the
   * first callback of an event is registered.
   *
   * @param {("queue" | "subscribe")} mode Defines whether the event is
   *        backed by a queue or by an exchange subscription.
   * @param {string} event
   * @param {(data: any, msg: amqp.Message) => void} cb
   * @param {(QueuePublishOptions | SubscriptionOptions)} [options={}]
   * @return {*}  {Promise<void>}
   * @memberof AmqpInterface
   */
  async on(
    mode: "queue" | "subscribe",
    event: string,
    cb: (data: any, msg: amqp.Message) => void,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    // Store the Function
    const callbacks = this._userDefinedCallbacks.get(event) ?? new Set();
    callbacks.add(cb);
    this._userDefinedCallbacks.set(event, callbacks);

    // Only the first callback has to establish the subscription.
    if (callbacks.size !== 1) {
      return;
    }

    try {
      let connection: IConnection | undefined;

      switch (mode) {
        case "queue":
          connection = await this.createQueue(
            event,
            options as QueueSubscribeOptions
          );
          break;
        case "subscribe":
          connection = await this.createSubscription(
            event,
            options as SubscriptionOptions
          );
          break;
      }

      if (connection) {
        await connection.subscribe();
      }
    } catch (e) {
      // Roll back the registration, otherwise a later call would assume
      // that the event is already subscribed.
      callbacks.delete(cb);
      if (
        callbacks.size === 0 &&
        this._userDefinedCallbacks.get(event) === callbacks
      ) {
        this._userDefinedCallbacks.delete(event);
      }
      throw e;
    }
  }

  /**
   * Unsubscribe from an Event.
   *
   * If no callback is left, the consumer is cancelled, the queue is
   * deleted and the channel of the event is released. Errors during that
   * cleanup are logged and not thrown.
   *
   * @param {string} event
   * @param {(data: any, msg: amqp.Message) => void} cb
   * @memberof AmqpInterface
   */
  async off(
    event: string,
    cb: (data: any, msg: amqp.Message) => void
  ): Promise<void> {
    const callbacks = this._userDefinedCallbacks.get(event);

    if (callbacks) {
      // Delete the callback.
      callbacks.delete(cb);

      if (callbacks.size > 0) {
        // There are still listeners => keep the subscription.
        return;
      }

      this._userDefinedCallbacks.delete(event);
    }

    const connection = this._connections.get(event);
    if (!connection) {
      return;
    }

    try {
      await connection.unsubscribe();
    } catch (e) {
      this._logger.error("failed unsubscribing");
      this._logger.error(e);
    }

    try {
      await connection.delete();
    } catch (e) {
      this._logger.error("failed deleting");
      this._logger.error(e);
    }

    // The queue is gone => the connection must not be reused.
    await this._discardConnection(event, connection);
  }

  /**
   * Emits data on the given event. If the event is not known yet, the
   * corresponding queue / subscription is created first.
   *
   * @param {("queue" | "publish")} mode Only used if the event has to be
   *        created.
   * @param {string} event
   * @param {*} data Payload, transmitted as JSON.
   * @param {(QueuePublishOptions | SubscriptionOptions)} [options={}]
   * @return {*}  {Promise<void>}
   * @memberof AmqpInterface
   */
  public async emit(
    mode: "queue" | "publish",
    event: string,
    data: any,
    options: QueuePublishOptions | SubscriptionOptions = {}
  ): Promise<void> {
    let connection = this._connections.get(event);

    if (!connection) {
      // No Queue or Subscription is available =>
      // Create the corresponding queue / subscription
      switch (mode) {
        case "queue":
          connection = await this.createQueue(
            event,
            options as QueueSubscribeOptions
          );
          break;
        case "publish":
          connection = await this.createSubscription(
            event,
            options as SubscriptionOptions
          );
          break;
        default:
          throw new Error("Unsupported emit mode: " + String(mode));
      }
    }

    await connection.send(data);
  }

  /**
   * Closes the connection to the broker. Failures are logged.
   *
   * @memberof AmqpInterface
   */
  public close(): void {
    this._socketReady
      .then((socket) => socket.close())
      .catch((e) => {
        this._logger.error("Failed to close the AMQP connection");
        this._logger.error(e);
      });
  }
}
|
|
|
|
|
|
|
|
/**
 * A Communication Layer for the Dispatchers.
 * In this Layer AMQP (Rabbit-MQ) is used.
 *
 * RPC requests are matched to queues (named like the service), every
 * other message is distributed via exchange subscriptions.
 *
 * @export
 * @class AmqpLayer
 * @implements {ICommunicationInterface}
 */
export class AmqpLayer
  extends AmqpInterface
  implements ICommunicationInterface {
  protected _logger = getNopeLogger("AMQP-Event-Layer", "info");

  /**
   * Pending acknowledgements, stored by the id of the task. An entry is
   * added when a request is received and consumed by `emitRpcResponse`.
   */
  protected _tasks = new Map<string, () => void>();

  /**
   * Relates the callbacks handed to `onRpcRequest` (per service name) to
   * the handlers that have actually been registered, so that
   * `offRpcRequest` is able to remove them again.
   */
  protected _rpcRequestHandlers = new Map<
    string,
    Map<
      (req: IRequestTaskMsg) => void,
      (req: IRequestTaskMsg, msg: amqp.Message) => void
    >
  >();

  public allowServiceRedundancy: boolean;

  public id: string;

  public considerConnection = true;

  /** Publishes a status update on the topic "status". */
  async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
    await this.emit("publish", "status", status);
  }

  /** Subscribes to the topic "status". */
  async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
    await this.on("subscribe", "status", cb);
  }

  /** Subscribes to the topic "aurevoir". */
  async onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
    await this.on("subscribe", "aurevoir", cb);
  }

  /** Publishes the given dispatcher on the topic "aurevoir". */
  async emitAurevoir(dispatcher: string): Promise<void> {
    await this.emit("publish", "aurevoir", dispatcher);
  }

  /** Subscribes to the topic "cancel". */
  async onTaskCancelation(
    cb: (msg: ITaskCancelationMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "cancel", cb);
  }

  /** Publishes a task cancelation on the topic "cancel". */
  async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
    await this.emit("publish", "cancel", msg);
  }

  /** Publishes the available topics on the topic "observables". */
  async emitNewObersvablesAvailable(
    topics: IAvailableTopicsMsg
  ): Promise<void> {
    await this.emit("publish", "observables", topics);
  }

  /** Subscribes to the topic "observables". */
  async onNewObservablesAvailable(
    cb: (topics: IAvailableTopicsMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "observables", cb);
  }

  /** Subscribes to an external event; the topic is "event." + event. */
  async onEvent(
    event: string,
    cb: (data: IExternalEventMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "event." + event, cb);
  }

  /** Removes a callback of an external event ("event." + event). */
  async offEvent(
    event: string,
    cb: (data: IExternalEventMsg) => void
  ): Promise<void> {
    await this.off("event." + event, cb);
  }

  /** Publishes an external event on the topic "event." + event. */
  async emitEvent(event: string, data: IExternalEventMsg): Promise<void> {
    await this.emit("publish", "event." + event, data);
  }

  /** Publishes the available generators on the topic "generators". */
  async emitNewInstanceGeneratorsAvailable(
    generators: IAvailableInstanceGeneratorsMsg
  ): Promise<void> {
    await this.emit("publish", "generators", generators);
  }

  /** Subscribes to the topic "generators". */
  async onNewInstanceGeneratorsAvailable(
    cb: (generators: IAvailableInstanceGeneratorsMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "generators", cb);
  }

  /** Subscribes to the topic "bonjour". */
  async onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> {
    await this.on("subscribe", "bonjour", cb);
  }

  /** Publishes the dispatcher info on the topic "bonjour". */
  async emitBonjour(msg: IDispatcherInfo): Promise<void> {
    await this.emit("publish", "bonjour", msg);
  }

  /** Sends a request to the queue called `name`. */
  async emitRpcRequest(name: string, request: IRequestTaskMsg): Promise<void> {
    await this.emit("queue", name, request, this.generateQueueOptions(name));
  }

  /**
   * Publishes the result of a task on the topic `name`.
   *
   * If the request of the task has been received by this layer, the
   * request message is acknowledged before the result is published.
   *
   * @param {string} name
   * @param {IResponseTaskMsg} result
   * @return {*}  {Promise<void>}
   * @memberof AmqpLayer
   */
  async emitRpcResponse(name: string, result: IResponseTaskMsg): Promise<void> {
    // Acknowledge, that the Task was successful
    const acknowledge = this._tasks.get(result.taskId);

    if (acknowledge) {
      this._tasks.delete(result.taskId);
      // Send the Acknowledgement
      acknowledge();
    }

    await this.emit("publish", name, result);
  }

  /** Subscribes to the responses published on the topic `name`. */
  async onRpcResponse(
    name: string,
    cb: (result: IResponseTaskMsg) => void
  ): Promise<void> {
    await this.on("subscribe", name, cb);
  }

  /** Removes a callback for responses of the topic `name`. */
  async offRpcResponse(
    name: string,
    cb: (result: IResponseTaskMsg) => void
  ): Promise<void> {
    await this.off(name, cb);
  }

  /**
   * Using AMQP, we use a custom Handler. We will listen on the
   * queues, but only get new information, if the current task is
   * completed. This results in distributing the tasks among the
   * network.
   *
   * The message of a request is acknowledged when the corresponding
   * response is emitted (see `emitRpcResponse`). Registering the same
   * callback twice for the same name has no additional effect.
   *
   * @param {string} name
   * @param {(req: IRequestTaskMsg) => void} cb
   * @return {*}  {Promise<void>}
   * @memberof AmqpLayer
   */
  async onRpcRequest(
    name: string,
    cb: (req: IRequestTaskMsg) => void
  ): Promise<void> {
    let handlers = this._rpcRequestHandlers.get(name);
    if (!handlers) {
      handlers = new Map();
      this._rpcRequestHandlers.set(name, handlers);
    }

    if (handlers.has(cb)) {
      return;
    }

    const handler = (req: IRequestTaskMsg, msg: amqp.Message): void => {
      this._logger.debug("Getting new Request " + req.taskId);

      // Select the channel which delivered the message. The message has
      // to be acknowledged on exactly that channel.
      const channel = this._connections.get(name)?.channel;

      // Define a callback for finished Tasks.
      this._tasks.set(req.taskId, () => {
        // Test if the Channel exists
        if (!channel) {
          return;
        }

        this._logger.debug("Sending Acknowledgement of Request " + req.taskId);

        try {
          // Acknowledge the Message.
          channel.ack(msg);
        } catch (e) {
          // The channel might have been closed in the meantime.
          this._logger.error(
            "Failed to acknowledge the Request " + req.taskId
          );
          this._logger.error(e);
        }
      });

      // Perform the Original Callback.
      cb(req);
    };

    handlers.set(cb, handler);

    try {
      await this.on("queue", name, handler, this.generateQueueOptions(name));
    } catch (e) {
      // The handler has not been registered => forget it.
      handlers.delete(cb);
      throw e;
    }
  }

  /**
   * Removes a callback which has been registered with `onRpcRequest`.
   *
   * @param {string} name
   * @param {(data: IRequestTaskMsg) => void} cb
   * @return {*}  {Promise<void>}
   * @memberof AmqpLayer
   */
  async offRpcRequest(
    name: string,
    cb: (data: IRequestTaskMsg) => void
  ): Promise<void> {
    const handlers = this._rpcRequestHandlers.get(name);
    const handler = handlers?.get(cb);

    if (handlers && handler) {
      handlers.delete(cb);
      if (handlers.size === 0) {
        this._rpcRequestHandlers.delete(name);
      }

      // `on` received the handler and not `cb`, so the handler has to be
      // removed.
      await this.off(name, handler);
    } else {
      // Unknown callback: keep the previous behavior.
      await this.off(name, cb);
    }
  }

  /** Publishes the available services on the topic "services". */
  async emitNewServicesAvailable(
    services: IAvailableServicesMsg
  ): Promise<void> {
    await this.emit("publish", "services", services);
  }

  /** Subscribes to the topic "services". */
  async onNewServicesAvailable(
    cb: (services: IAvailableServicesMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "services", cb);
  }

  /**
   * Generates the options of the queue used for the RPC requests of the
   * given name: messages require an acknowledgement and the queue is not
   * durable. The name is currently not considered.
   */
  public generateQueueOptions: (name: string) => QueuePublishOptions = (
    name
  ) => {
    const queueOptions: QueuePublishOptions = {
      consumeOptions: {
        noAck: false
      },
      subscribeOptions: {
        durable: false
      }
    };

    return queueOptions;
  };

  /** Subscribes to the topic "instances". */
  async onNewInstancesAvailable(
    cb: (instances: IAvailableInstancesMsg) => void
  ): Promise<void> {
    await this.on("subscribe", "instances", cb);
  }

  /** Publishes the available instances on the topic "instances". */
  async emitNewInstancesAvailable(
    instances: IAvailableInstancesMsg
  ): Promise<void> {
    await this.emit("publish", "instances", instances);
  }

  /**
   * Generates the options for a subscription of the given name.
   * Currently an empty object is returned for every name.
   */
  public generateSubscriptionOptions: (name: string) => SubscriptionOptions = (
    name
  ) => {
    return {};
  };

  /**
   * Disposes the layer: releases the connection flag and closes the
   * connection to the broker, if it has been established.
   *
   * @return {*}  {Promise<void>}
   * @memberof AmqpLayer
   */
  public async dispose(): Promise<void> {
    // Dispose the Connection Flag.
    this.connected.dispose();

    // The socket is assigned asynchronously and may not exist yet.
    if (this._socket) {
      await this._socket.close();
    }
  }
}
|