removing files

This commit is contained in:
Martin Karkowski 2020-12-01 13:04:22 +01:00
parent 0d923cb648
commit 9d34f309ea
9 changed files with 0 additions and 1186 deletions

View File

@ -1,34 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:49:31
* @modify date 2018-07-05 19:14:36
* @desc [description]
*/
import { ISubscriptionOptions } from "../../mod-Publish-And-Subscribe-System/type/interfaces";
export interface connectInput {
uri: string;
options: {
autoSubscribe?: boolean,
autoPublish?: {
delimitTime?: number,
subscriptionOptions?: ISubscriptionOptions
active: boolean,
},
subscribe?: Array<string>,
publish?: Array<string> | {
[index: string]: { delimitTime?: number, subscriptionOptions?: ISubscriptionOptions }
},
internalTopicToMQTT?: (value: string) => string,
MQTTTopicToInternal?: (value: string) => string,
blockedTopicsForSubscribing?: Array<string>,
blockedTopicsForPublishing?: Array<string>,
QoS?: 0 | 1 | 2,
retain?: boolean,
};
}
export type connectOutput = null;

View File

@ -1,151 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:49:31
* @modify date 2020-09-08 16:19:29
* @desc [description]
*/
import { Container } from 'inversify';
import * as des from '../../mod-Descriptors/src/Descriptions';
import * as MQTT from '../assembly/manual-assembly';
import { connectInput, connectOutput } from './mqtt-interface-Module-Types';
export class ExportedMQTTInterface extends des.MODULEBASE {
private _instance: MQTT.MQTTInterface;
/**
* Function which is performed during the Startup
*
* @memberof ExportedMQTTInterface
*/
public onLoad(): void {
/**
/**
* Define the Name, the Description, the Author and the currently used Version.
*
*/
this.name = 'mqttinterface';
this.description = 'A Client connection to an external MQTT-Client';
this.author = {
forename: 'Martin',
surename: 'Karkowski',
mail: 'm.karkowski'
};
this.version = {
/** Enter a Number */
version: 1.0,
/** Enter a Year.Month.Day */
date: new Date('2019.01.07')
}
/** Define the required NPM-Modules below */
this.requiredNpmPackages.mqtt = {};
/** Create a Reference to it self */
const _self = this;
/** Create the offered Methods HERE below */
this.functions.connect = new des.FUNCTION(
/** The Name of The Function. Everything will be replaced to Lower-Case */
'connect',
/** The Used Description of the Function. You can use multiple Lines */
`connects the MQTT-Interface to the given MQTT-Broker`,
/** The Input Parameters below */
{
path: 'definitions.connectInput'
},
/** The Output Parameters below */
{
path: 'definitions.connectOutput'
},
/* Enter the Function below !
*
* The Paramter 'parameter' contains the Input Parameters
*
* In Order to work correctly store the Result of your Operation
* in the Parameter as well and return it with the given callback.
*
* The parameter 'cancelHandler' is called if the function should
* be aborted. To cancel the execution please link a cancel-Function.
*
* Example:
* cancelHandler.cancelFunc = function () {
* // Do your action here if required...
* };
*
*
* The parameter 'callback' is as callback to Show the System, that the
* Execution has been finished. It offers the following possiblities.
*
* 1) error - put an occourd error in here otherwise select 'null'
* 2) parameter - put the Adapted Parameter here (see comment above)
* 3) result - put the isolated result in here. This is used for automatic conversion
*
*/
function (parameter: connectInput, CancelHandler: des.CANCELHANDLE, callback: (error: any, parameter: any, result: connectOutput) => void) {
/** Enter your Logic down below! */
try {
_self._instance.connect(parameter.uri, parameter.options, () => {
callback(null, parameter, null);
});
} catch (error) {
/** Adapt the Callback */
callback(error, parameter, null as any);
}
}
);
const _connectSettings: connectInput = {
uri: 'mqtt://localhost:1883',
options: {
autoPublish: {
delimitTime: 0,
subscriptionOptions: { mode: ['direct'] },
active: true,
},
blockedTopicsForSubscribing: [
'logic.places',
'logic.structure',
'logic.activatedTransitions',
]
}
}
/** Mark the Function as Auto-Start-Function*/
this.autoStart.push({ funcname: 'mqttinterface.connect', params: _connectSettings, delay: 5000 });
}
/**,
* Function which is after all Module of the Kernel are Loaded
* By default instances should by created in here using the requested
* Modules.
*
* @memberof ExportedMQTTInterface
*/
public init(_container: Container): void {
/** Create an Instance of the Class */
this._instance = _container.get<MQTT.MQTTInterface>(MQTT.TYPES.MQTTInterface);
}
/**,
* Function which is called if the Kernel stops. By default a Dispose
* Methode if available is called.
*
* @memberof ExportedMQTTInterface
*/
public exit(): void {
}
}
export const EXTENSION = ExportedMQTTInterface;

