Adding open TSDB

This commit is contained in:
Martin Karkowski 2021-05-13 13:26:55 +02:00
parent 995b15e942
commit 7813d6feae
5 changed files with 451 additions and 0 deletions

16
modules/logger/README.md Normal file
View File

@ -0,0 +1,16 @@
# Setup
## Run OpenTSDB
Install docker and use the terminal to run an instance with `docker run -d -p 4242:4242 petergrace/opentsdb-docker`
## Run Grafana
Install docker and use the terminal to run an instance with `docker run -d -p 3000:3000 grafana/grafana`
Now you should be able to use the http://localhost:3000 to checkout grafana.
### login to Grafana
| user | password |
| ------- | -------- |
| `admin` | `admin` |

View File

@ -0,0 +1,59 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-05-13 11:26:07
* @modify date 2021-05-13 11:26:07
* @desc [description]
*/
import { IPackageDescription } from "../../../lib/types/nope/nopePackage.interface";
import { NopeMetricLogger } from "./nopeMetrics.module";
import { OpenTSDBLogger } from "./openTSDB.module";
const TYPES = {
nopeMetricLogger: Symbol.for("NopeMetricLogger"),
openTSDBLogger: Symbol.for("openTSDBLogger")
};
export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
activationHandlers: [],
autostart: {},
defaultInstances: [
{
options: {
identifier: "nope-metric-logger",
params: ["http://localhost:4242"],
type: NopeMetricLogger.prototype.constructor.name.toString()
},
selector: NopeMetricLogger.prototype.constructor.name.toString()
}
],
nameOfPackage: "data-logger-package",
providedClasses: [
{
description: {
name: NopeMetricLogger.prototype.constructor.name.toString(),
selector: TYPES.nopeMetricLogger,
type: NopeMetricLogger
},
settings: {
allowInstanceGeneration: true
}
},
{
description: {
name: OpenTSDBLogger.prototype.constructor.name.toString(),
selector: TYPES.openTSDBLogger,
type: OpenTSDBLogger
},
settings: {
allowInstanceGeneration: true
}
}
],
providedFunctions: [],
requiredPackages: [],
types: TYPES
};
export default DESCRIPTION;

View File

@ -0,0 +1,64 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2021-05-13 11:15:47
* @modify date 2021-05-13 11:15:47
* @desc [description]
*/
import { injectable } from "inversify";
import { hostname } from "os";
import { OpenTSDBLogger } from "./openTSDB.module";
@injectable()
export class NopeMetricLogger extends OpenTSDBLogger {
public async init(uri: string): Promise<void> {
await super.init(uri);
const _this = this;
const tags = {
observer: hostname()
};
// We Observe the Number of Dispatchers and Hosts,
// Subscribed and Published Events, Instances
// and generators (<= not implemented jet)
this._dispatcher.externalDispatchers.subscribe((overview) => {
_this.storeMetric("nope.dispatchers", overview.length, tags);
const hosts = new Set<string>();
overview.map((item) => hosts.add(item.host.name));
_this.storeMetric("nope.computing.hosts", hosts.size, tags);
let maxResponse = Number.MIN_SAFE_INTEGER;
let minResponse = Number.MAX_SAFE_INTEGER;
let avgResponse = 0;
let dispatchers = 0;
const currentTime = Date.now();
for (const dispatcher of overview) {
if (dispatcher.id !== _this._dispatcher.id) {
const delta = currentTime - dispatcher.timestamp;
maxResponse = Math.max(delta, maxResponse);
minResponse = Math.min(delta, minResponse);
avgResponse += delta;
dispatchers += 1;
}
}
if (dispatchers > 0) {
_this.storeMetric("nope.timings.min", minResponse, tags);
_this.storeMetric("nope.timings.max", maxResponse, tags);
_this.storeMetric("nope.timings.avg", avgResponse / dispatchers, tags);
}
});
this._dispatcher.subscribedEvents.subscribe((overview) => {
_this.storeMetric("nope.events.subscribed", overview.length, tags);
});
this._dispatcher.publishedEvents.subscribe((overview) => {
_this.storeMetric("nope.events.published", overview.length, tags);
});
this._dispatcher.availableInstances.subscribe((overview) => {
_this.storeMetric("nope.instances", overview.length, tags);
});
}
}

View File

