{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# NoPE - Connectivity Manager\n", "\n", "The NoPE-Dispatcher uses one `ConnectivityManager`. The manager observes the connection and remotly connected dispatchers (and their `ConnectivityManager`). The Manager detects newly connected dispatchers and disconnected dispatchers. Additionally, it sends a StatusMessage (in the form of `INopeStatusInfo`). This status message is interpreted as heartbeat. The `ConnectivityManager` checks those heartbeats with a defined interval. If a specific amount of time is ellapsed, the remote dispatcher is marked as `slow` -> `warning` -> `dead`. After an additional delay in the state `dead` the dispatcher is altough removed.\n", "\n", "## Master\n", "\n", "Defaultly a `ConnectivityManager` is elected as `master`. The master is defined as the `ConnectivityManager` with the highest `upTime`. \n", "\n", "> Alternativly a master can be forced.\n", "\n", "## Synchronizing time\n", "\n", "Because we asume, that **NoPE** is running on different computing nodes, we have to be able to synchronize the time between those elements. Therefore the `ConnectivityManager` is able to sync the time (by providing a `timestamp` and an additional `delay` that was needed to get to the call (for instance `ping / 2`))\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "// First lets install nope using npm\n", "const nope = require(\"../dist-nodejs/index.nodejs\")\n", "\n", "// Create a communicator:\n", "// We will use the event layer (which just runs internally)\n", "const communicator = nope.getLayer(\"event\");\n", "\n", "// Lets create our dispatcher\n", "\n", "// 1. Dispatcher simulates our local system\n", "const localDispatcher = nope.dispatcher.getDispatcher({\n", " communicator,\n", " id: \"local\"\n", "}, {\n", " singleton: false,\n", " useBaseServices: false\n", "});" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> For Jupyter we need an extra async wrapper to wait for initalizing the dispatcher:\n", "\n", "see here for the details in Jupyter: https://n-riesco.github.io/ijavascript/doc/async.ipynb.html" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "true" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "$$.async();\n", "// Lets wait for our element to be ready.\n", "localDispatcher.ready.waitFor().then($$.done);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we want to listen to newly connected dispatchers. For this purpose, we create an observer, which will listen to changes." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "// Subscribe to changes\n", "const observer = localDispatcher.connectivityManager.dispatchers.onChange.subscribe(data => {\n", " // Log the changes\n", " console.log((new Date()).toISOString(),\"onChange - listener\");\n", " console.log(\"\\tadded =\", data.added);\n", " console.log(\"\\tremoved =\", data.removed);\n", "});" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Additionally we want to show the currently connected dispatchers. In this data the own dispatcher will **allways** be included:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "connectedDispatchers = [ 'local' ]\n", "localDispatcherIncluded = true\n" ] } ], "source": [ "// Show our connected Dispatchers\n", "let connectedDispatchers = localDispatcher.connectivityManager.dispatchers.data.getContent();\n", "let localDispatcherIncluded = connectedDispatchers.includes(localDispatcher.id);\n", "\n", "// Now lets log our results.\n", "console.log(\"connectedDispatchers =\", connectedDispatchers);\n", "console.log(\"localDispatcherIncluded =\", localDispatcherIncluded);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have implemented our listeners and have seen the connected dispatchers (which is only the `\"local\"`-dispatchre), We will add an additional dispatcher. This should result in calling our `onChange`-listener. Additionally, we wait until our `remoteDispatcher` is initalized" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2022-01-20T11:39:55.766Z onChange - listener\n", "\tadded = [ 'remote' ]\n", "\tremoved = []\n" ] } ], "source": [ "// 2. Dispatcher simulates our remote system\n", "const remoteDispatcher = nope.dispatcher.getDispatcher({\n", " communicator,\n", " id: \"remote\"\n", "}, {\n", " singleton: false,\n", " useBaseServices: false\n", "});\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we want to see, which system is the current master. This should be our `local`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "master = local\n" ] } ], "source": [ "// We expect to be the master, because the localDispatcher has been created first.\n", "console.log(\"master =\", localDispatcher.connectivityManager.master.id);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now force the remote dispatcher to be our master, by setting the master. (For this purpose we can later use a base service ==> then we just have to call the service) " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "$$.async();\n", "\n", "remoteDispatcher.connectivityManager.isMaster = true;\n", "localDispatcher.connectivityManager.isMaster = false;\n", "\n", "// Our messaging is async ==> we wait an amount of time\n", "setTimeout(() => $$.done(),1000);" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "master = remote\n", "master-info = {\n", " id: 'remote',\n", " env: 'javascript',\n", " version: '1.0.0',\n", " isMaster: true,\n", " host: {\n", " cores: 8,\n", " cpu: {\n", " model: 'Intel(R) Core(TM) i7-8565U CPU',\n", " speed: 1992,\n", " usage: 0.0038778477944740875\n", " },\n", " os: 'win32',\n", " ram: { usedPerc: 0.362681220626356, free: 20676, total: 32442 },\n", " name: 'nz-078'\n", " },\n", " pid: 18068,\n", " timestamp: 1642678798813,\n", " upTime: 3049,\n", " status: 0\n", "}\n" ] } ], "source": [ "// We expect the master to be the remote.\n", "console.log(\"master =\", localDispatcher.connectivityManager.master.id);\n", "console.log(\"master-info =\", localDispatcher.connectivityManager.master);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now lets see what happens if we adapt the heartbeat intervall of our *local* instance. We want to receive every 50 ms a heartbeat:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2022-01-20T11:40:01.089Z master-info = 0\n", "2022-01-20T11:40:01.789Z master-info = 1\n", "2022-01-20T11:40:02.536Z master-info = 2\n", "2022-01-20T11:40:03.543Z master-info = 3\n", "2022-01-20T11:40:03.977Z onChange - listener\n", "\tadded = []\n", "\tremoved = [ 'remote' ]\n", "2022-01-20T11:40:04.547Z onChange - listener\n", "\tadded = [ 'remote' ]\n", "\tremoved = []\n" ] } ], "source": [ "$$.async()\n", "\n", "const renderStatus = () => {\n", " console.log((new Date()).toISOString(),\"master-info =\", localDispatcher.connectivityManager.master.status)\n", "}\n", "\n", "setTimeout(renderStatus, 50);\n", "setTimeout(renderStatus, 750);\n", "setTimeout(renderStatus, 1500);\n", "setTimeout(renderStatus, 2500);\n", "\n", "\n", "localDispatcher.connectivityManager.setTimings({\n", " // our system will send every 50 ms an heartbeat.\n", " sendAliveInterval: 250,\n", " // we will check that after\n", " checkInterval: 125,\n", " // will mark dispatchers as slow after not receiving heartbeats for 50ms\n", " slow: 500,\n", " // we will mark dispatchers with a warning flag after 50 ms\n", " warn: 1000,\n", " // we mark it as dead after 0.5 s\n", " dead: 2000,\n", " // We will remove the dispatcher after 1 s\n", " remove: 3000,\n", "});\n", "\n", "remoteDispatcher.connectivityManager.setTimings({\n", " // our system will send every 50 ms an heartbeat.\n", " sendAliveInterval: 5000,\n", "});\n", "\n", "\n", "\n", "// We reset the timeouts.\n", "setTimeout(() => localDispatcher.connectivityManager.setTimings({}), 3000);\n", "setTimeout(() => remoteDispatcher.connectivityManager.setTimings({}), 3000);\n", "setTimeout(() => $$.done(), 5000);\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "JavaScript (Node.js)", "language": "javascript", "name": "javascript" }, "language_info": { "file_extension": ".js", "mimetype": "application/javascript", "name": "javascript", "version": "17.3.1" } }, "nbformat": 4, "nbformat_minor": 2 }