View File

@ -1,66 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:50:31
* @modify date 2020-09-08 16:19:29
* @desc [description]
*/
import * as bunyanMQTT from 'bunyan-mqtt';
import { inject, injectable } from 'inversify';
import * as DELIMITER from '../../mod-Delimiter/assembly/manual-assembly';
import * as IDGEN from '../../mod-ID-Generator/assembly/manual-assembly';
import { SystemLogger } from '../../mod-Logger/src/Unique.Logger';
import * as PUBSUB from '../../mod-Publish-And-Subscribe-System/assembly/manual-assembly';
import { IMqttConverter } from '../type/interfaces';
import { TYPES } from '../type/types';
import { ExternalMQTTServer } from './mqtt-interface';
@injectable()
export class ExternalMQTTServerWithConverter extends ExternalMQTTServer {
protected mqttToInternal(internalTopic: string, payload: any) {
return this._converter.convertFromMqtt(internalTopic, payload.toString(), this.config.parseFunctions);
}
protected internalToMqtt(internalTopic: string, data: any) {
return this._converter.convertToMqtt(internalTopic, data, this.config.parseFunctions);
}
protected onConnect(uri: string, firstConnect: boolean) {
/** Only if connected at the first time, make shure the content is logged */
if (firstConnect) {
const adaptedUri = uri.includes('//') ? uri.split('//')[1] : uri;
/** Add a Logger for MQTT */
SystemLogger.logger.addStream({
level: 'debug',
type: 'raw',
stream: bunyanMQTT({
level: SystemLogger.logger.level,
topic: 'log',
host: adaptedUri.split(':')[0],
port: adaptedUri.split(':')[1] || 1883
})
});
}
}
/**
* Creates an instance of ExternalMoscaMqttBroker.
* @memberof ExternalMoscaMqttBroker
*/
constructor(
@inject(PUBSUB.TYPES.PubSubSystem) _pubSub: PUBSUB.PubSubSystem,
@inject(IDGEN.TYPES.IdGenerator) _idGen: IDGEN.IdGenerator,
@inject(DELIMITER.TYPES.Delimiter) _delimiter: DELIMITER.Delimiter,
@inject(TYPES.MQTTLibrary) _mqttLibrary: any,
@inject(TYPES.MQTTConverter) private _converter: IMqttConverter,
) {
super(_pubSub, _idGen, _delimiter, _mqttLibrary)
}
}

View File

