Pub Sub System - Publish and Subscribe System
The Pub-Sub-System is a basic Publish and Subscribe System. A Pub-Sub-System consists of three main elements:
- Publishers
- Subscribers
- Broker
Publishers publish data on a given Topic, and Subscribers receive updates of the data of a subscribed Topic. Topics are the basis for sharing data. For instance, if a sensor publishes its value on the Topic Room1.Temperature, every Subscription to Room1 or Room1.Temperature receives the updated value. After unsubscribing, a Subscriber no longer receives updates. In this way, the publish-and-subscribe mechanism ensures loose coupling between the elements and supports scalability.
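The Room1.Temperature case above can be sketched with a very small broker. This is only an illustration under simplifying assumptions, not the implementation of this project: the names MiniBroker, Callback, subscribe and publish are hypothetical, and the sketch hands the raw published value to every matching Subscriber instead of the data of the Subscriber's own Topic.

```typescript
// Hypothetical names throughout; this is not the API of the Pub-Sub-System.
type Callback = (data: unknown, topic: string) => void;

class MiniBroker {
  // Maps a subscribed topic to the callbacks registered on it.
  private subscriptions: { [topic: string]: Callback[] } = {};

  /** Registers a callback and returns a function that unsubscribes it again. */
  subscribe(topic: string, callback: Callback): () => void {
    const list = this.subscriptions[topic] || (this.subscriptions[topic] = []);
    list.push(callback);
    return () => {
      const index = list.indexOf(callback);
      if (index >= 0) {
        list.splice(index, 1);
      }
    };
  }

  /** Delivers data to subscribers of the topic itself and of every super-topic. */
  publish(topic: string, data: unknown): void {
    const parts = topic.split('.');
    for (let depth = 1; depth <= parts.length; depth++) {
      const prefix = parts.slice(0, depth).join('.');
      (this.subscriptions[prefix] || []).slice().forEach((cb) => cb(data, topic));
    }
  }
}

const broker = new MiniBroker();
const received: string[] = [];

const unsubscribeRoom = broker.subscribe('Room1', (value) => received.push('Room1:' + String(value)));
broker.subscribe('Room1.Temperature', (value) => received.push('Temperature:' + String(value)));
broker.subscribe('Room2', (value) => received.push('Room2:' + String(value)));

broker.publish('Room1.Temperature', 21.5); // reaches Room1 and Room1.Temperature
unsubscribeRoom();
broker.publish('Room1.Temperature', 22); // reaches only Room1.Temperature now
```

The publisher never sees the list of callbacks and the callbacks never see the publisher, which is the loose coupling described above.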
Summarized Attributes of a Pub-Sub-System
- A Publisher doesn't know its subscribers
- A Subscriber doesn't know the Publisher(s)
- Different Publishers are able to Publish data on the same Topic
- Brokers are responsible for sharing data, subscriptions etc.
Description of the Implementation
The basic Pub-Sub-System is based on a tree model. For instance, assume there are Publishers for the following Topics:
Root.Topic1
Root.Topic2
Root.Topic2.Part1
Root.Topic2.Part2
Root.Topic2.Part3
The resulting tree is built up as follows:
          (Root)
          /    \
         /      \
   (Topic1)   (Topic2)
              /  |  \
             /   |   \
       (Part1)   |   (Part2)
                 |
              (Part3)
Data of a Topic is shared only with its 'Super-Topics' and 'Child-Topics' (see the Examples).
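The sharing rule can be read as "a Subscription is a candidate for an update if its Topic lies on the same branch as the published Topic". The following sketch expresses that rule for the tree above. The function names isSameOrSuperTopic and isAffected are hypothetical and only illustrate the rule; they are not taken from the implementation.

```typescript
/** Returns true if `candidate` equals `topic` or is one of its super-topics. */
function isSameOrSuperTopic(candidate: string, topic: string): boolean {
  return topic === candidate || topic.indexOf(candidate + '.') === 0;
}

/** A subscription is affected if its topic lies on the same branch as the published topic. */
function isAffected(subscribed: string, published: string): boolean {
  return isSameOrSuperTopic(subscribed, published) || isSameOrSuperTopic(published, subscribed);
}

const topics = [
  'Root',
  'Root.Topic1',
  'Root.Topic2',
  'Root.Topic2.Part1',
  'Root.Topic2.Part2',
  'Root.Topic2.Part3',
];

// Candidates when publishing on Root.Topic2: everything except Root.Topic1.
const affectedByTopic2 = topics.filter((t) => isAffected(t, 'Root.Topic2'));

// Candidates when publishing on Root.Topic2.Part3: Root, Root.Topic2 and Root.Topic2.Part3.
const affectedByPart3 = topics.filter((t) => isAffected(t, 'Root.Topic2.Part3'));
```

Comparing with a trailing dot avoids treating Root.Topic1 as a super-topic of a Topic such as Root.Topic10.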
Example 1 - Publishing data on Root.Topic2
If a Publisher of the Topic Root.Topic2 publishes an update, the Subscribers of the following Topics receive an update:
Root
Root.Topic2
Root.Topic2.Part1
Root.Topic2.Part2
Root.Topic2.Part3
Each Subscriber receives the new data in the following format.
Example data published on the Topic Root.Topic2:
{ Part1 : 1, Part2 : 'test', Part3 : [1.337, 1.338] }
Topic | Received Data of the Subscription in JSON-Notation |
---|---|
Root | { Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1.337, 1.338] } } |
Root.Topic2 | { Part1 : 1, Part2 : 'test', Part3 : [1.337, 1.338] } |
Root.Topic2.Part1 | 1 |
Root.Topic2.Part2 | 'test' |
Root.Topic2.Part3 | [1.337, 1.338] |
Root.Topic2.Part3.0 | 1.337 |
Root.Topic2.Part3.1 | 1.338 |
Subscribers of the Topic Root.Topic1 don't receive updates.
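One way to think about the table is that each Subscriber receives the part of the overall data tree that its Topic points to. The sketch below resolves a dot-separated Topic against a nested object. The function valueAt and the state object are hypothetical and only mirror the example data; the actual implementation may store and resolve its data differently.

```typescript
/** Walks `state` along the dot-separated `topic`; returns undefined if a segment is missing. */
function valueAt(state: unknown, topic: string): unknown {
  let current: unknown = state;
  for (const key of topic.split('.')) {
    if (current === null || typeof current !== 'object') {
      return undefined;
    }
    // Arrays are indexed the same way, so 'Part3.0' selects the first element.
    current = (current as { [key: string]: unknown })[key];
  }
  return current;
}

// The data tree after the publication of Example 1.
const state = {
  Root: {
    Topic1: 'someData',
    Topic2: { Part1: 1, Part2: 'test', Part3: [1.337, 1.338] },
  },
};

const forTopic2 = valueAt(state, 'Root.Topic2'); // the whole Topic2 object
const forPart1 = valueAt(state, 'Root.Topic2.Part1'); // 1
const forPart3First = valueAt(state, 'Root.Topic2.Part3.0'); // 1.337
```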
Example 2 - Publishing data on Root.Topic2.Part3
If a Publisher of the Topic Root.Topic2.Part3 publishes an update, the Subscribers of the following Topics receive an update:
Root
Root.Topic2
Root.Topic2.Part3
Each Subscriber receives the new data in the following format.
Example data published on the Topic Root.Topic2.Part3:
[1234, 1.338]
Topic | Received Data of the Subscription in JSON-Notation |
---|---|
Root | { Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] } } |
Root.Topic2 | { Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] } |
Root.Topic2.Part3 | [1234, 1.338] |
Root.Topic2.Part3.0 | 1234 |
Root.Topic2.Part3.1 | 1.338 (no update is published, because the value didn't change) |
Subscribers of the Topics Root.Topic1, Root.Topic2.Part1, Root.Topic2.Part2 and Root.Topic2.Part3.1 don't receive updates, because their values didn't change.
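The "only changed values are published" behaviour of Example 2 can be sketched as a recursive comparison of the data before and after the publication. The names changedTopics, isContainer and Container are hypothetical, and comparing via JSON.stringify is a simplification that is adequate for plain JSON-like data only; the real implementation may detect changes in another way.

```typescript
type Container = { [key: string]: unknown };

/** True for objects and arrays, i.e. values that have child topics. */
function isContainer(value: unknown): value is Container {
  return typeof value === 'object' && value !== null;
}

/** Lists `topic` and all of its child topics whose value differs between `before` and `after`. */
function changedTopics(topic: string, before: unknown, after: unknown): string[] {
  if (JSON.stringify(before) === JSON.stringify(after)) {
    return []; // nothing changed on this branch
  }
  let result = [topic];
  if (isContainer(after)) {
    const next: Container = after;
    const old: Container = isContainer(before) ? before : {};
    Object.keys(next).forEach((key) => {
      result = result.concat(changedTopics(topic + '.' + key, old[key], next[key]));
    });
  }
  return result;
}

const before = { Topic1: 'someData', Topic2: { Part1: 1, Part2: 'test', Part3: [1.337, 1.338] } };
const after = { Topic1: 'someData', Topic2: { Part1: 1, Part2: 'test', Part3: [1234, 1.338] } };

// Root, Root.Topic2, Root.Topic2.Part3 and Root.Topic2.Part3.0 changed;
// Root.Topic1, Part1, Part2 and Part3.1 kept their values.
const changed = changedTopics('Root', before, after);
```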
Technical Implementation
The Pub-Sub-System is built on a Base-Node-Container. This container creates new Topics where necessary, so that a Subscription can be registered or data can be published on them. Published data is then sent to all matching Subscriptions.
- Only one Pub-Sub-System may exist at a time, otherwise an exception is thrown.
- Subscribers must unsubscribe if they should not listen any more, for instance when a specific object is paused, deactivated or disposed (use the unregister function of a Subscription).
- Subscriptions can be activated again by calling register.
- Subscriptions are automatically active.
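The rules above can be illustrated with a reduced sketch. Only the method names register and unregister come from this description; the classes SketchPubSubSystem and SketchSubscription, the deliver method and the omission of Topics are assumptions made to keep the example short.

```typescript
// Hypothetical classes that only illustrate the listed rules.
class SketchSubscription {
  private active = true; // subscriptions are active right after creation

  constructor(private readonly callback: (data: unknown) => void) {}

  /** Stops delivery, e.g. while the owning object is paused or disposed. */
  unregister(): void {
    this.active = false;
  }

  /** Resumes delivery after a previous unregister(). */
  register(): void {
    this.active = true;
  }

  /** Called by the system for each publication. */
  deliver(data: unknown): void {
    if (this.active) {
      this.callback(data);
    }
  }
}

class SketchPubSubSystem {
  private static created = false;
  private readonly subscriptions: SketchSubscription[] = [];

  constructor() {
    if (SketchPubSubSystem.created) {
      throw new Error('Only one Pub-Sub-System may exist at a time');
    }
    SketchPubSubSystem.created = true;
  }

  createSubscription(callback: (data: unknown) => void): SketchSubscription {
    const subscription = new SketchSubscription(callback);
    this.subscriptions.push(subscription);
    return subscription;
  }

  publish(data: unknown): void {
    this.subscriptions.forEach((s) => s.deliver(data));
  }
}

const system = new SketchPubSubSystem();
const seen: unknown[] = [];
const subscription = system.createSubscription((data) => seen.push(data));

system.publish(1); // delivered
subscription.unregister();
system.publish(2); // skipped
subscription.register();
system.publish(3); // delivered again

let secondSystemRejected = false;
try {
  new SketchPubSubSystem();
} catch (error) {
  secondSystemRejected = true;
}
```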
Observables
Observables allow a simple integration of the attributes of an object into the Pub-Sub-System. For this purpose, the IObserveable<T> interface is used.
Example - Integrate an Observable
To use an Observable, declare the attribute as follows, where TYPE describes the data type, such as number or string:
@inject(PUBSUB.TYPES.Observable)
public attributeName: PUBSUB.Observable<TYPE>;
Furthermore, the class containing the observable attribute must implement the IContainsObservables interface, as presented in the example below:
/** Import the Pub-Sub-System (Path must be adapted) */
import * as PUBSUB from 'Pub-Sub-System/assembly/manual-assembly';

/** Define a Class containing an observable @see attribute_01 */
@injectable()
class TestClass implements IContainsObservables {
    /** Values of the IOnlineValues - Interface */
    path: string = '';
    offeredTopics: Map<string, GenericObservable<any>>;

    /**
     * Defining the Attribute which should be integrated into the Publish
     * and Subscribe System. The value should be a 'number'
     *
     * @type {PUBSUB.Observable<number>}
     * @memberof TestClass
     */
    @inject(PUBSUB.TYPES.Observable)
    public attribute_01: PUBSUB.Observable<number>;

    /**
     * Creates an instance of TestClass.
     * @memberof TestClass
     */
    constructor() {
        /** Set the values - this must be done during construction */
        this.path = '';
        this.offeredTopics = new Map<string, GenericObservable<any>>();
    }
}
Working with Observables
By default, Observables publish their content on an automatically generated Topic. This Topic can be read and changed via the path attribute of an Observable.
Subscribing to Changes
To subscribe to changes of an Observable, use the subscribe method. You can choose whether the Subscription receives the information async (the task is put on the next tick of the Node.js / JavaScript event loop) or sync (immediately).
The following parameters are used:
Param | Type | Default | Description |
---|---|---|---|
func | subscriptionCallback | | Function which is called when new data is pushed |
[mode] | 'sync' \| 'immediate' | 'sync' | Mode of the Subscription |
Returns: ISubscription - The corresponding Subscription
The subscriptionCallback is defined as:
Param | Type | Description |
---|---|---|
content | any | The new data |
[topic] | string | The Topic on which the data was published |
[timeStamp] | number | The timestamp at which the data was changed |
Returns: void
Example
/** Simplified Construction of the Object */
const _test = new TestClass();
/** Subscribe to changes of the attribute */
_test.attribute_01.subscribe((_num, _topic) => {
    console.log('subscription on', _topic, '. Value changed to:', _num);
}, 'sync');
/** Change the Value of the observable => Subscription will be triggered */
_test.attribute_01.value = 1337;
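The difference between immediate and deferred delivery can be sketched with a stand-in class. SketchObservable is hypothetical and is not PUBSUB.Observable; the mode names 'sync' and 'async' follow the wording of the description above, and setTimeout with a delay of 0 is used here as an assumption for "a later turn of the event loop".

```typescript
type Mode = 'sync' | 'async';
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for an observable attribute.
class SketchObservable<T> {
  private listeners: Array<{ func: Listener<T>; mode: Mode }> = [];

  constructor(private current: T) {}

  subscribe(func: Listener<T>, mode: Mode = 'sync'): void {
    this.listeners.push({ func, mode });
  }

  get value(): T {
    return this.current;
  }

  set value(next: T) {
    this.current = next;
    this.listeners.forEach(({ func, mode }) => {
      if (mode === 'sync') {
        func(next); // runs before the assignment returns
      } else {
        setTimeout(() => func(next), 0); // runs in a later turn of the event loop
      }
    });
  }
}

const observable = new SketchObservable<number>(0);
const order: string[] = [];

observable.subscribe((v) => order.push('sync ' + v), 'sync');
observable.subscribe((v) => order.push('async ' + v), 'async');

observable.value = 1337;
order.push('after assignment');
// Here order holds 'sync 1337' followed by 'after assignment';
// 'async 1337' is appended once the current call stack has finished.
```

Deferred delivery keeps a slow callback from blocking the code that changes the value, at the cost of the callback seeing the change slightly later.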
Changing and reading values
The value of an Observable can be changed and read via its value attribute:
/** Simplified Construction of the Object */
const _test = new TestClass();
/** Change the Value of the observable => Subscription will be triggered */
_test.attribute_01.value = 1337;
/** Reading a Value */
const _aVar = _test.attribute_01.value;
Sample Application
import { Builder } from '../../mod-Assembly-Builder/src/Container-Builder.FileLoader';
import { DESCRIPTION, PubSubSystem } from '../assembly/manual-assembly';

Builder.instance.addElements(DESCRIPTION.PROVIDED_CLASSES);
Builder.instance.addActivationHandler(DESCRIPTION.ACTIVATIONHANDLERS);
Builder.load();

Builder.on('loaded', () => {
    // Create a Publish and Subscribe System
    const _pubSub = Builder.instance.container.get<PubSubSystem>(DESCRIPTION.TYPES.PubSubSystem);

    // Create a Subscription on the topic: "test.topic"
    const _sub = _pubSub.createSubscription<any>('test.topic', (_data, _topic, _timestamp) => {
        console.log('received with inform', _data);
    });

    // Create Publishers and publish "1" on a sub-topic and "2" on the topic itself
    _pubSub.createPublisher<number>('test.topic.child').publishData(1);
    _pubSub.createPublisher<number>('test.topic').publishData(2);

    // Change the Subscription Mode.
    _sub.options.mode = ['direct'];

    // The following Subscription will be now ignored
    _pubSub.createPublisher<number>('test.topic.child').publishData(3);
    _pubSub.createPublisher<number>('test.topic').publishData(4);
});
Results in the Output
Changing Value to 1337
Calling Subscription of root._id8ebg4qDkQjjx the 0'th time. Value changed to: 1337
Adding Setter
Changing Value to 133123
In Setter of the attribute_01
Calling Subscription of root._id8ebg4qDkQjjx the 1'th time. Value changed to: 100
Reading after setter 100
Changing Path
Changing Value to 1337
In Setter of the attribute_01
In Setter of the attribute_01
Calling Subscription of root._test.attribute_01 the 2'th time. Value changed to: 100
create Link with Pub-Sub-System
plublish 1336 on _test.attribute_01
In Setter of the attribute_01
Calling Subscription of root._test.attribute_01 the 3'th time. Value changed to: 1336
plublish 1336 on _test.attribute_01
Calling Subscription of root._test.attribute_01 the 4'th time. Value changed to: 1336