mosquitto/test/broker/03-publish-qos1-queued-bytes.py

164 lines
5.2 KiB
Python
Raw Normal View History

2019-03-28 21:32:12 +00:00
#!/usr/bin/env python3
# Test whether a PUBLISH to a topic with an offline subscriber results in a queued message
import random
import socket
import string
import subprocess
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    # Python 3 renamed the module to `queue`; keep the name this file uses.
    import queue as Queue

try:
    import paho.mqtt.client
    import paho.mqtt.publish
except ImportError:
    print("WARNING: paho.mqtt module not available, skipping byte count test.")
    exit(0)

from mosq_test_helper import *
# Exit status of this script; it is only set to 0 once every check has passed.
rc = 1

# Port shared by the broker under test and every client in this script.
port = mosq_test.get_port()
def registerOfflineSubscriber():
    """Register a persistent-session subscriber and leave it disconnected.

    Connects to the broker on localhost as client id "sub-qos1-offline" with
    clean_session=False, subscribes to test/publish/queueing/# at QoS 1, runs
    a single network-loop iteration and disconnects again.  The subscription
    is intended to make the broker queue matching messages for this client.
    """
    topic_filter = "test/publish/queueing/#"
    subscriber = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False)
    subscriber.connect("localhost", port=port)
    subscriber.subscribe(topic_filter, qos=1)
    # One loop iteration to process pending network traffic before leaving.
    subscriber.loop()
    subscriber.disconnect()