@ -1,569 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:50:31
* @modify date 2020-09-08 16:23:30
* @desc [description]
*/
import { inject, injectable } from 'inversify';
import { generateId } from '../../../lib/helpers/idMethods';
import { SPLITCHAR } from '../../../lib/helpers/objectMethods';
import { replaceAll } from '../../../lib/helpers/stringMethods';
import * as DELIMITER from '../../mod-Delimiter/assembly/manual-assembly';
import { SystemLogger } from '../../mod-Logger/src/Unique.Logger';
import * as PUBSUB from '../../mod-Publish-And-Subscribe-System/assembly/manual-assembly';
import { ISubscriptionOptions } from '../../mod-Publish-And-Subscribe-System/type/interfaces';
import { TYPES } from '../type/types';
const MQTT_SUBSCRIPTIONS_TOPIC = 'mqtt/subscriptions';
@injectable()
export class ExternalMQTTServer {
private _logger = SystemLogger.logger.getLogger('mqtt.interface');
// Element Holding the Subscriptions
private _subscriptions = new Map<string, any>();
public config = {
replaceRootNameWithClientID: false,
replaceLeadingSingleLevelSeperatorWithRootName: true,
SubscribeToTopicsOfDifferentRoots: true,
useRootName: false,
subscribeHidden: false,
parseFunctions: true,
};
private _isBlocked(topic: string, blockedTopics: Array<string>) {
// Adapt the Topics
topic = this._pubSub.adaptPath(topic)
blockedTopics = blockedTopics.map(tp => this._pubSub.adaptPath(tp));
// Check if Blocked
let topicBlocked = false;
for (const blocked of blockedTopics) {
if (topic.startsWith(blocked + SPLITCHAR) || topic === blocked) {
topicBlocked = true;
break;
}
}
// Return the Result
return topicBlocked;
}
blockedTopicsForSubscribing: Array<string> = ['log'];
blockedTopicsForPublishing: Array<string> = [];
/**
* Connects to a given MQTT-Server and starts observing
*
* @param {string} uri URI of the Server
* @param {() => void} _callback Callback which is called after the Client has been connected.
* @memberof ExternalMoscaMqttBroker
*/
public connect(uri: string, options?: {
autoSubscribe?: boolean,
autoPublish?: {
delimitTime?: number,
subscriptionOptions?: ISubscriptionOptions
active: boolean,
},
subscribe?: Array<string>,
publish?: Array<string> | {
[index: string]: { delimitTime?: number, subscriptionOptions?: ISubscriptionOptions }
},
internalTopicToMQTT?: (value: string) => string,
MQTTTopicToInternal?: (value: string) => string,
blockedTopicsForSubscribing?: Array<string>,
blockedTopicsForPublishing?: Array<string>,
QoS?: 0 | 1 | 2,
retain?: boolean,
}, callback?: (err) => void): void {
const _this = this;
// Define a Callback, if no Callback is defined
if (typeof callback !== 'function') {
callback = () => { }
}
if (!this._subscriptions.has(uri)) {
/** Log the Start of MQTT. */
_this._logger.info('connecting to %s', uri);
options = options ? options : {};
/** Create a Map containing the Subscriptions */
const _interallySubscribedTopics = new Map<string, () => void>();
const _interallyPublishedTopics = new Map<string, { unsubscribe: () => void }>();
const _adaptedTopics = new Map<string, string>();
const _clientID = generateId('_mqtt');
/** Define the Options if required */
const _options = {
autoSubscribe: (options.autoSubscribe !== undefined) ? options.autoSubscribe : (Array.isArray(options.subscribe) ? false : true),
autoPublish: (options.autoPublish !== undefined) ?
options.autoPublish :
(options.publish !== undefined ?
false :
{
delimitTime: 500,
subscriptionOptions: { mode: ['direct'] },
active: true,
}
),
subscribe: options.subscribe || new Array<string>(),
publish: options.publish || new Array<string>(),
internalTopicToMQTT: (internalTopic: string, subscribe = false) => {
if (_adaptedTopics.has(internalTopic)) {
return _adaptedTopics.get(internalTopic);
}
/** Replace the Topic - Name */
let _mqttTopic = replaceAll(internalTopic, SPLITCHAR, '/');
if (typeof options.internalTopicToMQTT === 'function') {
_mqttTopic = options.internalTopicToMQTT(_mqttTopic);
}
if (_this._pubSub.rootName) {
if (!_this.config.useRootName) {
/** Replace the default Name with the Client-ID Name. */
_mqttTopic = _mqttTopic.replace(_this._pubSub.rootName + '/', '');
} else if (subscribe && _this.config.SubscribeToTopicsOfDifferentRoots) {
/** Replace the default Name with the Client-ID Name. */
_mqttTopic = _mqttTopic.replace(_this._pubSub.rootName, '+');
}
else if (_this.config.replaceRootNameWithClientID && !subscribe) {
/** Replace the default Name with the Client-ID Name. */
_mqttTopic = _mqttTopic.replace(_this._pubSub.rootName, _clientID);
}
}
/** Store the Topic */
_adaptedTopics.set(internalTopic, _mqttTopic);
return _mqttTopic;
},
MQTTTopicToInternal: (mqttTopic: string) => {
/** Test if the conerting already was perfomed */
if (_adaptedTopics.has(mqttTopic)) {
return _adaptedTopics.get(mqttTopic);
}
/** Replace the Topic - Name */
let _internalTopic = replaceAll(mqttTopic, '/', SPLITCHAR);
if (typeof options.MQTTTopicToInternal === 'function') {
_internalTopic = options.MQTTTopicToInternal(_internalTopic);
}
if (!_this.config.useRootName && _this._pubSub.rootName) {
if (!mqttTopic.startsWith('+')) {
_internalTopic = _this._pubSub.rootName + SPLITCHAR + _internalTopic;
} else if (_this.config.replaceLeadingSingleLevelSeperatorWithRootName && mqttTopic.startsWith('+')) {
_internalTopic = _internalTopic.replace('+', _this._pubSub.rootName);
}
} else {
/** If a Root-Name is provided, Adapt the MQTT-Name */
if (_this.config.replaceRootNameWithClientID && _this._pubSub.rootName) {
/** Replace the default Name with the Client-ID Name. */
_internalTopic = _internalTopic.replace(_this._pubSub.rootName, '+');
}
}
/** Store the Topic */
_adaptedTopics.set(mqttTopic, _internalTopic);
return _internalTopic;
},
blockedTopicsForSubscribing: options.blockedTopicsForSubscribing || [],
blockedTopicsForPublishing: options.blockedTopicsForPublishing || [],
QoS: options.QoS || 0,
retain: options.retain || false
};
let configurationRequired = true;
/** Create a Broker and use the provided ID*/
const _broker = _this._mqttLibrary.connect(uri, {
clientId: _clientID,
qos: _options.QoS
});
/** Define a Function to Create a Subscription on MQTT.
* This Function, checks whether the Topic is
* blocked, or, whether the conecnten is hidden
* (starts with _)
*/
const _subscribeMQTT = function (topic: string) {
try {
/** Adapt the Name */
topic = _this._pubSub.adaptPath(topic);
if (!_interallySubscribedTopics.has(topic)) {
let topicBlocked = false;
/** Test if topic is hidden or explicitly blocked */
if (_this.config.subscribeHidden && topic.indexOf(SPLITCHAR + '_') !== -1) {
topicBlocked = true;
} else {
topicBlocked = _this._isBlocked(topic, _options.blockedTopicsForSubscribing) || _this._isBlocked(topic, _this.blockedTopicsForSubscribing)
}
/** Prevent, subscribing to topics twice and check whether the topic is blocked or hidden */
if (!topicBlocked) {
/** Replace the internal Splitchar with the MQTT one */
const mqttTopic = _options.internalTopicToMQTT(topic);
_this._logger.info('susbcribing to %s', mqttTopic);
/** Subscribe the Topic */
_broker.subscribe(mqttTopic);
/** Store the Unsubscribe Function */
_interallySubscribedTopics.set(topic, () => {
_broker.unsubscribe(mqttTopic);
});
}
}
} catch (e) {
_this._logger.error(e, 'failed to create a subscription');
}
};
/** Function, used to Publish data on MQTT. */
const _publishToMQTT = function (data, sender, topic) {
/**
* Check if the message is provided by MQTT, if so
* don't forward the Message to prevent Endless loops
*/
if (sender !== _clientID) {
/**
* All Data will be forwarded as JSON Message
* Therefore try converting the Message to a
* valid JSON message.
*/
let _json: any = {};
try {
/** Try to translate to JSON */
_json = _this.internalToMqtt(topic, data)
/** Check if the Content is convertable */
if (_json.indexOf('[object Object]') !== -1) {
throw Error('Converting data Failed');
}
} catch (e) {
_this._logger.error(e, 'cant convert data on %s', topic);
return;
}
/**
* Based on the Settings, try to adapt the Topic, to Match an MQTT-Topic.
*/
const _mqttTopic = _options.internalTopicToMQTT(topic);
try {
/** Publish the Content */
_broker.publish(_mqttTopic, _json, { qos: _options.QoS, retain: _options.retain });
} catch (e) {
_this._logger.error(e, 'cant foward data on %s to %s', topic, uri);
}
}
};
/** Function, which is used to subscribe internally to publish the Stuff to MQTT. */
const _subscribeToPublish = function (topic, _SubscribeOptions?: {
delimitTime?: number,
subscriptionOptions?: ISubscriptionOptions,
}) {
if (!_interallyPublishedTopics.has(topic)) {
/** Check wether the Topic is blocked or not */
if (!(_this._isBlocked(topic, _options.blockedTopicsForPublishing) || _this._isBlocked(topic, _this.blockedTopicsForPublishing))) {
/** Get the Topic */
const _tp = _this._pubSub.getTopic(topic);
/** Assign the Options to a default value */
Object.assign({
delimitTime: 500,
subscriptionOptions: { mode: ['direct', 'super', 'sub'] }
}, _SubscribeOptions || {});
if (SystemLogger.logger.isLogging('mqtt.interface', 'debug')) {
_this._logger.debug('subscribing %s internally to publish it on MQTT', topic);
}
if (_SubscribeOptions.delimitTime > 0) {
/** Delimiting required */
const element = _this._delimiter.addObservable(
/** The Topic */
_tp,
/** The Delimit Time */
_SubscribeOptions.delimitTime,
/** The Subscribe Options */
_SubscribeOptions.subscriptionOptions,
);
/** Providing Callback for Limiter to cancel subscription */
_interallyPublishedTopics.set(topic, element);
/** Therefore subscribe the Observable */
element.observable.subscribe(
/** On Update => Publish */
(content, sender) => {
_publishToMQTT(content, sender, topic);
},
);
} else {
/** Subscribe to the Topic directly */
_interallyPublishedTopics.set(topic, _tp.subscribe(
/** Define the Callback => Publish to MQTT */
(content, sender) => {
_publishToMQTT(content, sender, topic);
}, 'sync', _SubscribeOptions.subscriptionOptions)
);
}
}
}
};
const _subscribeAllRequiredTopics = function (listOfSubscriptions: Array<string>) {
const required = new Set<string>();
/** Subscribe for existing Subscriptions */
for (const _topic of listOfSubscriptions) {
const _usedTopic = _this._pubSub.adaptPath(_topic);
/** Add the Topic to the Required ones */
required.add(_usedTopic);
/** Add a Subscription, if no subscription is available */
if (!_interallyPublishedTopics.has(_usedTopic)) {
_subscribeToPublish(_usedTopic,
_options.autoPublish as {
delimitTime: number,
subscriptionOptions: ISubscriptionOptions,
},
);
}
}
/** Unsubscribe unused Topics and delete them */
for (const [topic, element] of _interallyPublishedTopics.entries()) {
if (!required.has(topic)) {
if (SystemLogger.logger.isLogging('mqtt.interface', 'debug')) {
_this._logger.debug('unsubscribing %s internally. Topic is not subscribed by another system anymore', topic);
}
element.unsubscribe();
_interallyPublishedTopics.delete(topic);
}
}
};
/** If the Client is Ready connect to the subscribtions */
_broker.on('connect', (_data) => {
_this._logger.info('connection to %s established', uri);
/** Subscribe to the Broker-functionalities to receive Updates of the Register Topics*/
_broker.subscribe(MQTT_SUBSCRIPTIONS_TOPIC);
if (configurationRequired) {
/** AutoSubscribe to the requested Topics */
if (_options.autoSubscribe === true) {
/** Subscribe for existing Subscriptions */
for (const topic of _this._pubSub.registry.availableTopics.getContent()) {
const _tp = _this._pubSub.getTopic(topic);
_subscribeMQTT(topic);
/** only subscribe if an internal subscription exists */
if (_tp.hasSubscriptions) {
_subscribeMQTT(topic);
}
}
/** Define the Behaviour on creating new Topics, and New Subscriptions */
_this._pubSub.registry.onNewSubscription.subscribe((topic) => _subscribeMQTT(topic));
/** Define behaviour to Close a Subscription */
_this._pubSub.registry.onDisposeSubscription.subscribe((topic) => {
_interallySubscribedTopics.get(topic)();
_interallySubscribedTopics.delete(topic);
});
} else if (_options.subscribe) {
/** Subscribe the given Data */
for (const _topic of _options.subscribe) {
/** Subscribe to an adapted Path */
_subscribeMQTT(_this._pubSub.adaptPath(_topic));
}
}
if (_options.publish) {
/** Make difference between Array and Object */
if (Array.isArray(_options.publish)) {
/** Subscribe for existing Subscriptions */
for (const topic of _options.publish) {
_subscribeToPublish(_this._pubSub.adaptPath(topic), {});
}
} else {
/** Subscribe the Content */
for (const topic in _options.publish) {
if (_options.publish.hasOwnProperty(topic)) {
_subscribeToPublish(_this._pubSub.adaptPath(topic), _options.publish[topic]);
}
}
}
}
}
_this.onConnect(uri, configurationRequired);
_broker.publish('request/encoding', '', { qos: _options.QoS, retain: false });
configurationRequired = false;
callback(null);
});
/** Define the Function handeling incomming subscriptions etc */
_broker.on('message', (mqttTopic, _payload) => {
if (SystemLogger.logger.isLogging('mqtt.interface', 'debug')) {
_this._logger.debug('mqtt got message on "' + mqttTopic + '"');
}
if (mqttTopic === MQTT_SUBSCRIPTIONS_TOPIC && (_options.autoPublish || (typeof _options.autoPublish === 'object' && _options.autoPublish.active))) {
/** Get the Topics, that are subscribed */
const _externallySubscribed: { [index: string]: Array<string> } = JSON.parse(_payload.toString('utf-8'));
/** Filter elements, which are only subscribe by this System */
const _subscribed = new Array<string>();
for (const _topic of Object.getOwnPropertyNames(_externallySubscribed)) {
if (
(_externallySubscribed[_topic].length > 1) ||
(_externallySubscribed[_topic].length === 1 &&
!(
_externallySubscribed[_topic].includes(_clientID) ||
(_this._pubSub.rootName && _externallySubscribed[_topic].includes(_this._pubSub.rootName))
)
)
) {
_subscribed.push(_options.MQTTTopicToInternal(_topic));
}
}
/** Subscribe the Required Topics. */
_subscribeAllRequiredTopics(_subscribed);
} else {
try {
const _topic = _options.MQTTTopicToInternal(mqttTopic);
if (
/** Check whether the Topic contains a MQTT Wildcard
* or the client ID => if so skip that message
*/
_topic.indexOf('+') === -1 &&
_topic.indexOf('#') === -1 &&
!_topic.startsWith(_clientID)) {
/** Parse the Data to JSON */
const _data = _this.mqttToInternal(_topic, _payload);
if (SystemLogger.logger.isLogging('mqtt.interface', 'debug')) {
_this._logger.debug('mqtt converted message on "' + _topic + '"', _data);
}
/** Publish in Pub and Sub System => Thereby forward the Message */
_this._pubSub.getTopic(_topic).setContent(_data, _clientID);
}
} catch (e) {
/** Logging the Error */
_this._logger.error('failed receiving data on "' + mqttTopic + '"', e);
}
}
});
/** The external Broker went offline. */
_broker.on('offline', () => {
/** Try to Reconnect Client */
_this._logger.warn('MQTT - went Offline');
callback('offline');
});
_broker.on('error', (_error) => {
/** MQTT an Error occourd */
_this._logger.error('fatal error occourd', _error);
callback('error');
});
/** Store the Broker */
_this._subscriptions.set(uri, _broker);
} else {
callback(null);
}
}
protected mqttToInternal(internalTopic: string, payload: any) {
return parse(payload.toString(), this.config.parseFunctions);
}
protected internalToMqtt(internalTopic: string, data: any) {
return stringify(data, this.config.parseFunctions);
}
protected onConnect(uri: string, firstConnect: boolean) {
}
/**
* Function to Force Data Beeing Published over MQTT.
* @param topic The Topic (in MQTT Form)
* @param data The Data itself
* @param options The provided Options.
*/
public force(topic: string, data: string, options: { qos?: 0 | 1 | 3, retain?: boolean } = {}) {
for (const [uri, broker] of this._subscriptions.entries()) {
try {
/** Publish the Content */
broker.publish(topic, data, options);
} catch (e) {
this._logger.error(e, 'cant force data on %s to %s', topic, uri);
}
}
}
/**
* Creates an instance of ExternalMoscaMqttBroker.
* @memberof ExternalMoscaMqttBroker
*/
constructor(
@inject(PUBSUB.TYPES.PubSubSystem) private _pubSub: PUBSUB.PubSubSystem,
@inject(DELIMITER.TYPES.Delimiter) private _delimiter: DELIMITER.Delimiter,
@inject(TYPES.MQTTLibrary) private _mqttLibrary: any,
) {
}
}

