/** * @author Martin Karkowski * @email m.karkowski@zema.de * @create date 2018-05-22 12:51:09 * @modify date 2018-08-13 07:15:19 * @desc [description] */ /** Test Client for testing a MQTT-Server */ import * as mqtt from 'mqtt'; console.log('Client Starting'); const client = mqtt.connect('mqtt://dz-013'); const connectedClients = new Set(); const subscribedTopics = new Map>(); function getSubscribed(msg: string) { for (const _client of connectedClients) { if (msg.includes(_client)) { return msg.replace(_client, 'client').match(/(?<=client\s\d+\s).*/g) } } return null; } function getUnsubscribed(msg: string) { for (const _client of connectedClients) { if (msg.includes(_client)) { return msg.replace(_client, 'client').match(/(?<=client\s).*/g) } } return null; } function display() { console.log('---------------------' + (new Date(Date.now())).toISOString() + '---------------------') const _connected = new Array(); connectedClients.forEach(client => _connected.push(client)); console.log('Connected Clients:\n' + JSON.stringify(_connected, null, 4)); const _subscribed = new Array(); for (const [topic, subscribers] of subscribedTopics.entries()) { if (subscribers.size > 0) { _subscribed.push(topic); } } console.log('Subscribed-Topics:\n' + JSON.stringify(_subscribed, null, 4)); } client.on('connect', function () { /** Define Message */ client.subscribe('$SYS/broker/log/M/subscribe'); client.subscribe('$SYS/broker/log/M/unsubscribe'); client.subscribe('$SYS/broker/log/N'); }); client.on('message', (topic, payload) => { /** Convert the Buffer */ const data = payload.toString(); /** Create a Content field */ let content: Array | null = null; switch (topic) { case '$SYS/broker/log/M/subscribe': /** Try to find a connected Client */ content = getSubscribed(data) if (content) { content.forEach(value => { if (!subscribedTopics.has(value)) { subscribedTopics.set(value, new Set()); } (subscribedTopics.get(value) as Set).add(value) }); } display(); break; case '$SYS/broker/log/M/unsubscribe': content = getUnsubscribed(data) if (content) { content.forEach(value => { if (!subscribedTopics.has(value)) { subscribedTopics.set(value, new Set()); } (subscribedTopics.get(value) as Set).delete(value) }); } display(); break; case '$SYS/broker/log/N': content = data.match(/(?<=Socket error on client ).*(?=, disconnecting)/g); if (content) { content.forEach(value => { connectedClients.delete(value); /** Delete the Susbcriber */ for (const _subscribers of subscribedTopics.values()) { _subscribers.delete(value); } }); display(); break; } /** Regular Expression to Match disconnected */ content = data.match(/(?<=Client\s).*(?=\sdisconnected)/g); if (content) { content.forEach(value => { connectedClients.delete(value); /** Delete the Susbcriber */ for (const _subscribers of subscribedTopics.values()) { _subscribers.delete(value); } }); display(); break; } /** Regular Expression to Match connected clients */ content = data.match(/(?<=as\s).*(?=\s\()/g) if (content) { content.forEach(value => connectedClients.add(value)); display(); break; } break; } })