Fixing AMQP

This commit is contained in:
Martin Karkowski 2021-06-14 14:51:11 +02:00
parent 15644f1a9e
commit 205a27ffd3
2 changed files with 56 additions and 28 deletions

View File

@ -59,7 +59,7 @@ interface IConnection {
) => Promise<void>; ) => Promise<void>;
unsubscribe: () => Promise<void>; unsubscribe: () => Promise<void>;
isSubscribed(): boolean; isSubscribed(): boolean;
send: (...arg) => boolean; send: (...arg) => Promise<boolean>;
} }
/** /**
@ -156,7 +156,11 @@ export class AmqpInterface {
_this._socket = connection; _this._socket = connection;
_this._logger.info("Connection established with " + _this.uri); _this._logger.info("Connection established with " + _this.uri);
_this.connected.setContent(true); // Use a small Timeout to wait until we are connected.
setTimeout(() => {
_this.connected.setContent(true);
}, 500);
}); });
this._connections = new Map(); this._connections = new Map();
@ -203,8 +207,8 @@ export class AmqpInterface {
channel: _channel, channel: _channel,
connectionLost, connectionLost,
send(data) { async send(data) {
return _channel.sendToQueue(event, Buffer.from(JSON.stringify(data)), { return await _channel.sendToQueue(event, Buffer.from(JSON.stringify(data)), {
contentEncoding: "application/json" contentEncoding: "application/json"
}); });
}, },
@ -313,11 +317,17 @@ export class AmqpInterface {
exclusive: true exclusive: true
}); });
options.subscribeOptions = { durable: false };
const _exchangeName = "nope";
// Create an Exchange. // Create an Exchange.
const _exchange = await _connection.channel.assertExchange( const _exchange = await _connection.channel.assertExchange(
"event", _exchangeName,
options.exchangeType || "topic", "topic",
options.subscribeOptions {
durable: false
}
); );
// Bind the Queue to the Exchange. // Bind the Queue to the Exchange.
@ -329,15 +339,27 @@ export class AmqpInterface {
} }
// Update the Send Message. // Update the Send Message.
_connection.send = (data) => { _connection.send = async (data) => {
return _connection.channel.publish(
"event", while (!await _connection.channel.publish(
_exchangeName,
event, event,
Buffer.from(JSON.stringify(data)), Buffer.from(JSON.stringify(data)),
{ {
contentEncoding: "application/json" contentEncoding: "application/json"
} }
); )) {
// Repeat until we have send the shit
}
// return await _connection.channel.publish(
// _exchangeName,
// event,
// Buffer.from(JSON.stringify(data)),
// {
// contentEncoding: "application/json"
// }
// );
}; };
// Store a reference to the original Method // Store a reference to the original Method
@ -441,8 +463,7 @@ export class AmqpInterface {
*/ */
export class AmqpLayer export class AmqpLayer
extends AmqpInterface extends AmqpInterface
implements ICommunicationInterface implements ICommunicationInterface {
{
protected _logger = getNopeLogger("AMQP-Event-Layer", "info"); protected _logger = getNopeLogger("AMQP-Event-Layer", "info");
protected _tasks = new Map<string, () => void>(); protected _tasks = new Map<string, () => void>();
@ -453,11 +474,11 @@ export class AmqpLayer
public considerConnection = true; public considerConnection = true;
async emitStatusUpdate(status: IDispatcherInfo): Promise<void> { async emitStatusUpdate(status: IDispatcherInfo): Promise<void> {
await this.emit("publish", "statusUdpate", status); await this.emit("publish", "status", status);
} }
async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> { async onStatusUpdate(cb: (status: IDispatcherInfo) => void): Promise<void> {
await this.on("subscribe", "statusUdpate", cb); await this.on("subscribe", "status", cb);
} }
async onAurevoir(cb: (dispatcher: string) => void): Promise<void> { async onAurevoir(cb: (dispatcher: string) => void): Promise<void> {
@ -471,22 +492,22 @@ export class AmqpLayer
async onTaskCancelation( async onTaskCancelation(
cb: (msg: ITaskCancelationMsg) => void cb: (msg: ITaskCancelationMsg) => void
): Promise<void> { ): Promise<void> {
await this.on("subscribe", "taskCancelation", cb); await this.on("subscribe", "cancel", cb);
} }
async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> { async emitTaskCancelation(msg: ITaskCancelationMsg): Promise<void> {
await this.emit("publish", "taskCancelation", msg); await this.emit("publish", "cancel", msg);
} }
async emitNewObersvablesAvailable( async emitNewObersvablesAvailable(
topics: IAvailableTopicsMsg topics: IAvailableTopicsMsg
): Promise<void> { ): Promise<void> {
await this.emit("publish", "newTopicsAvailable", topics); await this.emit("publish", "observables", topics);
} }
async onNewObservablesAvailable( async onNewObservablesAvailable(
cb: (topics: IAvailableTopicsMsg) => void cb: (topics: IAvailableTopicsMsg) => void
): Promise<void> { ): Promise<void> {
await this.on("subscribe", "newTopicsAvailable", cb); await this.on("subscribe", "observables", cb);
} }
async onEvent( async onEvent(
event: string, event: string,
@ -506,12 +527,12 @@ export class AmqpLayer
async emitNewInstanceGeneratorsAvailable( async emitNewInstanceGeneratorsAvailable(
generators: IAvailableInstanceGeneratorsMsg generators: IAvailableInstanceGeneratorsMsg
): Promise<void> { ): Promise<void> {
await this.emit("publish", "newInstanceGeneratorsAvailable", generators); await this.emit("publish", "generators", generators);
} }
async onNewInstanceGeneratorsAvailable( async onNewInstanceGeneratorsAvailable(
cb: (generators: IAvailableInstanceGeneratorsMsg) => void cb: (generators: IAvailableInstanceGeneratorsMsg) => void
): Promise<void> { ): Promise<void> {
await this.on("subscribe", "newInstanceGeneratorsAvailable", cb); await this.on("subscribe", "generators", cb);
} }
async onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> { async onBonjour(cb: (msg: IDispatcherInfo) => void): Promise<void> {
await this.on("subscribe", "bonjour", cb); await this.on("subscribe", "bonjour", cb);
@ -601,13 +622,13 @@ export class AmqpLayer
async emitNewServicesAvailable( async emitNewServicesAvailable(
services: IAvailableServicesMsg services: IAvailableServicesMsg
): Promise<void> { ): Promise<void> {
await this.emit("publish", "newServicesAvailable", services); await this.emit("publish", "services", services);
} }
async onNewServicesAvailable( async onNewServicesAvailable(
cb: (services: IAvailableServicesMsg) => void cb: (services: IAvailableServicesMsg) => void
): Promise<void> { ): Promise<void> {
await this.on("subscribe", "newServicesAvailable", cb); await this.on("subscribe", "services", cb);
} }
public generateQueueOptions: (name: string) => QueuePublishOptions = ( public generateQueueOptions: (name: string) => QueuePublishOptions = (
@ -626,13 +647,13 @@ export class AmqpLayer
async onNewInstancesAvailable( async onNewInstancesAvailable(
cb: (instances: IAvailableInstancesMsg) => void cb: (instances: IAvailableInstancesMsg) => void
): Promise<void> { ): Promise<void> {
await this.on("subscribe", "newInstancesAvailable", cb); await this.on("subscribe", "instances", cb);
} }
async emitNewInstancesAvailable( async emitNewInstancesAvailable(
instances: IAvailableInstancesMsg instances: IAvailableInstancesMsg
): Promise<void> { ): Promise<void> {
await this.emit("publish", "newInstancesAvailable", instances); await this.emit("publish", "instances", instances);
} }
public generateSubscriptionOptions: (name: string) => SubscriptionOptions = ( public generateSubscriptionOptions: (name: string) => SubscriptionOptions = (
@ -641,7 +662,7 @@ export class AmqpLayer
return {}; return {};
}; };
public async dispose() { public async dispose(): Promise<void> {
// Dispose the Connection Flag. // Dispose the Connection Flag.
this.connected.dispose(); this.connected.dispose();
await this._socket.close(); await this._socket.close();

View File

@ -8,6 +8,7 @@
import * as Logger from "js-logger"; import * as Logger from "js-logger";
import { ILogger } from "js-logger"; import { ILogger } from "js-logger";
import { promisify } from "util";
import { generateId } from "../helpers/idMethods"; import { generateId } from "../helpers/idMethods";
import { RUNNINGINNODE } from "../helpers/runtimeMethods"; import { RUNNINGINNODE } from "../helpers/runtimeMethods";
import { getNopeLogger } from "../logger/getLogger"; import { getNopeLogger } from "../logger/getLogger";
@ -49,6 +50,8 @@ import {
} from "../types/nope/nopeObservable.interface"; } from "../types/nope/nopeObservable.interface";
import { INopePromise } from "../types/nope/nopePromise.interface"; import { INopePromise } from "../types/nope/nopePromise.interface";
const sleep = promisify(setTimeout);
/** /**
* A Dispatcher to perform a function on a Remote * A Dispatcher to perform a function on a Remote
* Dispatcher. Therefore a Task is created and forwarded * Dispatcher. Therefore a Task is created and forwarded
@ -952,11 +955,11 @@ export class nopeDispatcher implements INopeDispatcher {
this.communicator.connected.subscribe((connected) => { this.communicator.connected.subscribe((connected) => {
// Handle an unconnect. // Handle an unconnect.
if (connected) { if (connected) {
_this._logger.info("Sending Bonjour");
if (RUNNINGINNODE) { if (RUNNINGINNODE) {
_this.emitBonjour(); _this.emitBonjour();
} else { } else {
setTimeout(() => { setTimeout(() => {
console.log("HERE");
_this.emitBonjour(); _this.emitBonjour();
}, 2000); }, 2000);
} }
@ -967,7 +970,11 @@ export class nopeDispatcher implements INopeDispatcher {
this._logger.info("initialized"); this._logger.info("initialized");
} }
// await this.emitBonjour(); // We sleep 500 ms
await sleep(500);
await this.emitBonjour();
await sleep(500);
this.ready.setContent(true); this.ready.setContent(true);
} }