View File

@ -1,77 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:50:53
* @modify date 2018-08-13 07:15:09
* @desc [description]
*/
import { injectable } from 'inversify';
import { MatcherContainer } from '../../mod-Publish-And-Subscribe-System/src/Pub-Sub-Matcher';
import { SPLITCHAR } from '../../mod-TypeScript-Library/src/Object-Methods';
export const MULTILEVELWILDCARD = '#';
export const SINGLELEVELWILDCARD = '+';
export function mqttTopicMatch(topicToCheck: string, internalTopic: string): boolean {
let _topicRelevant = false;
/** Check equality */
_topicRelevant = (topicToCheck === internalTopic);
const _ls_01 = topicToCheck.split(SPLITCHAR);
const _ls_02 = internalTopic.split(SPLITCHAR);
/**
* Check whether the topics doesn't match.
* If so check whether symbols for a Wildcard
* (Multilevel or Singlelevel) are included.
*
* If Wildcard is contained => Search for similarity.
*/
if ((!_topicRelevant) && (_ls_01.length <= _ls_02.length) &&
(topicToCheck.indexOf(MULTILEVELWILDCARD) !== -1 || topicToCheck.indexOf(SINGLELEVELWILDCARD) !== -1)) {
_topicRelevant = true;
/** Create a List containing all Sub Topic elements */
/** Iterate through the Elements and check :
* 1) MULTILEVELWILDCARD
* 2) SINGLELEVELWILDCARD
* 3) COMPARE THE ELEMENTS
*/
for (const [_idx, _part] of _ls_01.entries()) {
/** Check whether the part matches a Wildcard or */
if (_part === MULTILEVELWILDCARD) {
/** Only sub messages are relevant. */
return true;
} else if (_part === SINGLELEVELWILDCARD) {
/** Check the Next thing. */
continue;
} else {
/** A Comparison is possible, cause the ls_02.length is greater or equals ls_01.length */
_topicRelevant = (_part === _ls_02[_idx]);
// this._logger.debug('--> comparing: ', _part, 'and', _ls_02[_idx]);
/** */
if (!_topicRelevant) {
/** If the Topic doesnt match break */
break;
}
}
}
}
/** Return the Result */
return _topicRelevant;
}
@injectable()
export class MQTTMatcher extends MatcherContainer {
constructor() {
super();
this.internalCheckMethod = mqttTopicMatch;
}
}

