390 lines
13 KiB
Markdown
390 lines
13 KiB
Markdown
|
# Pub Sub System - Publish and Subscribe System
|
||
|
|
||
|
The Pub-Sub-System is a basic Publish and Subscribe System. A Pub-Sub System consists of 3 Main Elements:
|
||
|
|
||
|
* Publishers
|
||
|
* Subscribers
|
||
|
* Broker
|
||
|
|
||
|
Whereas Publishers publishes data on a given Topic, Subscribers receives updates of the data of a subscribed Topic. Topics are used as a Base to share data. For instance, if a Sensor publishes its value on the Topic `Room1.Temperature` every subscribtion which subscribes to `Room1` or `Room1.Temperature` will receive the updated Value. By unsubscribing, a Subscriber doesn't receive Updates anymore. Thereby, the Publish-and-Subscribe mechanism ensure, a loose coupleing between the Elements and scalability.
|
||
|
|
||
|
|
||
|
Summarized Attributes of a Pub-Sub-System
|
||
|
|
||
|
* A Publisher doesn't know its subscribers
|
||
|
* A Subscriber doesn't kown the Publisher(s)
|
||
|
* Different Publishers are able to Publish data on the same Topic
|
||
|
* Broker are responsible for sharing data and subscriptions etc.
|
||
|
|
||
|
---
|
||
|
|
||
|
## Description of the Implementation
|
||
|
|
||
|
The Basic Pub-Sub-System which is based on a Binary Tree-Model. For Instance, there are Publishers for the following Topics:
|
||
|
|
||
|
* `Root.Topic1`
|
||
|
* `Root.Topic2`
|
||
|
* `Root.Topic2.Part1`
|
||
|
* `Root.Topic2.Part2`
|
||
|
* `Root.Topic2.Part3`
|
||
|
|
||
|
Thereby the **Tree** is build up like that.
|
||
|
|
||
|
```
|
||
|
(Root)
|
||
|
/ \
|
||
|
/ \
|
||
|
(Topic1) (Topic2)
|
||
|
/ | \
|
||
|
/ | \
|
||
|
(Part1) | (Part2)
|
||
|
|
|
||
|
(Part3)
|
||
|
```
|
||
|
|
||
|
Data of Topics are shared only to its '*Super-Topics*' and '*Child-Topics*' (see the Examples)
|
||
|
|
||
|
### Example 1 - Publishing data on `Root/Topic2`
|
||
|
|
||
|
If a Publisher of the Topic `Root.Topic2` publishes an update, the Subscribe of the following Topics receives an update.
|
||
|
|
||
|
* `Root`
|
||
|
* `Root.Topic2`
|
||
|
* `Root.Topic2.Part1`
|
||
|
* `Root.Topic2.Part2`
|
||
|
* `Root.Topic2.Part3`
|
||
|
|
||
|
Thereby each element gets the new Data in the following format:
|
||
|
|
||
|
> **Example-Data:** `{ Part1 : 1, Part2 : 'test', Part3 : 1.337 }` is published on the Topic `Root/Topic2`
|
||
|
|
||
|
| Topic | Received Data of the Subscription in ***JSON-Notation***|
|
||
|
| ---- | ---- |
|
||
|
| `Root` | `{ Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1.337, 1.338] } }` |
|
||
|
| `Root.Topic2` | `{ Part1 : 1, Part2 : 'test', Part3 : 1.337 }` |
|
||
|
| `Root.Topic2.Part1` | `1` |
|
||
|
| `Root.Topic2.Part2` | `'test'` |
|
||
|
| `Root.Topic2.Part3` | `[1.337, 1.338]` |
|
||
|
| `Root.Topic2.Part3.0` | `1.337` |
|
||
|
| `Root.Topic2.Part3.1` | `1.338` |
|
||
|
|
||
|
> Subscriber of the Topic `Root.Topic1` doesn't receive Updates.
|
||
|
|
||
|
### Example 2 - Publishing data on `Root.Topic2.Part3`
|
||
|
|
||
|
If a Publisher of the Topic `Root.Topic2.Part3` publishes an update, the Subscribe of the following Topics receives an update.
|
||
|
|
||
|
* `Root`
|
||
|
* `Root.Topic2`
|
||
|
* `Root.Topic2.Part3`
|
||
|
|
||
|
Thereby each element gets the new Data in the following format:
|
||
|
|
||
|
> **Example-Data:** `[1234, 1.338]` is published on the Topic `Root.Topic2.Part3`
|
||
|
|
||
|
| Topic | Received Data of the Subscription in ***JSON-Notation***|
|
||
|
| ---- | ---- |
|
||
|
| `Root` | `{ Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] } }` |
|
||
|
| `Root.Topic2` | `{ Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] }` |
|
||
|
| `Root.Topic2.Part3` | `[1234, 1.338]` |
|
||
|
| `Root.Topic2.Part3.0` | `1234` |
|
||
|
| `Root.Topic2.Part3.1` | `1.338` No Update will be Pusblished, cause the value didn't change |
|
||
|
|
||
|
> Subscriber of the Topic `Root.Topic1`, `Root.Topic2.Part1`, `Root.Topic2.Part3.1` and `Root.Topic2.Part2` doesn't receive Updates, cause their values didn't change.
|
||
|
|
||
|
---
|
||
|
|
||
|
## Technical Implementation
|
||
|
|
||
|
The Pub-Sub-System is build up with a Base-Node-Container. This Container is responsible to create (if necessary) new Topics on which a Subscription could be subscribe or data could be published. Based on all available Subscriptions published Data send to the corresponding Subscriptions.
|
||
|
|
||
|
* No multiple Pub-Sub-Systems at a Time, otherwise an exception is thrown.
|
||
|
* **Subscribers must unsubscribe** if the should not listening any more. For Instance pause/deactivate or dispose an specific Object. (use the `unregister` function of a Subscribtion)
|
||
|
* Subscriptions could be activated again by calling `register`
|
||
|
* Subscriptions are automatically active.
|
||
|
|
||
|
---
|
||
|
|
||
|
## Observables
|
||
|
|
||
|
Observables allow a simple integration of attributes of an object. Therefore the `IObserveable<T>` interface is used.
|
||
|
|
||
|
|
||
|
### Example - Integrate an Observable
|
||
|
|
||
|
To use an Observable the following code must be used, where as `TYPE` describes the data type like `number`, `string`, etc.
|
||
|
|
||
|
```typescript
|
||
|
@inject(PUBSUB.TYPES.Observable)
|
||
|
public attributeName: PUBSUB.Observable<TYPE>;
|
||
|
```
|
||
|
|
||
|
Furthermore the Class containing the observable attribute **must** implement the **`IContainsObservables`-Interface**, as presented in the example below:
|
||
|
|
||
|
```typescript
|
||
|
/** Import the Pub-Sub-System (Path must be adapted) */
|
||
|
import * as PUBSUB from 'Pub-Sub-System/assembly/manual-assembly';
|
||
|
|
||
|
/** Define a Class containing an observable @see attribute_01 */
|
||
|
@injectable()
|
||
|
class TestClass implements IContainsObservables {
|
||
|
/** Values of the IOnlineValues - Interface */
|
||
|
path: string = '';
|
||
|
offeredTopics: Map<string, GenericObservable<any>> ;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Defining the Attribute which should be integrated into the Publish
|
||
|
* and Subscribe System. The value should be a 'number'
|
||
|
*
|
||
|
* @type {PUBSUB.Observer<number>}
|
||
|
* @memberof TestClass
|
||
|
*/
|
||
|
@inject(PUBSUB.TYPES.Observable)
|
||
|
public attribute_01: PUBSUB.Observable<number>;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Creates an instance of TestClass.
|
||
|
* @memberof TestClass
|
||
|
*/
|
||
|
constructor() {
|
||
|
/** Set a Values - That must be done during the construction */
|
||
|
this.path = '';
|
||
|
this.offeredTopics = new Map<string, GenericObservable<any>>();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
```
|
||
|
|
||
|
### Working with Observables
|
||
|
|
||
|
By default oberserables publish their content on an automatically generated Topic. This topic could be adapted and read with the `path` - attibute of an Observable.
|
||
|
|
||
|
```typescript
|
||
|
|
||
|
```
|
||
|
|
||
|
#### Subscribing to Changes
|
||
|
|
||
|
To subscribe to changes of an obserable simply use the metho `subscribe`. You can determine whether the subscription should receive the information `async` (Putting task on the next Tick of the Nodejs / JavaScript Eventloop) or `sync` (immediately).
|
||
|
|
||
|
The Following Parameters are used:
|
||
|
|
||
|
| Param | Type | Default | Description |
|
||
|
| --- | --- | --- | --- |
|
||
|
| func | [<code>subscriptionCallback</code>](#subscriptionCallback) | | Function which is called when new Datas are pushed |
|
||
|
| [mode] | `'sync'` \ `'async'` | `'sync'`| Mode of the Subscription |
|
||
|
|
||
|
**Returns**: <code>ISubscription</code> - The corresponding Subscription
|
||
|
|
||
|
whereas the <code>subscriptionCallback</code> is defined as:
|
||
|
|
||
|
| Param | Type | Description |
|
||
|
| --- | --- | --- |
|
||
|
| content | <code>any</code> | The new Data |
|
||
|
| [topic] | <code>string</code> | The Topic on which the Data where from |
|
||
|
| [timeStamp] | <code>number</code> | The timeStamp on which the data where changed |
|
||
|
|
||
|
**Returns** - <code>void</code>
|
||
|
|
||
|
|
||
|
##### Example
|
||
|
|
||
|
```typescript
|
||
|
/** Simplified Construction of the Object */
|
||
|
const _test = new TestClass();
|
||
|
|
||
|
/** Subscribe to changes of the attribute */
|
||
|
_test.attribute_01.subscribe((_num, _topic) => {
|
||
|
console.log('subscription on', _topic, '. Value changed to:', _num);
|
||
|
}, 'sync');
|
||
|
|
||
|
/** Change the Value of the observable => Subscription will be triggered */
|
||
|
_test.attribute_01.value = 1337;
|
||
|
```
|
||
|
|
||
|
|
||
|
|
||
|
#### Changing Values and reading values
|
||
|
|
||
|
The value of an observable could be changed with the `value` attribute of an observable
|
||
|
|
||
|
```typescript
|
||
|
/** Simplified Construction of the Object */
|
||
|
const _test = new TestClass();
|
||
|
|
||
|
/** Change the Value of the observable => Subscription will be triggered */
|
||
|
_test.attribute_01.value = 1337;
|
||
|
|
||
|
/** Reading a Value */
|
||
|
const _aVar = _test.attribute_01.value;
|
||
|
```
|
||
|
|
||
|
### Sample Application
|
||
|
|
||
|
```typescript
|
||
|
/** Clear the Screen */
|
||
|
declare const process: any;
|
||
|
|
||
|
|
||
|
import { Builder } from '../../ZISS-Assembly-Builder/src/Container-Builder.FileLoader';
|
||
|
|
||
|
import * as PUBSUB from '../../ZISS-Publish-And-Subscribe-System/assembly/manual-assembly';
|
||
|
import * as LOGGING from '../../ZISS-Logger/assembly/manual-assembly';
|
||
|
|
||
|
|
||
|
import { addObservables, IContainsObservables, GenericObservable } from '../src/Pub-Sub-Observable';
|
||
|
import { inject, injectable } from 'inversify';
|
||
|
|
||
|
/** Add the Observer-ActivationHandler */
|
||
|
Builder.instance.addActivationHandler(addObservables);
|
||
|
|
||
|
/** Trigger Loading the Modules */
|
||
|
Builder.load();
|
||
|
|
||
|
Builder.on('loaded', () => {
|
||
|
|
||
|
/** Change the Log-Level */
|
||
|
|
||
|
Builder.instance.container.get<LOGGING.Printer>(LOGGING.TYPES.Printer).printLog('warn');
|
||
|
|
||
|
/** Define a Class containing an observable @see attribute_01 */
|
||
|
@injectable()
|
||
|
class TestClass implements IContainsObservables {
|
||
|
path: string = '';
|
||
|
offeredTopics: Map<string, GenericObservable<any>>;
|
||
|
|
||
|
init(): void {
|
||
|
/** Set a Value */
|
||
|
this.attribute_01.value = 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Defining the Attribute as observable
|
||
|
*
|
||
|
* @type {PUBSUB.Observable<number>}
|
||
|
* @memberof TestClass
|
||
|
*/
|
||
|
@inject(PUBSUB.TYPES.Observable)
|
||
|
public attribute_01: PUBSUB.Observable<number>;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Creates an instance of TestClass.
|
||
|
* @memberof TestClass
|
||
|
*/
|
||
|
constructor() {
|
||
|
/** Set a Values - That must be done during the construction */
|
||
|
this.path = '';
|
||
|
this.offeredTopics = new Map<string, GenericObservable<any>>();
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
}
|
||
|
|
||
|
/** Create a Definition for the Builder */
|
||
|
const _definition = [{
|
||
|
selector: 'Test',
|
||
|
type: TestClass
|
||
|
}];
|
||
|
|
||
|
/** Add the Definition to the Builder */
|
||
|
Builder.instance.addElements(_definition);
|
||
|
|
||
|
/** Generate an Object of the Test-Class */
|
||
|
const _test = Builder.instance.container.get<TestClass>("Test");
|
||
|
|
||
|
let _counter = 0;
|
||
|
|
||
|
/** Subscribe tot the attribute */
|
||
|
_test.attribute_01.subscribe((_num, _topic) => {
|
||
|
console.log('Calling Subscription of', _topic, ' the ' + _counter + '\'th time. Value changed to:', _num);
|
||
|
_counter++;
|
||
|
}, 'sync');
|
||
|
|
||
|
/** Set the value to trigger the execution of the Subscription */
|
||
|
console.log('Changing Value to 1337');
|
||
|
_test.attribute_01.value = 1337;
|
||
|
|
||
|
console.log('Adding Setter');
|
||
|
|
||
|
/** Define a specialized Setter */
|
||
|
_test.attribute_01.setter = (value) => {
|
||
|
console.log('In Setter of the attribute_01');
|
||
|
/** The value should be limeted to Min 0 or Max 100 */
|
||
|
if (value > 100) {
|
||
|
return 100;
|
||
|
}
|
||
|
if (value < 0) {
|
||
|
return 0;
|
||
|
}
|
||
|
return value;
|
||
|
}
|
||
|
|
||
|
/** Setting the Attribute to change the Value and trigger the setter and subscription */
|
||
|
console.log('Changing Value to 133123');
|
||
|
_test.attribute_01.value = 133123;
|
||
|
console.log('Reading after setter', _test.attribute_01.value);
|
||
|
|
||
|
console.log('Changing Path')
|
||
|
/** Change the path. Subscriptions on the Observable are automatically adapted, to subscribe the new Path */
|
||
|
_test.path = '_test';
|
||
|
|
||
|
/** */
|
||
|
console.log('Changing Value to 1337');
|
||
|
_test.attribute_01.value = 1337;
|
||
|
|
||
|
console.log('create Link with Pub-Sub-System')
|
||
|
/** Enabling changing the Data externally */
|
||
|
_test.attribute_01.subscribeInPubSubSystem = true;
|
||
|
|
||
|
|
||
|
/** Get the underlying Publish-and-Subscribe System */
|
||
|
const _pubSub = Builder.instance.container.get<PUBSUB.PubSubSystem>(PUBSUB.TYPES.PubSubSystem);
|
||
|
|
||
|
/** Publish data to change the Attribute */
|
||
|
console.log('plublish 1336 on _test.attribute_01')
|
||
|
_pubSub.createPublisher<number>('_test.attribute_01').publishData(1336);
|
||
|
|
||
|
/** Turn off online changes of the var by the Publish-and-Subscribe System */
|
||
|
_test.attribute_01.subscribeInPubSubSystem = false;
|
||
|
|
||
|
/** Publish data to change the Attribute */
|
||
|
console.log('plublish 1336 on _test.attribute_01')
|
||
|
_pubSub.createPublisher<number>('_test.attribute_01').publishData(1336);
|
||
|
|
||
|
});
|
||
|
|
||
|
```
|
||
|
|
||
|
Results in the Output
|
||
|
|
||
|
```
|
||
|
Changing Value to 1337
|
||
|
Calling Subscription of root._id8ebg4qDkQjjx the 0'th time. Value changed to: 1337
|
||
|
Adding Setter
|
||
|
Changing Value to 133123
|
||
|
In Setter of the attribute_01
|
||
|
Calling Subscription of root._id8ebg4qDkQjjx the 1'th time. Value changed to: 100
|
||
|
Reading after setter 100
|
||
|
Changing Path
|
||
|
Changing Value to 1337
|
||
|
In Setter of the attribute_01
|
||
|
In Setter of the attribute_01
|
||
|
Calling Subscription of root._test.attribute_01 the 2'th time. Value changed to: 100
|
||
|
create Link with Pub-Sub-System
|
||
|
plublish 1336 on _test.attribute_01
|
||
|
In Setter of the attribute_01
|
||
|
Calling Subscription of root._test.attribute_01 the 3'th time. Value changed to: 1336
|
||
|
plublish 1336 on _test.attribute_01
|
||
|
Calling Subscription of root._test.attribute_01 the 4'th time. Value changed to: 1336
|
||
|
```
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|