# Start the broker under test; the helper receives this script's file name
# and the port chosen above.
broker = mosq_test.start_broker(
    filename=os.path.basename(__file__),
    port=port,
)
class BrokerMonitor(threading.Thread):
    """Background thread that samples broker statistics on request.

    The thread subscribes to $SYS/broker/store/messages/# and
    $SYS/broker/publish/messages/dropped.  It then serves requests read from
    the command queue: for each request it waits (for roughly ten seconds at
    most) until fresh values have arrived and posts one
    ``(stored, stored_bytes, dropped)`` tuple on the result queue.

    Commands read from the command queue:
      * ``"quit"`` -- disconnect and end the thread.
      * any truthy value -- also wait for, and report, the dropped counter.
      * any falsy value -- do not wait for the dropped counter; report 0.

    ``args`` must be a ``(result_queue, command_queue)`` pair.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
        # `verbose` stays in the signature for backwards compatibility only:
        # threading.Thread.__init__ does not accept it on Python 3, so
        # forwarding it would raise TypeError.
        threading.Thread.__init__(self, group=group, target=target, name=name)
        self.rq, self.cq = args
        self._reset()

    def _reset(self):
        """Mark every statistic as "not received yet" (-1)."""
        self.stored = -1
        self.stored_bytes = -1
        self.dropped = -1

    def store_count(self, client, userdata, message):
        """Message callback: record the stored-message count."""
        self.stored = int(message.payload)

    def store_bytes(self, client, userdata, message):
        """Message callback: record the stored-message byte total."""
        self.stored_bytes = int(message.payload)

    def publish_dropped(self, client, userdata, message):
        """Message callback: record the dropped-publish count."""
        self.dropped = int(message.payload)

    def run(self):
        """Connect to the broker and answer sampling requests until "quit"."""
        client = paho.mqtt.client.Client("broker-monitor")
        client.connect("localhost", port=port)
        client.message_callback_add("$SYS/broker/store/messages/count", self.store_count)
        client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes)
        client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped)
        client.subscribe("$SYS/broker/store/messages/#")
        client.subscribe("$SYS/broker/publish/messages/dropped")

        while True:
            # Use the queue handed to the constructor rather than whatever
            # module-level `cq` happens to exist.
            expect_drops = self.cq.get()
            self.cq.task_done()
            if expect_drops == "quit":
                break

            first = time.time()
            while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0):
                client.loop(timeout=0.5)
                if time.time() - 10 > first:
                    # Give up waiting; whatever was received (possibly -1
                    # placeholders) is still reported below.
                    print("ABORT TIMEOUT")
                    break

            if expect_drops:
                self.rq.put((self.stored, self.stored_bytes, self.dropped))
            else:
                self.rq.put((self.stored, self.stored_bytes, 0))
            # Forget the sample so the next request waits for fresh values.
            self._reset()

        client.disconnect()
# Queues linking the main thread and the monitor thread: the main thread
# sends requests on cq and reads sampled statistics back from rq.
rq, cq = Queue.Queue(), Queue.Queue()
brokerMonitor = BrokerMonitor(args=(rq, cq))
class StoreCounts():
    """Latest broker statistics plus their change since the previous sample.

    ``stored``, ``bstored`` and ``drops`` hold the most recent absolute values;
    the matching ``diff_*`` attributes hold the difference between the last
    two calls to :meth:`update`.
    """

    def __init__(self):
        # Absolute values from the most recent sample.
        self.stored = self.bstored = self.drops = 0
        # Change between the two most recent samples.
        self.diff_stored = self.diff_bstored = self.diff_drops = 0

    def update(self, tup):
        """Record a new sample; ``tup`` is indexed as (stored, bytes, drops)."""
        for position, field in enumerate(("stored", "bstored", "drops")):
            latest = tup[position]
            setattr(self, "diff_" + field, latest - getattr(self, field))
            setattr(self, field, latest)

    def __repr__(self):
        shown = (
            self.stored, self.diff_stored,
            self.bstored, self.diff_bstored,
            self.drops, self.diff_drops,
        )
        return "s: %d (%d) b: %d (%d) d: %d (%d)" % shown
def _random_hex_messages(payload_len, count=10):
    """Build `count` QoS 1, non-retained messages with random hex payloads.

    Topics are test/publish/queueing/1 .. test/publish/queueing/<count>; each
    payload is `payload_len` characters drawn from string.hexdigits.  Every
    entry is a (topic, payload, qos, retain) tuple, the layout handed to
    paho.mqtt.publish.multiple below.
    """
    return [("test/publish/queueing/%d" % x,
             ''.join(random.choice(string.hexdigits) for _ in range(payload_len)),
             1, False) for x in range(1, count + 1)]


try:
    registerOfflineSubscriber()
    time.sleep(2.5) # Wait for first proper dump of stats
    brokerMonitor.start()
    counts = StoreCounts()

    cq.put(True) # Expect a dropped count (of 0, initial)
    counts.update(rq.get()) # Baseline sample
    print("rq.get (INITIAL) gave us: ", counts)
    rq.task_done()

    # publish 10 short messages, should be no drops
    print("publishing 10 short")
    cq.put(False) # expect no updated drop count
    msgs_short10 = _random_hex_messages(10)
    paho.mqtt.publish.multiple(msgs_short10, port=port)
    counts.update(rq.get()) # Sample taken after the short messages
    print("rq.get (short) gave us: ", counts)
    rq.task_done()
    if counts.diff_stored != 10 or counts.diff_bstored < 100:
        raise ValueError("short messages: unexpected store counts: %r" % (counts,))
    if counts.diff_drops != 0:
        raise ValueError("short messages: unexpected drop count: %r" % (counts,))

    # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400
    print("publishing 10 medium")
    cq.put(True) # expect a drop count
    msgs_medium10 = _random_hex_messages(40)
    paho.mqtt.publish.multiple(msgs_medium10, port=port)
    counts.update(rq.get()) # Sample taken after the medium messages
    print("rq.get (medium) gave us: ", counts)
    rq.task_done()
    if counts.diff_stored != 8 or counts.diff_bstored < 320:
        raise ValueError("medium messages: unexpected store counts: %r" % (counts,))
    if counts.diff_drops != 2:
        raise ValueError("medium messages: unexpected drop count: %r" % (counts,))

    rc = 0
except mosq_test.TestError:
    pass
finally:
    # Only stop the monitor if it is actually running: joining a thread that
    # was never started raises RuntimeError, which would skip the broker
    # shutdown below and leave the broker process behind.
    if brokerMonitor.is_alive():
        cq.put("quit")
        brokerMonitor.join()
    # The queues are deliberately not join()ed: after a failure an item may
    # never be marked done, and Queue.join() would then block forever.
    broker.terminate()
    (stdo, stde) = broker.communicate()
    if rc:
        print(stde.decode('utf-8'))

exit(rc)