View File

@ -1,140 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:51:09
* @modify date 2018-08-13 07:15:19
* @desc [description]
*/
/** Test Client for testing a MQTT-Server */
import * as mqtt from 'mqtt';
console.log('Client Starting');
const client = mqtt.connect('mqtt://dz-013');
const connectedClients = new Set<string>();
const subscribedTopics = new Map<string, Set<string>>();
function getSubscribed(msg: string) {
for (const _client of connectedClients) {
if (msg.includes(_client)) {
return msg.replace(_client, 'client').match(/(?<=client\s\d+\s).*/g)
}
}
return null;
}
function getUnsubscribed(msg: string) {
for (const _client of connectedClients) {
if (msg.includes(_client)) {
return msg.replace(_client, 'client').match(/(?<=client\s).*/g)
}
}
return null;
}
function display() {
console.log('---------------------' + (new Date(Date.now())).toISOString() + '---------------------')
const _connected = new Array<string>();
connectedClients.forEach(client => _connected.push(client));
console.log('Connected Clients:\n' + JSON.stringify(_connected, null, 4));
const _subscribed = new Array<string>();
for (const [topic, subscribers] of subscribedTopics.entries()) {
if (subscribers.size > 0) {
_subscribed.push(topic);
}
}
console.log('Subscribed-Topics:\n' + JSON.stringify(_subscribed, null, 4));
}
client.on('connect', function () {
/** Define Message */
client.subscribe('$SYS/broker/log/M/subscribe');
client.subscribe('$SYS/broker/log/M/unsubscribe');
client.subscribe('$SYS/broker/log/N');
});
client.on('message', (topic, payload) => {
/** Convert the Buffer */
const data = payload.toString();
/** Create a Content field */
let content: Array<string> | null = null;
switch (topic) {
case '$SYS/broker/log/M/subscribe':
/** Try to find a connected Client */
content = getSubscribed(data)
if (content) {
content.forEach(value => {
if (!subscribedTopics.has(value)) {
subscribedTopics.set(value, new Set<string>());
}
(subscribedTopics.get(value) as Set<string>).add(value)
});
}
display();
break;
case '$SYS/broker/log/M/unsubscribe':
content = getUnsubscribed(data)
if (content) {
content.forEach(value => {
if (!subscribedTopics.has(value)) {
subscribedTopics.set(value, new Set<string>());
}
(subscribedTopics.get(value) as Set<string>).delete(value)
});
}
display();
break;
case '$SYS/broker/log/N':
content = data.match(/(?<=Socket error on client ).*(?=, disconnecting)/g);
if (content) {
content.forEach(value => {
connectedClients.delete(value);
/** Delete the Susbcriber */
for (const _subscribers of subscribedTopics.values()) {
_subscribers.delete(value);
}
});
display();
break;
}
/** Regular Expression to Match disconnected */
content = data.match(/(?<=Client\s).*(?=\sdisconnected)/g);
if (content) {
content.forEach(value => {
connectedClients.delete(value);
/** Delete the Susbcriber */
for (const _subscribers of subscribedTopics.values()) {
_subscribers.delete(value);
}
});
display();
break;
}
/** Regular Expression to Match connected clients */
content = data.match(/(?<=as\s).*(?=\s\()/g)
if (content) {
content.forEach(value => connectedClients.add(value));
display();
break;
}
break;
}
})

View File

@ -1,33 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-06-11 05:14:18
* @modify date 2018-08-17 08:32:49
* @desc [description]
*/
/** Clear the Screen */
declare const process: any;
import { Builder } from '../../mod-Assembly-Builder/src/Container-Builder.FileLoader';
import * as MQTT from '../assembly/manual-assembly';
Builder.load();
Builder.on('loaded', () => {
const _mqttBridge = Builder.instance.container.get<MQTT.MQTTInterface>(MQTT.TYPES.MQTTInterface);
_mqttBridge.config.subscribeHidden = true;
_mqttBridge.config.SubscribeToTopicsOfDifferentRoots = true;
_mqttBridge.connect('mqtt://localhost', {
QoS: 0,
subscribe: ['test'],
autoSubscribe: false,
autoPublish: {
active: false
}
});
});

View File

@ -1,104 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:51:57
* @modify date 2018-05-22 12:51:57
* @desc [description]
*/
/**
*
*
* @export
* @interface IMqttBroker
*/
export interface IMqttBroker {
/**
* Function to Start the MQTT-Server
*
* @memberof IBroker
*/
start: (uri: string, callback: () => void ) => void;
/**
* Function to Close the Connection.
*
* @memberof IMqttBroker
*/
close: () => void;
/**
* Value containing the URI of the Server.
*
* @type {string}
* @memberof IMqttBroker
*/
uri: string;
/**
* Flag showing whether the Broker is running
*
* @type {boolean}
* @memberof IMqttBroker
*/
running: boolean;
/**
* Function for Publishing Data.
*
* @memberof IBroker
*/
publish: (param: {
topic: string,
payload: string | any,
qos: 0 | 1 | 2 ,
retain: boolean
}) => void;
/**
* Callback which should be called if a new
* Client is connected to the System.
*
* @memberof IBroker
*/
onClientConnected: (client) => void;
/**
* Callback which should be called if a
* Client disconnects.
*
* @memberof IBroker
*/
onClientDisconnected: (client) => void;
/**
* Callback which is called if datas are Published.
*
* @memberof IBroker
*/
onPublished: (topic: string, payload: any, client?: string) => void;
/**
* Callback which is called if a new Subscription
* is registered.
*
* @memberof IBroker
*/
onNewSubscription: (topic: string, id: string) => void;
/**
* Callback which is called if a Subscription is
* removed.
*/
onDeleteSubscription: (topic: string, id: string) => void;
}
export interface IMqttConverter {
registerFromMqttReceiver: ( topic: string, payload: any) => void;
registerToMqttReceiver: ( topic: string, payload: any) => void;
convertFromMqtt: ( topic: string, payload: any, useFunctions: boolean) => any;
convertToMqtt: ( topic: string, payload: any, useFunctions: boolean) => string | any;
SUBSCRIBED: string;
PUBLISHED: string;
}

View File

@ -1,12 +0,0 @@
/**
* @author Martin Karkowski
* @email m.karkowski@zema.de
* @create date 2018-05-22 12:52:04
* @modify date 2018-05-22 12:52:04
* @desc [description]
*/
export const TYPES = {
MQTTMatcher: Symbol.for('MQTTMatcher'),
MQTTConverter: Symbol.for('MQTTConverter'),
MQTTLibrary: Symbol.for('MQTTLibrary')
};