nope/modules/pub-sub
2021-08-17 17:52:46 +02:00
..
src Updating to version 0.9.7 2021-08-17 17:52:46 +02:00
types fixing formats 2020-11-24 15:14:56 +01:00
README.md fixing names 2021-08-04 14:57:28 +02:00

Pub Sub System - Publish and Subscribe System

The Pub-Sub-System is a basic Publish and Subscribe System. A Pub-Sub System consists of 3 Main Elements:

  • Publishers
  • Subscribers
  • Broker

Whereas Publishers publishes data on a given Topic, Subscribers receives updates of the data of a subscribed Topic. Topics are used as a Base to share data. For instance, if a Sensor publishes its value on the Topic Room1.Temperature every subscribtion which subscribes to Room1 or Room1.Temperature will receive the updated Value. By unsubscribing, a Subscriber doesn't receive Updates anymore. Thereby, the Publish-and-Subscribe mechanism ensure, a loose coupleing between the Elements and scalability.

Summarized Attributes of a Pub-Sub-System

  • A Publisher doesn't know its subscribers
  • A Subscriber doesn't kown the Publisher(s)
  • Different Publishers are able to Publish data on the same Topic
  • Broker are responsible for sharing data and subscriptions etc.

Description of the Implementation

The Basic Pub-Sub-System which is based on a Binary Tree-Model. For Instance, there are Publishers for the following Topics:

  • Root.Topic1
  • Root.Topic2
  • Root.Topic2.Part1
  • Root.Topic2.Part2
  • Root.Topic2.Part3

Thereby the Tree is build up like that.

        (Root)
        /    \
       /      \
  (Topic1)  (Topic2)
            /  |   \
           /   |    \
       (Part1) |  (Part2)
               |
            (Part3)

Data of Topics are shared only to its 'Super-Topics' and 'Child-Topics' (see the Examples)

Example 1 - Publishing data on Root/Topic2

If a Publisher of the Topic Root.Topic2 publishes an update, the Subscribe of the following Topics receives an update.

  • Root
  • Root.Topic2
  • Root.Topic2.Part1
  • Root.Topic2.Part2
  • Root.Topic2.Part3

Thereby each element gets the new Data in the following format:

Example-Data: { Part1 : 1, Part2 : 'test', Part3 : 1.337 } is published on the Topic Root/Topic2

Topic Received Data of the Subscription in JSON-Notation
Root { Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1.337, 1.338] } }
Root.Topic2 { Part1 : 1, Part2 : 'test', Part3 : 1.337 }
Root.Topic2.Part1 1
Root.Topic2.Part2 'test'
Root.Topic2.Part3 [1.337, 1.338]
Root.Topic2.Part3.0 1.337
Root.Topic2.Part3.1 1.338

Subscriber of the Topic Root.Topic1 doesn't receive Updates.

Example 2 - Publishing data on Root.Topic2.Part3

If a Publisher of the Topic Root.Topic2.Part3 publishes an update, the Subscribe of the following Topics receives an update.

  • Root
  • Root.Topic2
  • Root.Topic2.Part3

Thereby each element gets the new Data in the following format:

Example-Data: [1234, 1.338] is published on the Topic Root.Topic2.Part3

Topic Received Data of the Subscription in JSON-Notation
Root { Topic1: 'someData', Topic2 : { Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] } }
Root.Topic2 { Part1 : 1, Part2 : 'test', Part3 : [1234, 1.338] }
Root.Topic2.Part3 [1234, 1.338]
Root.Topic2.Part3.0 1234
Root.Topic2.Part3.1 1.338 No Update will be Pusblished, cause the value didn't change

Subscriber of the Topic Root.Topic1, Root.Topic2.Part1, Root.Topic2.Part3.1 and Root.Topic2.Part2 doesn't receive Updates, cause their values didn't change.


Technical Implementation

The Pub-Sub-System is build up with a Base-Node-Container. This Container is responsible to create (if necessary) new Topics on which a Subscription could be subscribe or data could be published. Based on all available Subscriptions published Data send to the corresponding Subscriptions.

  • No multiple Pub-Sub-Systems at a Time, otherwise an exception is thrown.
  • Subscribers must unsubscribe if the should not listening any more. For Instance pause/deactivate or dispose an specific Object. (use the unregister function of a Subscribtion)
  • Subscriptions could be activated again by calling register
  • Subscriptions are automatically active.

Observables

Observables allow a simple integration of attributes of an object. Therefore the IObserveable<T> interface is used.

Example - Integrate an Observable

To use an Observable the following code must be used, where as TYPE describes the data type like number, string, etc.

@inject(PUBSUB.TYPES.Observable)
public attributeName: PUBSUB.Observable<TYPE>;

