#!/usr/bin/env python # Test whether a PUBLISH to a topic with an offline subscriber results in a queued message import Queue import random import string import subprocess import socket import threading import time try: import paho.mqtt.client import paho.mqtt.publish except ImportError: print("WARNING: paho.mqtt module not available, skipping byte count test.") exit(0) import inspect, os, sys # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) if cmd_subfolder not in sys.path: sys.path.insert(0, cmd_subfolder) import mosq_test rc = 1 def registerOfflineSubscriber(): """Just a durable client to trigger queuing""" client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False) client.connect("localhost", port=1888) client.subscribe("test/publish/queueing/#", 1) client.loop() client.disconnect() broker = mosq_test.start_broker(filename=os.path.basename(__file__)) class BrokerMonitor(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose) self.rq, self.cq = args self.stored = -1 self.stored_bytes = -1 self.dropped = -1 def store_count(self, client, userdata, message): self.stored = int(message.payload) def store_bytes(self, client, userdata, message): self.stored_bytes = int(message.payload) def publish_dropped(self, client, userdata, message): self.dropped = int(message.payload) def run(self): client = paho.mqtt.client.Client("broker-monitor") client.connect("localhost", port=1888) client.message_callback_add("$SYS/broker/store/messages/count", self.store_count) client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes) client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped) client.subscribe("$SYS/broker/store/messages/#") client.subscribe("$SYS/broker/publish/messages/dropped") while True: expect_drops = cq.get() self.cq.task_done() if expect_drops == "quit": break first = time.time() while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0): client.loop(timeout=0.5) if time.time() - 10 > first: print("ABORT TIMEOUT") break if expect_drops: self.rq.put((self.stored, self.stored_bytes, self.dropped)) else: self.rq.put((self.stored, self.stored_bytes, 0)) self.stored = -1 self.stored_bytes = -1 self.dropped = -1 client.disconnect() rq = Queue.Queue() cq = Queue.Queue() brokerMonitor = BrokerMonitor(args=(rq,cq)) class StoreCounts(): def __init__(self): self.stored = 0 self.bstored = 0 self.drops = 0 self.diff_stored = 0 self.diff_bstored = 0 self.diff_drops = 0 def update(self, tup): self.diff_stored = tup[0] - self.stored self.stored = tup[0] self.diff_bstored = tup[1] - self.bstored self.bstored = tup[1] self.diff_drops = tup[2] - self.drops self.drops = tup[2] def __repr__(self): return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops) try: registerOfflineSubscriber() time.sleep(2.5) # Wait for first proper dump of stats brokerMonitor.start() counts = StoreCounts() cq.put(True) # Expect a dropped count (of 0, initial) counts.update(rq.get()) # Initial start print("rq.get (INITIAL) gave us: ", counts) rq.task_done() # publish 10 short messages, should be no drops print("publishing 10 short") cq.put(False) # expect no updated drop count msgs_short10 = [("test/publish/queueing/%d" % x, ''.join(random.choice(string.hexdigits) for _ in range(10)), 1, False) for x in range(1, 10 + 1)] paho.mqtt.publish.multiple(msgs_short10, port=1888) counts.update(rq.get()) # Initial start print("rq.get (short) gave us: ", counts) rq.task_done() if counts.diff_stored != 10 or counts.diff_bstored < 100: raise ValueError if counts.diff_drops != 0: raise ValueError # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400 print("publishing 10 medium") cq.put(True) # expect a drop count msgs_medium10 = [("test/publish/queueing/%d" % x, ''.join(random.choice(string.hexdigits) for _ in range(40)), 1, False) for x in range(1, 10 + 1)] paho.mqtt.publish.multiple(msgs_medium10, port=1888) counts.update(rq.get()) # Initial start print("rq.get (medium) gave us: ", counts) rq.task_done() if counts.diff_stored != 8 or counts.diff_bstored < 320: raise ValueError if counts.diff_drops != 2: raise ValueError rc = 0 finally: cq.put("quit") brokerMonitor.join() rq.join() cq.join() broker.terminate() (stdo, stde) = broker.communicate() if rc: print(stde) exit(rc)