diff --git a/CHANGELOG.md b/CHANGELOG.md index f26f83e..405f441 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,4 +65,24 @@ Inital commit, which is working with the browser - dispatchers.ConnectivityManager.ConnectivityManager: - INopeConnectivityManager added "connectedSince" (which is expressed in the adapted Timestamp.) - Added: - - dispatchers.ConnectivityManager.ConnectivityManager.spec: Added Master - Test \ No newline at end of file + - dispatchers.ConnectivityManager.ConnectivityManager.spec: Added Master - Test + +# 1.0.35 +- Fixes: + - dispatchers.ConnectivityManager.ConnectivityManager: fixing isMaster. Now deals corecctly with multiple masters. +- Modified: + - cli.runNopeBackend: prevented io-server to be a master. + - dispatcher.getDispatcher: Adapted input to `options`. This includes all options + - dispatcher.core.NopeCore: Add flag Displising. This shows, if the dispatcher is getting disposed + - loader.getPackageLoader.browser: Adapted input to `options`. This includes all options + - loader.getPackageLoader.nodejs: Adapted input to `options`. This includes all options + - types.nope.ConnectivityManager.interface: + - INopeStatusInfo.isMasterForced: Flag if the master mode is forced + - INopeStatusInfo.isMaster: Flag if the node is a master. this could be forced or selected + - types.nope.nopeCore.interface: + - INopeCore.disposing: A Flag, that indicates, that the core is disposing. + - types.nope.nopeDispatcher.interface: + - INopeDispatcherOptions: Utilizes `INopeINopeConnectivityOptions` now. + - dispatchers.ConnectivityManager.ConnectivityManager.spec: + - Added test for forced masters. + - helpers.arrayMethods: Added Typings for `minOfArray` \ No newline at end of file diff --git a/contribute/VERSION b/contribute/VERSION index 64a736b..fff1fac 100644 --- a/contribute/VERSION +++ b/contribute/VERSION @@ -1 +1 @@ -1.0.34 \ No newline at end of file +1.0.35 \ No newline at end of file diff --git a/lib/cli/runNopeBackend.ts b/lib/cli/runNopeBackend.ts index f267a27..1dea072 100644 --- a/lib/cli/runNopeBackend.ts +++ b/lib/cli/runNopeBackend.ts @@ -410,6 +410,7 @@ export async function runNopeBackend( defaultSelector: args.defaultSelector, forceUsingSelectors: args.forceUsingSelectors, id: args.id, + isMaster: args.channel !== "io-server" ? null : false, }, { singleton: _args.singleton, diff --git a/lib/dispatcher/ConnectivityManager/ConnectivityManager.spec.ts b/lib/dispatcher/ConnectivityManager/ConnectivityManager.spec.ts index 1e6ec10..966e42b 100644 --- a/lib/dispatcher/ConnectivityManager/ConnectivityManager.spec.ts +++ b/lib/dispatcher/ConnectivityManager/ConnectivityManager.spec.ts @@ -232,5 +232,56 @@ describe("NopeConnectivityManager", function () { first.dispose(true); second.dispose(true); }); + + it("master - forced", async () => { + // Wait for the Handshake + await sleep(10); + + assert(first.isMaster, "First should be master"); + + first.isMaster = false; + + // Create the second Element. + const second = new NopeConnectivityManager( + { + communicator, + logger: false, + }, + () => new NopeObservable(), + "second" + ); + + // Wait for the Handshake + await sleep(10); + + assert(first.isMaster == false, "First should not be master"); + assert(second.isMaster == true, "Second should be master"); + assert( + first.master.id == second.id, + "First should recognize the second as master" + ); + assert( + second.master.id == second.id, + "Second should recognize the second as master" + ); + + // Wait for the Handshake + first.isMaster = null; + await sleep(40); + + assert(first.isMaster === true, "First should be master"); + assert(second.isMaster == false, "Second should not be master"); + assert( + first.master.id == first.id, + "First should recognize the first as master" + ); + assert( + second.master.id == first.id, + "Second should recognize the first as master" + ); + + first.dispose(true); + second.dispose(true); + }); }); }); diff --git a/lib/dispatcher/ConnectivityManager/ConnectivityManager.ts b/lib/dispatcher/ConnectivityManager/ConnectivityManager.ts index b2ce418..efa1aa6 100644 --- a/lib/dispatcher/ConnectivityManager/ConnectivityManager.ts +++ b/lib/dispatcher/ConnectivityManager/ConnectivityManager.ts @@ -7,7 +7,7 @@ */ import { ILogger } from "js-logger"; -import { avgOfArray, maxOfArray } from "../../helpers/arrayMethods"; +import { avgOfArray, minOfArray } from "../../helpers/arrayMethods"; import { generateId } from "../../helpers/idMethods"; import { MapBasedMergeData } from "../../helpers/mergedData"; import { RUNNINGINNODE } from "../../helpers/runtimeMethods"; @@ -124,6 +124,7 @@ export class NopeConnectivityManager implements INopeConnectivityManager { env: "javascript", version: "1.0.0", isMaster: this.isMaster, + isMasterForced: typeof this.__isMaster === "boolean", host: { cores: cpus.length, cpu: { @@ -154,6 +155,7 @@ export class NopeConnectivityManager implements INopeConnectivityManager { env: "javascript", version: "1.0.0", isMaster: this.isMaster, + isMasterForced: typeof this.__isMaster === "boolean", host: { cores: -1, cpu: { @@ -191,9 +193,11 @@ export class NopeConnectivityManager implements INopeConnectivityManager { public readonly id: string = null ) { this._communicator = options.communicator; - this._connectedSince = Date.now(); + this.__isMaster = + typeof options.isMaster === "boolean" ? options.isMaster : null; + if (id === null) { this.id = generateId(); } @@ -265,7 +269,7 @@ export class NopeConnectivityManager implements INopeConnectivityManager { * @type {boolean} * @memberof NopeConnectivityManager */ - protected __isMaster: boolean = null; + protected __isMaster: boolean; /** * see {@link INopeConnectivityManager.isMaster} @@ -273,7 +277,7 @@ export class NopeConnectivityManager implements INopeConnectivityManager { * @author M.Karkowski * @memberof NopeConnectivityManager */ - public set isMaster(value: boolean) { + public set isMaster(value: boolean | null) { this.__isMaster = value; // We want to forward our new status. this._sendStatus(); @@ -287,18 +291,34 @@ export class NopeConnectivityManager implements INopeConnectivityManager { * @memberof NopeConnectivityManager */ public get isMaster(): boolean { - if (this.__isMaster === null) { - const connectedSince = this._connectedSince; - for (const info of this.dispatchers.originalData.values()) { - if (info.id !== this.id && info.connectedSince < connectedSince) { - return false; - } + if (typeof this.__isMaster !== "boolean") { + try { + return this.id == this.master.id; + } catch (e) { + return false; } - return true; } return this.__isMaster; } + /** + * Helper, to extract the possible-masters + * @returns + */ + protected _getPossibleMasterCandidates() { + const possibleMasters: INopeStatusInfo[] = []; + + for (const info of this.dispatchers.originalData.values()) { + if (info.isMasterForced && !info.isMaster) { + continue; + } + + possibleMasters.push(info); + } + + return possibleMasters; + } + /** * see {@link INopeConnectivityManager.master} * @@ -308,19 +328,21 @@ export class NopeConnectivityManager implements INopeConnectivityManager { * @memberof NopeConnectivityManager */ public get master(): INopeStatusInfo { - const data = Array.from(this.dispatchers.originalData.values()); - - if (this.__isMaster === null) { - const idx = maxOfArray(data, "upTime").index; - return data[idx]; - } - - const masters = data.filter((item) => item.isMaster); + const candidates = this._getPossibleMasterCandidates(); + const masters = candidates.filter((item) => item.isMaster); if (masters.length === 0) { + const idx = minOfArray(candidates, "connectedSince").index; + if (idx !== -1) { + return candidates[idx]; + } + throw Error("No Master has been found !"); } else if (masters.length > 1) { - throw Error("Multiple Masters has been found!" + JSON.stringify(masters)); + throw Error( + "Multiple Masters has been found!" + + JSON.stringify(masters, undefined, 4) + ); } return masters[0]; @@ -364,6 +386,13 @@ export class NopeConnectivityManager implements INopeConnectivityManager { await this._communicator.on("StatusChanged", (info) => { _this._externalDispatchers.set(info.id, info); + + // If there is an update, we have to make shure, that our information + // is propageted correctly. + if (info.id !== _this.id) { + _this._externalDispatchers.set(_this.id, _this.info); + } + _this.dispatchers.update(); }); @@ -387,6 +416,8 @@ export class NopeConnectivityManager implements INopeConnectivityManager { _this.dispatchers.update(); }); + await this._sendStatus(); + if (this._logger) { this._logger.info("core.connectivity-manager", this.id, "initialized"); } @@ -490,10 +521,16 @@ export class NopeConnectivityManager implements INopeConnectivityManager { * @protected * @memberof NopeConnectivityManager */ - protected _sendStatus(): void { + protected async _sendStatus(): Promise { // Test if we are connected if (this._communicator.connected.getContent()) { - this._communicator.emit("StatusChanged", this.info); + try { + const info = this.info; + this._externalDispatchers.set(this.id, info); + await this._communicator.emit("StatusChanged", info); + } catch (e) { + this._logger.error("Failled to send the status"); + } } } diff --git a/lib/dispatcher/Core/NopeCore.ts b/lib/dispatcher/Core/NopeCore.ts index d1b700b..5842245 100644 --- a/lib/dispatcher/Core/NopeCore.ts +++ b/lib/dispatcher/Core/NopeCore.ts @@ -163,10 +163,14 @@ export class NopeCore implements INopeCore { this.instanceManager.ready.subscribe((_) => { this.ready.forcePublish(); }); + + this.disposing = false; } // See interface description public async dispose() { + this.disposing = true; + await this.ready.dispose(); await this.eventDistributor.dispose(); await this.dataDistributor.dispose(); @@ -174,4 +178,6 @@ export class NopeCore implements INopeCore { await this.rpcManager.dispose(); await this.instanceManager.dispose(); } + + public disposing: boolean; } diff --git a/lib/dispatcher/baseServices/connectivy.ts b/lib/dispatcher/baseServices/connectivy.ts index 89b447b..b5b77da 100644 --- a/lib/dispatcher/baseServices/connectivy.ts +++ b/lib/dispatcher/baseServices/connectivy.ts @@ -143,11 +143,21 @@ export async function enableTimeSyncing(dispatcher: INopeDispatcher) { }; dispatcher.connectivityManager.dispatchers.onChange.subscribe((changes) => { + if (dispatcher.disposing) { + return; + } + if (!dispatcher.connectivityManager.isMaster) { - manualSyncTime().catch((e) => { - logger.error("Failed synchronizing time"); - logger.error(e); - }); + manualSyncTime() + .catch((e) => { + logger.error("Failed synchronizing time"); + logger.error(e); + }) + .then((_) => { + logger.info( + `Synchronized time with master=${dispatcher.connectivityManager.master.id}` + ); + }); } }); @@ -179,6 +189,14 @@ export async function waitForDispatcher( return {}; } +/** + * A Helper to create a Service to manually define a master. + * + * @author M.Karkowski + * @export + * @param {INopeDispatcher} dispatcher The Dispatcher to use. + * @return {*} + */ export async function generateDefineMaster(dispatcher: INopeDispatcher) { /** * Create a Ping service. diff --git a/lib/dispatcher/baseServices/data.ts b/lib/dispatcher/baseServices/data.ts index 14311ee..929d7be 100644 --- a/lib/dispatcher/baseServices/data.ts +++ b/lib/dispatcher/baseServices/data.ts @@ -1,14 +1,15 @@ /** * @author Martin Karkowski * @email m.karkowski@zema.de - * @create date 2022-01-14 20:29:13 - * @modify date 2022-01-14 20:36:36 * @desc [description] */ import { sleep } from "../../helpers/async"; +import { getNopeLogger } from "../../logger/getLogger"; import { INopeDispatcher } from "../../types/nope"; +const logger = getNopeLogger("baseService"); + /** * Generate and registers a ping service. * @@ -21,27 +22,39 @@ export async function enablingSyncingData(dispatcher: INopeDispatcher) { // Registers the Ping Method at the Dispatcher. await dispatcher.connectivityManager.dispatchers.onChange.subscribe( async (eventData) => { - // If there is added Data - if (eventData.added.length > 0) { - // And if we are the master module - // we will emit the new data. - if (dispatcher.connectivityManager.isMaster) { - // But before, wait for shure. - await sleep(0); + // If the Dispatcher is disposing we do not consider that. + if (dispatcher.disposing) { + return; + } - // Get the Data. - const data = dispatcher.dataDistributor.pullData("", {}); + try { + // If there is added Data + if (eventData.added.length > 0) { + // And if we are the master module + // we will emit the new data. + // Alternativ: dispatcher.id == dispatcher.connectivityManager.master.id + if (dispatcher.connectivityManager.isMaster) { + // But before, wait for shure. + await sleep(0); - // Emit the Data. - dispatcher.communicator.emit("DataChanged", { - args: [], - data: data, - forced: false, - path: "", - sender: dispatcher.id, - timestamp: dispatcher.connectivityManager.now, - }); + // Get the Data. + const data = dispatcher.dataDistributor.pullData("", {}); + + // Emit the Data. + dispatcher.communicator.emit("DataChanged", { + args: [], + data: data, + forced: false, + path: "", + sender: dispatcher.id, + timestamp: dispatcher.connectivityManager.now, + }); + + logger.info(`Send data to synchronized data. Acting as master`); + } } + } catch (e) { + logger.error("Failed to send an update."); } } ); diff --git a/lib/helpers/arrayMethods.ts b/lib/helpers/arrayMethods.ts index 6038de2..cc2d544 100644 --- a/lib/helpers/arrayMethods.ts +++ b/lib/helpers/arrayMethods.ts @@ -234,9 +234,9 @@ export function avgOfArray(arr: any[], path: string, defaultValue = 0): number { return added / arr.length; } -export function minOfArray( - arr: any[], - path: string, +export function minOfArray( + arr: T[], + path: keyof T | string, defaultValue = 0 ): { min: number; @@ -250,7 +250,7 @@ export function minOfArray( } const arrOfValues = arr.map((item) => - rgetattr(item, path, defaultValue) + rgetattr(item, path as string, defaultValue) ) as number[]; const min = Math.min(...arrOfValues); return { diff --git a/lib/types/nope/nopeConnectivityManager.interface.ts b/lib/types/nope/nopeConnectivityManager.interface.ts index 041818e..86f56ee 100644 --- a/lib/types/nope/nopeConnectivityManager.interface.ts +++ b/lib/types/nope/nopeConnectivityManager.interface.ts @@ -98,6 +98,14 @@ export interface INopeStatusInfo { * @memberof INopeStatusInfo */ isMaster: boolean; + /** + * Status, whether master-status is forced or not. + * + * @author M.Karkowski + * @type {boolean} + * @memberof INopeStatusInfo + */ + isMasterForced: boolean; /** * The Environment, in which the Dispatcher is running * In nodejs it should be "nodejs". @@ -208,6 +216,11 @@ export type INopeINopeConnectivityOptions = { * @type {INopeINopeConnectivityTimeOptions} */ timeouts?: Partial; + + /** + * Flag to force the Master. If set to null "default" -> the auto selection will be used. + */ + isMaster?: boolean; }; export interface INopeConnectivityManager { diff --git a/lib/types/nope/nopeCore.interface.ts b/lib/types/nope/nopeCore.interface.ts index 840043c..9307657 100644 --- a/lib/types/nope/nopeCore.interface.ts +++ b/lib/types/nope/nopeCore.interface.ts @@ -94,4 +94,13 @@ export interface INopeCore { * @memberof INopeCore */ readonly instanceManager: INopeInstanceManager; + + /** + * A Flag, that indicates, that the core is disposing. + * + * @author M.Karkowski + * @type {boolean} + * @memberof INopeCore + */ + disposing: boolean; } diff --git a/lib/types/nope/nopeDispatcher.interface.ts b/lib/types/nope/nopeDispatcher.interface.ts index fbeaabd..2d52dab 100644 --- a/lib/types/nope/nopeDispatcher.interface.ts +++ b/lib/types/nope/nopeDispatcher.interface.ts @@ -6,11 +6,9 @@ * @desc [description] */ import { IFunctionOptions } from "."; -import { ValidLoggerDefinition } from "../../logger/getLogger"; -import { ICommunicationBridge } from "./nopeCommunication.interface"; import { IHost, - INopeINopeConnectivityTimeOptions, + INopeINopeConnectivityOptions, INopeStatusInfo, } from "./nopeConnectivityManager.interface"; import { INopeCore } from "./nopeCore.interface"; @@ -38,14 +36,6 @@ export type IGenerateRemoteInstanceForOtherDispatcherCallback< export type IValidPromise = Promise | INopePromise; export type INopeDispatcherOptions = { - /** - * The Communicator to use. - * - * @author M.Karkowski - * @type {ICommunicationBridge} - */ - communicator: ICommunicationBridge; - /** * The Id of the Dispatcher * @@ -54,23 +44,6 @@ export type INopeDispatcherOptions = { */ id?: string; - /** - * A Specific logger which should be used. - * - * @author M.Karkowski - * @type {ILogger} - */ - logger?: ValidLoggerDefinition; - - /** - * Timeout Definitions. These are relevant, to determine - * alive, slow, dead, ... dispatchers. - * - * @author M.Karkowski - * @type {INopeINopeConnectivityTimeOptions} - */ - timeouts?: Partial; - /** * The default-selector to select the service providers * @@ -87,7 +60,7 @@ export type INopeDispatcherOptions = { * @type {boolean} */ forceUsingSelectors?: boolean; -}; +} & INopeINopeConnectivityOptions; export interface IHostOverview extends IHost { dispatchers: {