Furthermore the Class containing the observable attribute must implement the IContainsObservables-Interface, as presented in the example below:

/** Import the Pub-Sub-System (Path must be adapted) */
import * as PUBSUB from "Pub-Sub-System/assembly/manual-assembly";

/** Define a Class containing an observable @see attribute_01 */
@injectable()
class TestClass implements IContainsObservables {
  /** Values of the IOnlineValues - Interface */
  path: string = "";
  offeredTopics: Map<string, GenericObservable<any>>;

  /**
   * Defining the Attribute which should be integrated into the Publish
   * and Subscribe System. The value should be a 'number'
   *
   * @type {PUBSUB.Observer<number>}
   * @memberof TestClass
   */
  @inject(PUBSUB.TYPES.Observable)
  public attribute_01: PUBSUB.Observable<number>;

  /**
   * Creates an instance of TestClass.
   * @memberof TestClass
   */
  constructor() {
    /** Set a Values - That must be done during the construction */
    this.path = "";
    this.offeredTopics = new Map<string, GenericObservable<any>>();
  }
}

Working with Observables

By default observables publish their content on an automatically generated Topic. This topic could be adapted and read with the path - attibute of an Observable.


Subscribing to Changes

To subscribe to changes of an obserable simply use the metho subscribe. You can determine whether the subscription should receive the information async (Putting task on the next Tick of the Nodejs / JavaScript Eventloop) or sync (immediately).

The Following Parameters are used:

Param Type Default Description
func subscriptionCallback Function which is called when new Datas are pushed
[mode] 'sync' \ 'immediate' 'sync' Mode of the Subscription

Returns: ISubscription - The corresponding Subscription

whereas the subscriptionCallback is defined as:

Param Type Description
content any The new Data
[topic] string The Topic on which the Data where from
[timeStamp] number The timeStamp on which the data where changed

Returns - void

Example
/** Simplified Construction of the Object */
const _test = new TestClass();

/** Subscribe to changes of the attribute */
_test.attribute_01.subscribe((_num, _topic) => {
  console.log("subscription on", _topic, ". Value changed to:", _num);
}, "sync");

/** Change the Value of the observable => Subscription will be triggered */
_test.attribute_01.value = 1337;

Changing Values and reading values

The value of an observable could be changed with the value attribute of an observable

/** Simplified Construction of the Object */
const _test = new TestClass();

/** Change the Value of the observable => Subscription will be triggered */
_test.attribute_01.value = 1337;

/** Reading a Value */
const _aVar = _test.attribute_01.value;

Sample Application

import { Builder } from "../../mod-Assembly-Builder/src/Container-Builder.FileLoader";
import { DESCRIPTION, PubSubSystem } from "../assembly/manual-assembly";

Builder.instance.addElements(DESCRIPTION.PROVIDED_CLASSES);
Builder.instance.addActivationHandler(DESCRIPTION.ACTIVATIONHANDLERS);

Builder.load();

Builder.on("loaded", () => {
  // Create a Publish and Subscribe System
  const _pubSub = Builder.instance.container.get<PubSubSystem>(
    DESCRIPTION.TYPES.PubSubSystem
  );

  // Create a Subscription on the topic: "test.topic"
  const _sub = _pubSub.createSubscription<any>(
    "test.topic",
    (_data, _topic, _timestamp) => {
      console.log("received with inform", _data);
    }
  );

  // Create a Publisher and Publish "1" on a Subtopic
  _pubSub.createPublisher<number>("test.topic.child").publishData(1);
  _pubSub.createPublisher<number>("test.topic").publishData(2);

  // Change the Subscription Mode.
  _sub.options.mode = ["direct"];

  // The following Subscription will be now ignored
  _pubSub.createPublisher<number>("test.topic.child").publishData(3);
  _pubSub.createPublisher<number>("test.topic").publishData(4);
});

Results in the Output

Changing Value to 1337
Calling Subscription of root._id8ebg4qDkQjjx  the 0'th time. Value changed to: 1337
Adding Setter
Changing Value to 133123
In Setter of the attribute_01
Calling Subscription of root._id8ebg4qDkQjjx  the 1'th time. Value changed to: 100
Reading after setter 100
Changing Path
Changing Value to 1337
In Setter of the attribute_01
In Setter of the attribute_01
Calling Subscription of root._test.attribute_01  the 2'th time. Value changed to: 100
create Link with Pub-Sub-System
plublish 1336 on _test.attribute_01
In Setter of the attribute_01
Calling Subscription of root._test.attribute_01  the 3'th time. Value changed to: 1336
plublish 1336 on _test.attribute_01
Calling Subscription of root._test.attribute_01  the 4'th time. Value changed to: 1336