@ -0,0 +1,276 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-10-22 11:14:01
* @modify date 2018-10-29 11:37:45
* @desc [description]
*/
import { injectable } from "inversify";
import { Socket } from "net";
import { getNopeLogger } from "../../../lib/logger/getLogger";
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
import { NopeObservable } from "../../../lib/observables/nopeObservable";
import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface";
import {
IOpenTSDBLogger,
ITSDBConfiguration,
ITSDBData
} from "../type/interfaces";
@injectable()
export class OpenTSDBLogger
extends InjectableNopeBaseModule
implements IOpenTSDBLogger
{
private _client: Socket;
private _logger = getNopeLogger("opentsdb", "debug");
public uri = new NopeObservable<string>();
public connected = new NopeObservable<boolean>();
public bufferSize = new NopeObservable<number>();
private _buffer = new Array<ITSDBData>();
private _observers = new Array<INopeObserver>();
public async init(
uri: string,
config: { [index: string]: ITSDBConfiguration } = {},
bufferSize = 0
): Promise<void> {
const _this = this;
// During the initialization,
// we have to perform several steps.
// 1. Create a Client, which will connect to the DB.
this._client = new Socket();
this._client.on("close", () => _this.connected.setContent(false));
this._client.on("connect", () => _this.connected.setContent(true));
this._client.on("data", (err) => {
_this._logger.error(err, "write error");
});
this._client.on("error", (err) => {
_this._logger.error(err, "connection error");
});
// Connect Client
try {
const split = uri.split(":");
// Extract the Port
const port = parseFloat(split.pop() as string);
// Extract Hostname
const host = split.join(":");
this._client.connect(port, host);
// Store the URI
this.uri.setContent(uri);
} catch (e) {
this._logger.error(e, "cant connect to " + uri);
}
// Now we wait until we are connected.
await this.connected.waitFor((value) => value === true);
this.bufferSize.setter = (value) => {
return {
data: Math.max(0, Math.min(5000, value)),
valid: true
};
};
this.bufferSize.setContent(bufferSize);
// Now we will read the configuartion
// and subscribe the elements as described
// in the Configuration.
for (const eventName in config) {
this._observers.push(
await this._dispatcher.subscribeToEvent(
eventName,
(value, sender, timestamp) => {
if (typeof value === "number") {
_this.write({
metric: config[eventName].metric,
value,
timestamp,
tags: config[eventName].tags
});
}
},
{}
)
);
}
}
/**
* Helper Function, which only will store a value assigned to a specific
* metric.
*
* @param {string} name The Name of the Metric
* @param {number} value Value to Store. The Value must be a number
* @return {*} {Promise<void>}
* @memberof OpenTSDBWriter
*/
public async storeMetric(
name: string,
value: number,
tags: { [index: string]: string } = {}
): Promise<void> {
return this.write({
metric: name,
value,
timestamp: Date.now(),
tags
});
}
/**
* Internal Function, which will store the Data in the Database.
*
* @private
* @param {Array<ITSDBData>} data The Data to Store
* @return {*} {Promise<void>}
* @memberof OpenTSDBWriter
*/
private async _storeInDB(data: Array<ITSDBData>): Promise<void> {
if (!this.connected.getContent()) {
await this.connected.waitFor((value) => value == true);
}
let dataToSend = "";
/** Build up the command */
for (const element of data) {
if (typeof element.value === "number") {
dataToSend += this._convert(element);
} else {
this._logger.error("trying to send a non number value");
}
}
const _this = this;
// use a Promise to track the task,
return new Promise((resolve, reject) => {
// Write the data to the client
_this._client.write(dataToSend, (err) => {
if (err) {
_this._logger.error(err);
reject(err);
} else {
resolve();
}
});
});
}
/**
* Before we write stuff to the Database => we have to make shure,
* that we fill our buffer.
*
* @private
* @param {(Array<ITSDBData> | ITSDBData)} data
* @memberof OpenTSDBWriter
*/
private async _writeToBuffer(data: Array<ITSDBData> | ITSDBData) {
// Make shure we are working with arrays.
if (!Array.isArray(data)) {
data = [data];
}
if (this.bufferSize.getContent() > 0) {
this._buffer.push(...data);
// Check if the Buffer is now full:
if (this.bufferSize.getContent() > this._buffer.length) {
// If so, we extract a slice and send this slice
// to the database
const dataToStore = this._buffer.splice(
0,
this.bufferSize.getContent()
);
await this._storeInDB(dataToStore);
}
} else {
// We send the data on its own
await this._storeInDB(data);
}
}
/**
* Function to Write Data to the DA3VID System.
*
*
* @param {(IDa3vidData[] | IDa3vidData)} data The Data to Send.
* @memberof DavidRestPoster
*/
public async write(data: Array<ITSDBData> | ITSDBData): Promise<void> {
return await this._writeToBuffer(data);
}
/**
* Internal Helper to create the command to store data into the system
*
* @private
* @param {string} metric
* @param {number} value
* @param {number} timestamp
* @param {{[index:string]: string}} tags
* @returns {string}
* @memberof OpenTSDBRestWriter
*/
private _convert(element: {
metric: string;
value: number;
timestamp: number;
tags: { [index: string]: string };
}): string {
/** put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 */
let _ret =
"put " +
element.metric +
" " +
element.timestamp.toString() +
" " +
element.value.toExponential();
/** Add all Tags */
for (const tag of Object.getOwnPropertyNames(element.tags)) {
_ret += " " + tag + "=" + element.tags[tag];
}
_ret += "\n";
return _ret;
}
/**
* If the Client is getting shut down
* the Buffer will be stored and then
* the client will be closed.
*
* @memberof OpenTSDBWriter
*/
public async dispose(): Promise<void> {
for (const observer of this._observers) {
observer.unsubscribe();
}
// Store the buffer
await this._storeInDB(this._buffer);
// Now end the connection
const _this = this;
const closePromise = new Promise<void>((resolve, reject) => {
_this._client.end((err) => {
if (err) reject(err);
else resolve();
});
});
await closePromise;
// And finially dispose the Module.
await super.dispose();
}
}

View File

@ -0,0 +1,36 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 11:13:05
* @modify date 2018-05-22 11:13:05
* @desc [description]
*/
import { INopeObservable } from "../../../lib/types/nope/nopeObservable.interface";
export interface IOpenTSDBLogger {
write(data: Array<ITSDBData> | ITSDBData): Promise<void>;
uri: INopeObservable<string>;
}
/**
* Interface describing the used Data
*
* @export
* @interface ITSDBData
*/
export interface ITSDBData extends ITSDBConfiguration {
value: number;
timestamp: number;
}
export interface ITSDBConfiguration {
/**
* Element containing the Tags for a Converter
*
* @type {{[index:string]: any}}
* @memberof ITSDBConfiguration
*/
tags: { [index: string]: string };
metric: string;
}