Implementing ROS-Bridge
This commit is contained in:
parent
7813d6feae
commit
9c92f1f1ca
160
modules/logger/src/abstract.data.logger.module.ts
Normal file
160
modules/logger/src/abstract.data.logger.module.ts
Normal file
@ -0,0 +1,160 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-14 08:05:37
|
||||
* @modify date 2021-05-14 08:05:37
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
import { injectable } from "inversify";
|
||||
import { getNopeLogger } from "../../../lib/logger/getLogger";
|
||||
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
|
||||
import { NopeObservable } from "../../../lib/observables/nopeObservable";
|
||||
import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface";
|
||||
|
||||
@injectable()
|
||||
export class AbstractDataLogger<C, V> extends InjectableNopeBaseModule {
|
||||
protected _logger = getNopeLogger("data-logger", "debug");
|
||||
protected _buffer = new Array<V>();
|
||||
protected _observers = new Array<INopeObserver>();
|
||||
protected _config: { [index: string]: C } = {};
|
||||
|
||||
public bufferSize = new NopeObservable<number>();
|
||||
|
||||
public async init(
|
||||
bufferSize = 0,
|
||||
config: { [index: string]: C } = {}
|
||||
): Promise<void> {
|
||||
const _this = this;
|
||||
|
||||
this.bufferSize.setter = (value) => {
|
||||
return {
|
||||
data: Math.max(0, Math.min(5000, value)),
|
||||
valid: true
|
||||
};
|
||||
};
|
||||
this.bufferSize.setContent(bufferSize);
|
||||
|
||||
// Now we will read the configuartion
|
||||
// and subscribe the elements as described
|
||||
// in the Configuration.
|
||||
for (const eventName in config) {
|
||||
this._observers.push(
|
||||
await this._dispatcher.subscribeToEvent(
|
||||
eventName,
|
||||
(value, sender, timestamp) => {
|
||||
_this._internallyStoreData(eventName, value, timestamp);
|
||||
},
|
||||
{}
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal Function, which must be overwritten, to store the data of the initally defined
|
||||
* events.
|
||||
*
|
||||
* @protected
|
||||
* @param {string} eventName Name of the Event
|
||||
* @param {*} value the Value to store
|
||||
* @param {number} timestamp The Timestamp.
|
||||
* @memberof AbstractDataLogger
|
||||
*/
|
||||
protected _internallyStoreData(
|
||||
eventName: string,
|
||||
value: any,
|
||||
timestamp: number
|
||||
) {
|
||||
throw Error("Not Implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper Function, which only will store a value assigned to a specific
|
||||
* name.
|
||||
*
|
||||
* @param {string} name The Name of the Metric
|
||||
* @param {number} value Value to Store. The Value must be a number
|
||||
* @param {*} args Additional Arguments
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof AbstractDataLogger
|
||||
*/
|
||||
public async storeMetric(name: string, value: any, ...args): Promise<void> {
|
||||
throw Error("Not Implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal Function, which will store the Data in the Database.
|
||||
*
|
||||
* @private
|
||||
* @param {Array<ITSDBData>} data The Data to Store
|
||||
* @return {*} {Promise<void>}
|
||||
* @memberof OpenTSDBWriter
|
||||
*/
|
||||
private async _storeValue(data: Array<V>): Promise<void> {
|
||||
throw Error("Not Implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Before we write stuff to the Database => we have to make shure,
|
||||
* that we fill our buffer.
|
||||
*
|
||||
* @private
|
||||
* @param {(Array<V> | V)} data The Data to Store.
|
||||
* @memberof OpenTSDBWriter
|
||||
*/
|
||||
private async _writeToBuffer(data: Array<V> | V) {
|
||||
// Make shure we are working with arrays.
|
||||
if (!Array.isArray(data)) {
|
||||
data = [data];
|
||||
}
|
||||
|
||||
if (this.bufferSize.getContent() > 0) {
|
||||
this._buffer.push(...data);
|
||||
|
||||
// Check if the Buffer is now full:
|
||||
if (this.bufferSize.getContent() > this._buffer.length) {
|
||||
// If so, we extract a slice and send this slice
|
||||
// to the database
|
||||
const dataToStore = this._buffer.splice(
|
||||
0,
|
||||
this.bufferSize.getContent()
|
||||
);
|
||||
|
||||
await this._storeValue(dataToStore);
|
||||
}
|
||||
} else {
|
||||
// We send the data on its own
|
||||
await this._storeValue(data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to Write Data to Logger
|
||||
*
|
||||
* @param {(V[] | V)} data The Data to Log.
|
||||
* @memberof DavidRestPoster
|
||||
*/
|
||||
public async write(data: Array<V> | V): Promise<void> {
|
||||
return await this._writeToBuffer(data);
|
||||
}
|
||||
|
||||
/**
|
||||
* If the Client is getting shut down
|
||||
* the Buffer will be stored and then
|
||||
* the client will be closed.
|
||||
*
|
||||
* @memberof OpenTSDBWriter
|
||||
*/
|
||||
public async dispose(): Promise<void> {
|
||||
for (const observer of this._observers) {
|
||||
observer.unsubscribe();
|
||||
}
|
||||
|
||||
// Store the buffer
|
||||
await this._storeValue(this._buffer);
|
||||
|
||||
// And finially dispose the Module.
|
||||
await super.dispose();
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-13 11:26:07
|
||||
* @modify date 2021-05-13 11:26:07
|
||||
* @modify date 2021-05-14 08:54:49
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
@ -22,7 +22,7 @@ export const DESCRIPTION: IPackageDescription<typeof TYPES> = {
|
||||
{
|
||||
options: {
|
||||
identifier: "nope-metric-logger",
|
||||
params: ["http://localhost:4242"],
|
||||
params: [],
|
||||
type: NopeMetricLogger.prototype.constructor.name.toString()
|
||||
},
|
||||
selector: NopeMetricLogger.prototype.constructor.name.toString()
|
||||
|
@ -2,7 +2,7 @@
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-13 11:15:47
|
||||
* @modify date 2021-05-13 11:15:47
|
||||
* @modify date 2021-05-14 08:53:06
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
@ -12,17 +12,56 @@ import { OpenTSDBLogger } from "./openTSDB.module";
|
||||
|
||||
@injectable()
|
||||
export class NopeMetricLogger extends OpenTSDBLogger {
|
||||
public async init(uri: string): Promise<void> {
|
||||
await super.init(uri);
|
||||
|
||||
protected _interval;
|
||||
|
||||
public async init(uri = "localhost:4242", intervalSize = 100): Promise<void> {
|
||||
await super.init(100,{},uri);
|
||||
|
||||
const _this = this;
|
||||
const tags = {
|
||||
observer: hostname()
|
||||
};
|
||||
|
||||
let maxJitter = Number.MIN_SAFE_INTEGER;
|
||||
let minJitter = Number.MAX_SAFE_INTEGER;
|
||||
let avgJitter = 0;
|
||||
let jitterCallbacks = 0;
|
||||
|
||||
// Define an interval to reset the Jitter
|
||||
this._interval = setInterval(() => {
|
||||
|
||||
// Update the Values in the DB
|
||||
if (jitterCallbacks > 0){
|
||||
_this.storeMetric("nope.jitter.min", maxJitter, tags);
|
||||
_this.storeMetric("nope.jitter.max", minJitter, tags);
|
||||
_this.storeMetric("nope.jitter.avg", avgJitter / jitterCallbacks, tags);
|
||||
}
|
||||
|
||||
// Reset the Values.
|
||||
maxJitter = Number.MIN_SAFE_INTEGER;
|
||||
minJitter = Number.MAX_SAFE_INTEGER;
|
||||
avgJitter = 0;
|
||||
jitterCallbacks= 0;
|
||||
|
||||
}, intervalSize);
|
||||
|
||||
// With this Function, will will determine the Jitter
|
||||
this._dispatcher.communicator.onStatusUpdate((status)=> {
|
||||
const delta = Date.now() - status.timestamp;
|
||||
|
||||
maxJitter = Math.max(delta, maxJitter);
|
||||
minJitter = Math.min(delta, minJitter);
|
||||
avgJitter += delta;
|
||||
|
||||
jitterCallbacks +=1;
|
||||
});
|
||||
|
||||
// We Observe the Number of Dispatchers and Hosts,
|
||||
// Subscribed and Published Events, Instances
|
||||
// and generators (<= not implemented jet)
|
||||
this._observers.push(
|
||||
|
||||
this._dispatcher.externalDispatchers.subscribe((overview) => {
|
||||
_this.storeMetric("nope.dispatchers", overview.length, tags);
|
||||
const hosts = new Set<string>();
|
||||
@ -32,7 +71,10 @@ export class NopeMetricLogger extends OpenTSDBLogger {
|
||||
let maxResponse = Number.MIN_SAFE_INTEGER;
|
||||
let minResponse = Number.MAX_SAFE_INTEGER;
|
||||
let avgResponse = 0;
|
||||
let dispatchers = 0;
|
||||
let checkedDispatchers = 0;
|
||||
|
||||
// Determine the Current Time. This will be used, to
|
||||
// Determine the Delay of th
|
||||
const currentTime = Date.now();
|
||||
|
||||
for (const dispatcher of overview) {
|
||||
@ -41,24 +83,37 @@ export class NopeMetricLogger extends OpenTSDBLogger {
|
||||
maxResponse = Math.max(delta, maxResponse);
|
||||
minResponse = Math.min(delta, minResponse);
|
||||
avgResponse += delta;
|
||||
dispatchers += 1;
|
||||
checkedDispatchers += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (dispatchers > 0) {
|
||||
if (checkedDispatchers > 0) {
|
||||
_this.storeMetric("nope.timings.min", minResponse, tags);
|
||||
_this.storeMetric("nope.timings.max", maxResponse, tags);
|
||||
_this.storeMetric("nope.timings.avg", avgResponse / dispatchers, tags);
|
||||
_this.storeMetric("nope.timings.avg", avgResponse / checkedDispatchers, tags);
|
||||
}
|
||||
});
|
||||
}),
|
||||
this._dispatcher.subscribedEvents.subscribe((overview) => {
|
||||
_this.storeMetric("nope.events.subscribed", overview.length, tags);
|
||||
});
|
||||
}),
|
||||
this._dispatcher.publishedEvents.subscribe((overview) => {
|
||||
_this.storeMetric("nope.events.published", overview.length, tags);
|
||||
});
|
||||
}),
|
||||
this._dispatcher.availableInstances.subscribe((overview) => {
|
||||
_this.storeMetric("nope.instances", overview.length, tags);
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Dispose Function.
|
||||
*/
|
||||
public async dispose(): Promise<void>{
|
||||
// Delete the Timer.
|
||||
clearInterval(this._interval);
|
||||
|
||||
// Remove the Subscription.
|
||||
await super.dispose();
|
||||
}
|
||||
}
|
||||
|
@ -1,40 +1,35 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2018-10-22 11:14:01
|
||||
* @modify date 2018-10-29 11:37:45
|
||||
* @create date 2021-05-14 08:53:16
|
||||
* @modify date 2021-05-14 08:53:21
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
import { injectable } from "inversify";
|
||||
import { Socket } from "net";
|
||||
import { getNopeLogger } from "../../../lib/logger/getLogger";
|
||||
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
|
||||
import { NopeObservable } from "../../../lib/observables/nopeObservable";
|
||||
import { INopeObserver } from "../../../lib/types/nope/nopeObservable.interface";
|
||||
import {
|
||||
IOpenTSDBLogger,
|
||||
ITSDBConfiguration,
|
||||
ITSDBData
|
||||
} from "../type/interfaces";
|
||||
import { AbstractDataLogger } from "./abstract.data.logger.module";
|
||||
|
||||
@injectable()
|
||||
export class OpenTSDBLogger
|
||||
extends InjectableNopeBaseModule
|
||||
extends AbstractDataLogger<ITSDBConfiguration, ITSDBData>
|
||||
implements IOpenTSDBLogger
|
||||
{
|
||||
private _client: Socket;
|
||||
private _logger = getNopeLogger("opentsdb", "debug");
|
||||
|
||||
public uri = new NopeObservable<string>();
|
||||
public connected = new NopeObservable<boolean>();
|
||||
public bufferSize = new NopeObservable<number>();
|
||||
private _buffer = new Array<ITSDBData>();
|
||||
private _observers = new Array<INopeObserver>();
|
||||
|
||||
public async init(
|
||||
uri: string,
|
||||
bufferSize = 0,
|
||||
config: { [index: string]: ITSDBConfiguration } = {},
|
||||
bufferSize = 0
|
||||
uri = "localhost:4242"
|
||||
): Promise<void> {
|
||||
const _this = this;
|
||||
|
||||
@ -71,35 +66,32 @@ export class OpenTSDBLogger
|
||||
// Now we wait until we are connected.
|
||||
await this.connected.waitFor((value) => value === true);
|
||||
|
||||
this.bufferSize.setter = (value) => {
|
||||
return {
|
||||
data: Math.max(0, Math.min(5000, value)),
|
||||
valid: true
|
||||
};
|
||||
};
|
||||
|
||||
this.bufferSize.setContent(bufferSize);
|
||||
|
||||
// Now we will read the configuartion
|
||||
// and subscribe the elements as described
|
||||
// in the Configuration.
|
||||
for (const eventName in config) {
|
||||
this._observers.push(
|
||||
await this._dispatcher.subscribeToEvent(
|
||||
eventName,
|
||||
(value, sender, timestamp) => {
|
||||
if (typeof value === "number") {
|
||||
_this.write({
|
||||
metric: config[eventName].metric,
|
||||
value,
|
||||
timestamp,
|
||||
tags: config[eventName].tags
|
||||
});
|
||||
await super.init(bufferSize, config);
|
||||
}
|
||||
},
|
||||
{}
|
||||
)
|
||||
);
|
||||
|
||||
/**
|
||||
* Internal Function, to store the data.
|
||||
*
|
||||
* @protected
|
||||
* @param {string} eventName
|
||||
* @param {*} value
|
||||
* @param {number} timestamp
|
||||
* @memberof OpenTSDBLogger
|
||||
*/
|
||||
protected _internallyStoreData(
|
||||
eventName: string,
|
||||
value: any,
|
||||
timestamp: number
|
||||
) {
|
||||
if (typeof value === "number") {
|
||||
this._writeToBuffer({
|
||||
metric: this._config[eventName].metric,
|
||||
tags: this._config[eventName].tags,
|
||||
timestamp,
|
||||
value
|
||||
});
|
||||
} else {
|
||||
this._logger.warn("Unable to store the Value. OpenTSDB needs a number");
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,9 +218,11 @@ export class OpenTSDBLogger
|
||||
timestamp: number;
|
||||
tags: { [index: string]: string };
|
||||
}): string {
|
||||
let _ret ="";
|
||||
if (element.value < Number.MAX_SAFE_INTEGER && element.value > Number.MIN_SAFE_INTEGER && !isNaN(element.value)){
|
||||
/** put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 */
|
||||
let _ret =
|
||||
"put " +
|
||||
|
||||
_ret ="put " +
|
||||
element.metric +
|
||||
" " +
|
||||
element.timestamp.toString() +
|
||||
@ -241,7 +235,7 @@ export class OpenTSDBLogger
|
||||
}
|
||||
|
||||
_ret += "\n";
|
||||
|
||||
}
|
||||
return _ret;
|
||||
}
|
||||
|
||||
@ -253,12 +247,8 @@ export class OpenTSDBLogger
|
||||
* @memberof OpenTSDBWriter
|
||||
*/
|
||||
public async dispose(): Promise<void> {
|
||||
for (const observer of this._observers) {
|
||||
observer.unsubscribe();
|
||||
}
|
||||
|
||||
// Store the buffer
|
||||
await this._storeInDB(this._buffer);
|
||||
// And finially dispose the Module.
|
||||
await super.dispose();
|
||||
|
||||
// Now end the connection
|
||||
const _this = this;
|
||||
@ -269,8 +259,5 @@ export class OpenTSDBLogger
|
||||
});
|
||||
});
|
||||
await closePromise;
|
||||
|
||||
// And finially dispose the Module.
|
||||
await super.dispose();
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2020-11-13 09:53:27
|
||||
* @modify date 2021-04-27 13:09:49
|
||||
* @modify date 2021-05-14 11:55:13
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
@ -200,7 +200,7 @@ export class MQTTClientModule
|
||||
* @param {IMqttSettings} [settings] Settings used to define the Element.
|
||||
* @memberof MQTTClientModule
|
||||
*/
|
||||
async init(uri: string, settings?: IMqttSettings, waitForConnection = true) {
|
||||
async init(uri: string, settings?: IMqttSettings, waitForConnection = true):Promise<void> {
|
||||
this.uri = new NopeObservable();
|
||||
this.settings = new NopeObservable();
|
||||
|
||||
@ -590,11 +590,6 @@ export class MQTTClientModule
|
||||
_this.settings.getContent().autoSubscribe === true ||
|
||||
_this.settings.getContent().autoSubscribe === "alias"
|
||||
) {
|
||||
// Subscribe for existing Subscriptions of the Dispatcher:
|
||||
// for (const topic of _this._dispatcher){
|
||||
// _subscribeMQTT(topic);
|
||||
// }
|
||||
|
||||
// Define the Behaviour on creating new Topics, and New Subscriptions
|
||||
_this._dispatcher.subscribedEvents.subscribe((event) =>
|
||||
_subscribeAllRequiredTopics(event)
|
||||
|
7
modules/ros/helper/parseDatatypes.ts
Normal file
7
modules/ros/helper/parseDatatypes.ts
Normal file
@ -0,0 +1,7 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-14 11:32:16
|
||||
* @modify date 2021-05-14 11:32:16
|
||||
* @desc [description]
|
||||
*/
|
162
modules/ros/src/nope2ros.bridge.module.ts
Normal file
162
modules/ros/src/nope2ros.bridge.module.ts
Normal file
@ -0,0 +1,162 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-14 11:27:19
|
||||
* @modify date 2021-05-14 11:32:21
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
import * as ros from "rclnodejs";
|
||||
import { getNopeLogger } from "../../../lib/logger/getLogger";
|
||||
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
|
||||
import { NopeObservable } from "../../../lib/observables/nopeObservable";
|
||||
import { IGenericNopeModule } from "../../../lib/types/nope/nopeModule.interface";
|
||||
import { INopeObservable } from "../../../lib/types/nope/nopeObservable.interface";
|
||||
|
||||
|
||||
export class NopeToROSBridge extends InjectableNopeBaseModule {
|
||||
|
||||
protected _logger = getNopeLogger("nope-to-ros-bridge")
|
||||
protected _instances = new Map<string, IGenericNopeModule>();
|
||||
protected _rosWrappers = new Map<string, any>();
|
||||
protected _requestedInstances = new Set<string>();
|
||||
|
||||
protected _done = false;
|
||||
|
||||
public bridgedModules = new NopeObservable<string[]>();
|
||||
|
||||
public async init(instanceName: string | string[]){
|
||||
await super.init();
|
||||
|
||||
// Assign the Modules, which should be hosted.
|
||||
this.bridgedModules.setContent(Array.isArray(instanceName) ? instanceName : [instanceName]);
|
||||
|
||||
const _this = this;
|
||||
|
||||
// Listen for the Available instances.
|
||||
this._dispatcher.availableInstances.subscribe( async instances => {
|
||||
// Test if the All instances has been created.
|
||||
if (_this._done){
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (const description of instances){
|
||||
// Test if the Instance should be generated.
|
||||
// 1. Test if we already have an accessor
|
||||
// 2. Test if the Instace is required.
|
||||
if (
|
||||
!_this._requestedInstances.has(description.identifier) &&
|
||||
_this.bridgedModules.getContent().includes(description.identifier)
|
||||
){
|
||||
// Store, that we already created an instance
|
||||
_this._requestedInstances.add(description.identifier);
|
||||
|
||||
// Generate the Instance Accessor.
|
||||
const instance: IGenericNopeModule = await _this._dispatcher.generateInstance({
|
||||
type: description.type,
|
||||
identifier: description.identifier,
|
||||
params: []
|
||||
});
|
||||
|
||||
// Now, we have an accesor and altough know, which elements are hosted
|
||||
// and which services are offered. Now we use theese information, to
|
||||
// create the Properties and Services in ROS.
|
||||
const _rosNode = ros.createNode(description.identifier);
|
||||
|
||||
_this._rosWrappers.set(description.identifier, _rosNode);
|
||||
|
||||
// 1. Create the Properties / Events
|
||||
for (const propName in description.properties){
|
||||
// Extract the Description of the Property
|
||||
const propDescription = description.properties[propName];
|
||||
|
||||
// Based on the Settings of the Property,
|
||||
// we must subscribe and publish them
|
||||
|
||||
if (propDescription.mode.includes("publish")) {
|
||||
const topicToPublish = typeof(propDescription.topic) === "string" ? propDescription.topic : propDescription.topic.publish;
|
||||
const publisher = _rosNode.createPublisher("HERE WE MUST PARSE STUFF",topicToPublish);
|
||||
|
||||
(instance[propName] as INopeObservable<any>).subscribe(value => {
|
||||
publisher.publish(value);
|
||||
});
|
||||
}
|
||||
|
||||
if (propDescription.mode.includes("subscribe")) {
|
||||
// The Property must be subscribed.
|
||||
// We do this by extracting the topic
|
||||
// to subscribe and than we
|
||||
const topicToSubscribe = typeof(propDescription.topic) === "string" ? propDescription.topic : propDescription.topic.subscribe;
|
||||
_rosNode.createSubscription("HERE WE MUST PARSE STUFF",topicToSubscribe, (value) => {
|
||||
(instance[propName] as INopeObservable<any>).setContent(value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Now that we have registered all properties,
|
||||
// we offer the services of our module.
|
||||
// 1. Create the Properties / Events
|
||||
for (const serviceName in description.functions){
|
||||
// Extract the Description of the Function
|
||||
const funcDescription = description.functions[serviceName];
|
||||
|
||||
// Using the Description, we are able to offer the Service in Ros.
|
||||
// therefore we have to do a little parsing, but that its straight
|
||||
// forward to call out service.
|
||||
_rosNode.createService(
|
||||
"Magic Parsing here",
|
||||
serviceName,
|
||||
async (request, response) => {
|
||||
try {
|
||||
|
||||
// Firstly we have to parse the Object to a List of Parameters
|
||||
const params = [];
|
||||
// Therefore we iterate over the Description and based on the Order
|
||||
// we push the parameters into the "params" list. This should result
|
||||
// in the required order for our method.
|
||||
funcDescription.schema.inputs.map(param => {
|
||||
params.push(request[param.name]);
|
||||
});
|
||||
|
||||
// Result in NopeShape.
|
||||
const _nopeResult = await instance[serviceName](...params);
|
||||
// Perhaps we have to parse it. (But this shouldnt be required)
|
||||
const result = {..._nopeResult};
|
||||
|
||||
// Add some Magic here
|
||||
response.send(result);
|
||||
} catch (e) {
|
||||
_this._logger.error("we failed to offer " + instance.identifier + "." + serviceName);
|
||||
_this._logger.error(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Register the Node and start the Server
|
||||
ros.spin(_rosNode);
|
||||
|
||||
// Store the Accessor
|
||||
_this._instances.set(description.identifier, instance);
|
||||
}
|
||||
}
|
||||
} catch (e){
|
||||
_this._logger.error("Failed to create some instances. Error:",e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* The Default Function, to shut down the other nodes.
|
||||
*/
|
||||
public async dispose(): Promise<void> {
|
||||
await super.init();
|
||||
|
||||
// Now we shutdown the Ros Nodes.
|
||||
for (const rosNode of this._rosWrappers.values()){
|
||||
rosNode.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
180
modules/ros/src/ros2nope.bridge.module copy.ts
Normal file
180
modules/ros/src/ros2nope.bridge.module copy.ts
Normal file
@ -0,0 +1,180 @@
|
||||
/**
|
||||
* @author Martin Karkowski
|
||||
* @email m.karkowski@zema.de
|
||||
* @create date 2021-05-14 11:27:19
|
||||
* @modify date 2021-05-14 11:32:21
|
||||
* @desc [description]
|
||||
*/
|
||||
|
||||
import * as ros from "rclnodejs";
|
||||
import { getNopeLogger } from "../../../lib/logger/getLogger";
|
||||
import { InjectableNopeBaseModule } from "../../../lib/module/BaseModule.injectable";
|
||||
import { NopeObservable } from "../../../lib/observables/nopeObservable";
|
||||
import { IGenericNopeModule } from "../../../lib/types/nope/nopeModule.interface";
|
||||
import { INopeObservable } from "../../../lib/types/nope/nopeObservable.interface";
|
||||
|
||||
|
||||
export class NopeToROSBridge extends InjectableNopeBaseModule {
|
||||
|
||||
protected _logger = getNopeLogger("nope-to-ros-bridge")
|
||||
|
||||
/**
|
||||
* A Map, containig the Nope-Generic-Instances mapped to their Names
|
||||
*
|
||||
* @protected
|
||||
* @memberof NopeToROSBridge
|
||||
*/
|
||||
protected _instances = new Map<string, IGenericNopeModule>();
|
||||
/**
|
||||
* A Map, containing the wrapped Ros Elements
|
||||
*
|
||||
* @protected
|
||||
* @memberof NopeToROSBridge
|
||||
*/
|
||||
protected _rosWrappers = new Map<string, any>();
|
||||
protected _requestedInstances = new Set<string>();
|
||||
|
||||
protected _done = false;
|
||||
|
||||
public bridgedModules = new NopeObservable<string[]>();
|
||||
|
||||
public async init(instanceName: string | string[]): Promise<void> {
|
||||
await super.init();
|
||||
|
||||
// Assign the Modules, which should be hosted.
|
||||
this.bridgedModules.setContent(Array.isArray(instanceName) ? instanceName : [instanceName]);
|
||||
|
||||
const _this = this;
|
||||
|
||||
// Listen for the Available instances.
|
||||
this._dispatcher.availableInstances.subscribe( async instances => {
|
||||
// Test if the All instances has been created.
|
||||
if (_this._done){
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (const description of instances){
|
||||
// Test if the Instance should be generated.
|
||||
// 1. Test if we already have an accessor
|
||||
// 2. Test if the Instace is required.
|
||||
if (
|
||||
!_this._requestedInstances.has(description.identifier) &&
|
||||
_this.bridgedModules.getContent().includes(description.identifier)
|
||||
){
|
||||
// Store, that we already created an instance
|
||||
_this._requestedInstances.add(description.identifier);
|
||||
|
||||
// Generate the Instance Accessor.
|
||||
const instance: IGenericNopeModule = await _this._dispatcher.generateInstance({
|
||||
type: description.type,
|
||||
identifier: description.identifier,
|
||||
params: []
|
||||
});
|
||||
|
||||
// Now, we have an accesor and altough know, which elements are hosted
|
||||
// and which services are offered. Now we use theese information, to
|
||||
// create the Properties and Services in ROS.
|
||||
const _rosNode = ros.createNode(description.identifier);
|
||||
|
||||
_this._rosWrappers.set(description.identifier, _rosNode);
|
||||
|
||||
// 1. Create the Properties / Events
|
||||
for (const propName in description.properties){
|
||||
// Extract the Description of the Property
|
||||
const propDescription = description.properties[propName];
|
||||
|
||||
// Based on the Settings of the Property,
|
||||
// we must subscribe and publish them
|
||||
|
||||
if (propDescription.mode.includes("publish")) {
|
||||
const topicToPublish = typeof(propDescription.topic) === "string" ? propDescription.topic : propDescription.topic.publish;
|
||||
const publisher = _rosNode.createPublisher("HERE WE MUST PARSE STUFF",topicToPublish);
|
||||
|
||||
(instance[propName] as INopeObservable<any>).subscribe(value => {
|
||||
publisher.publish(value);
|
||||
});
|
||||
}
|
||||
|
||||
if (propDescription.mode.includes("subscribe")) {
|
||||
// The Property must be subscribed.
|
||||
// We do this by extracting the topic
|
||||
// to subscribe and than we
|
||||
const topicToSubscribe = typeof(propDescription.topic) === "string" ? propDescription.topic : propDescription.topic.subscribe;
|
||||
_rosNode.createSubscription("HERE WE MUST PARSE STUFF",topicToSubscribe, (value) => {
|
||||
(instance[propName] as INopeObservable<any>).setContent(value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Now that we have registered all properties,
|
||||
// we offer the services of our module.
|
||||
// 1. Create the Properties / Events
|
||||
for (const serviceName in description.functions){
|
||||
// Extract the Description of the Function
|
||||
const funcDescription = description.functions[serviceName];
|
||||
|
||||
// Using the Description, we are able to offer the Service in Ros.
|
||||
// therefore we have to do a little parsing, but that its straight
|
||||
// forward to call out service.
|
||||
_rosNode.createService(
|
||||
"Magic Parsing here",
|
||||
serviceName,
|
||||
async (request, response) => {
|
||||
try {
|
||||
|
||||
// Firstly we have to parse the Object to a List of Parameters
|
||||
const params = [];
|
||||
// Therefore we iterate over the Description and based on the Order
|
||||
// we push the parameters into the "params" list. This should result
|
||||
// in the required order for our method.
|
||||
funcDescription.schema.inputs.map(param => {
|
||||
params.push(request[param.name]);
|
||||
});
|
||||
|
||||
// Result in NopeShape.
|
||||
const _nopeResult = await instance[serviceName](...params);
|
||||
// Perhaps we have to parse it. (But this shouldnt be required)
|
||||
const result = {..._nopeResult};
|
||||
|
||||
// Add some Magic here
|
||||
response.send(result);
|
||||
} catch (e) {
|
||||
_this._logger.error("we failed to offer " + instance.identifier + "." + serviceName);
|
||||
_this._logger.error(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Register the Node and start the Server
|
||||
ros.spin(_rosNode);
|
||||
|
||||
// Store the Accessor
|
||||
_this._instances.set(description.identifier, instance);
|
||||
}
|
||||
}
|
||||
|
||||
// After we have fully looped over our instances,
|
||||
// we should be able to determine, whether we are
|
||||
// done or not.
|
||||
_this._done = _this.bridgedModules.getContent().length === _this._instances.size;
|
||||
} catch (e){
|
||||
_this._logger.error("Failed to create some instances. Error:",e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* The Default Function, to shut down the other nodes.
|
||||
*/
|
||||
public async dispose(): Promise<void> {
|
||||
await super.init();
|
||||
|
||||
// Now we shutdown the Ros Nodes.
|
||||
for (const rosNode of this._rosWrappers.values()){
|
||||
rosNode.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
10278
package-lock.json
generated
10278
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user