diff --git a/ChangeLog.txt b/ChangeLog.txt
index 467fefe1..8b6b483f 100644
--- a/ChangeLog.txt
+++ b/ChangeLog.txt
@@ -31,6 +31,8 @@ Broker:
topics.
- new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored)
- new $SYS/broker/store/messages/bytes
+- max_queued_bytes feature to limit queues by real size rather than
+ than just message count. Closes Eclipse #452919 or Github #100
Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h
index f6cb3827..04e7cc47 100644
--- a/lib/mosquitto_internal.h
+++ b/lib/mosquitto_internal.h
@@ -205,6 +205,8 @@ struct mosquitto {
struct mosquitto_client_msg *last_inflight_msg;
struct mosquitto_client_msg *queued_msgs;
struct mosquitto_client_msg *last_queued_msg;
+ unsigned long msg_bytes;
+ unsigned long msg_bytes12;
int msg_count;
int msg_count12;
struct mosquitto__acl_user *acl_list;
diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml
index c2caf11c..2a4f6189 100644
--- a/man/mosquitto.conf.5.xml
+++ b/man/mosquitto.conf.5.xml
@@ -358,6 +358,16 @@
Reloaded on reload signal.
+
+ count
+
+ QoS 1 and 2 messages will be allowed in flight until this byte
+ limit is reached. Defaults to 0. (No limit)
+ See also the option.
+
+ Reloaded on reload signal.
+
+
count
@@ -371,6 +381,19 @@
Reloaded on reload signal.
+
+ count
+
+ QoS 1 and 2 messages above those currently in-flight will be
+ queued (per client) until this limit is exceeded.
+ Defaults to 0. (No maximum) See also the
+ option.
+ If both max_queued_messages and max_queued_bytes are specified,
+ packets will be queued until the first limit is reached.
+
+ Reloaded on reload signal.
+
+
count
@@ -378,7 +401,9 @@
queue (per client) above those messages that are currently
in flight. Defaults to 100. Set to 0 for no maximum (not
recommended). See also the
- option.
+ and
+ options.
+
Reloaded on reload signal.
diff --git a/mosquitto.conf b/mosquitto.conf
index bbe24750..085a6a16 100644
--- a/mosquitto.conf
+++ b/mosquitto.conf
@@ -46,15 +46,28 @@
# and 2 messages.
#max_inflight_messages 20
+# QoS 1 and 2 messages will be allowed inflight per client until this limit
+# is exceeded. Defaults to 0. (No maximum)
+# See also max_inflight_messages
+#max_inflight_bytes 0
+
# The maximum number of QoS 1 and 2 messages to hold in a queue per client
# above those that are currently in-flight. Defaults to 100. Set
# to 0 for no maximum (not recommended).
# See also queue_qos0_messages.
+# See also max_queued_bytes.
#max_queued_messages 100
+# QoS 1 and 2 messages above those currently in-flight will be queued per
+# client until this limit is exceeded. Defaults to 0. (No maximum)
+# See also max_queued_messages.
+# If both max_queued_messages and max_queued_bytes are specified, packets will
+# be queued until the first limit is reached.
+#max_queued_bytes 0
+
# Set to true to queue messages with QoS 0 when a persistent client is
# disconnected. These messages are included in the limit imposed by
-# max_queued_messages.
+# max_queued_messages and max_queued_bytes
# Defaults to false.
# This is a non-standard option for the MQTT v3.1 spec but is allowed in
# v3.1.1.
diff --git a/src/conf.c b/src/conf.c
index 2b3f2ecd..fd6e015c 100644
--- a/src/conf.c
+++ b/src/conf.c
@@ -50,7 +50,9 @@ struct config_recurse {
int log_dest_set;
int log_type;
int log_type_set;
+ unsigned long max_inflight_bytes;
int max_inflight_messages;
+ unsigned long max_queued_bytes;
int max_queued_messages;
};
@@ -485,7 +487,9 @@ int config__read(struct mosquitto__config *config, bool reload)
cr.log_dest_set = 0;
cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0;
+ cr.max_inflight_bytes = 0;
cr.max_inflight_messages = 20;
+ cr.max_queued_bytes = 0;
cr.max_queued_messages = 100;
if(!config->config_file) return 0;
@@ -525,7 +529,7 @@ int config__read(struct mosquitto__config *config, bool reload)
config->user = "mosquitto";
}
- db__limits_set(cr.max_inflight_messages, cr.max_queued_messages);
+ db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);
#ifdef WITH_BRIDGE
for(i=0; ibridge_count; i++){
@@ -1292,6 +1296,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
}
+ }else if(!strcmp(token, "max_inflight_bytes")){
+ token = strtok_r(NULL, " ", &saveptr);
+ if(token){
+ cr->max_inflight_bytes = atol(token);
+ }else{
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
+ }
}else if(!strcmp(token, "max_inflight_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
@@ -1300,6 +1311,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration.");
}
+ }else if(!strcmp(token, "max_queued_bytes")){
+ token = strtok_r(NULL, " ", &saveptr);
+ if(token){
+ cr->max_queued_bytes = atol(token); /* 63 bits is ok right? */
+ }else{
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration.");
+ }
}else if(!strcmp(token, "max_queued_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
diff --git a/src/context.c b/src/context.c
index ab101606..a09a788c 100644
--- a/src/context.c
+++ b/src/context.c
@@ -74,6 +74,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL;
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
+ context->msg_bytes = 0;
+ context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
#ifdef WITH_TLS
diff --git a/src/database.c b/src/database.c
index d6d0626a..d7f9792e 100644
--- a/src/database.c
+++ b/src/database.c
@@ -26,7 +26,68 @@ Contributors:
#include "time_mosq.h"
static int max_inflight = 20;
+static unsigned long max_inflight_bytes = 0;
static int max_queued = 100;
+static unsigned long max_queued_bytes = 0;
+
+/**
+ * Is this context ready to take more in flight messages right now?
+ * @param context the client context of interest
+ * @param qos qos for the packet of interest
+ * @return true if more in flight are allowed.
+ */
+static bool db__ready_for_flight(struct mosquitto *context, int qos)
+{
+ if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){
+ return true;
+ }
+
+ bool valid_bytes = context->msg_bytes12 < max_inflight_bytes;
+ bool valid_count = context->msg_count12 < max_inflight;
+
+ if(max_inflight == 0){
+ return valid_bytes;
+ }
+ if(max_inflight_bytes == 0){
+ return valid_count;
+ }
+
+ return valid_bytes && valid_count;
+}
+
+
+/**
+ * For a given client context, are more messages allowed to be queued?
+ * @param context client of interest
+ * @return true if queuing is allowed, false if should be dropped
+ */
+static bool db__ready_for_queue(struct mosquitto *context)
+{
+ if(max_queued == 0 && max_queued_bytes == 0){
+ return true;
+ }
+
+ unsigned long adjust_bytes = max_inflight_bytes;
+ int adjust_count = max_inflight;
+ /* nothing in flight for offline clients */
+ if(context->sock == INVALID_SOCKET){
+ adjust_bytes = 0;
+ adjust_count = 0;
+ }
+
+ bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes;
+ bool valid_count = context->msg_count12 - adjust_count < max_queued;
+
+ if(max_queued_bytes == 0){
+ return valid_count;
+ }
+ if(max_queued == 0){
+ return valid_bytes;
+ }
+
+ return valid_bytes && valid_count;
+}
+
int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
{
@@ -169,6 +230,12 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
}
if((*msg)->store){
+ context->msg_count--;
+ context->msg_bytes -= (*msg)->store->payloadlen;
+ if((*msg)->qos > 0){
+ context->msg_count12--;
+ context->msg_bytes12 -= (*msg)->store->payloadlen;
+ }
db__msg_store_deref(db, &(*msg)->store);
}
if(last){
@@ -182,10 +249,6 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
context->last_inflight_msg = NULL;
}
}
- context->msg_count--;
- if((*msg)->qos > 0){
- context->msg_count12--;
- }
mosquitto__free(*msg);
if(last){
*msg = last->next;
@@ -305,7 +368,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
}
if(context->sock != INVALID_SOCKET){
- if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
+ if(db__ready_for_flight(context, qos)){
if(dir == mosq_md_out){
switch(qos){
case 0:
@@ -325,7 +388,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 1;
}
}
- }else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
+ }else if(db__ready_for_queue(context)){
state = mosq_ms_queued;
rc = 2;
}else{
@@ -340,7 +403,9 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 2;
}
}else{
- if(max_queued > 0 && context->msg_count12 >= max_queued){
+ if (db__ready_for_queue(context, qos)){
+ state = mosq_ms_queued;
+ }else{
G_MSGS_DROPPED_INC();
if(context->is_dropping == false){
context->is_dropping = true;
@@ -349,8 +414,6 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
context->id);
}
return 2;
- }else{
- state = mosq_ms_queued;
}
}
assert(state != mosq_ms_invalid);
@@ -389,8 +452,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
*last_msg = msg;
}
context->msg_count++;
+ context->msg_bytes += msg->store->payloadlen;
if(qos > 0){
context->msg_count12++;
+ context->msg_bytes12 += msg->store->payloadlen;
}
if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
@@ -474,6 +539,8 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
}
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
+ context->msg_bytes = 0;
+ context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
@@ -606,14 +673,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
struct mosquitto_client_msg *prev = NULL;
msg = context->inflight_msgs;
+ context->msg_bytes = 0;
+ context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
while(msg){
context->last_inflight_msg = msg;
context->msg_count++;
+ context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
+ context->msg_bytes12 += msg->store->payloadlen;
}
if(msg->direction == mosq_md_out){
@@ -657,10 +728,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
context->last_queued_msg = msg;
context->msg_count++;
+ context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
+ context->msg_bytes12 += msg->store->payloadlen;
}
- if (max_inflight == 0 || context->msg_count <= max_inflight){
+ if (db__ready_for_flight(context, msg->qos)) {
switch(msg->qos){
case 0:
msg->state = mosq_ms_publish_qos0;
@@ -895,10 +968,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}
-void db__limits_set(int inflight, int queued)
+void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight = inflight;
+ max_inflight_bytes = inflight_bytes;
max_queued = queued;
+ max_queued_bytes = queued_bytes;
}
void db__vacuum(void)
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index d6a840b1..f9a0636b 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -513,7 +513,7 @@ int db__close(struct mosquitto_db *db);
int persist__backup(struct mosquitto_db *db, bool shutdown);
int persist__restore(struct mosquitto_db *db);
#endif
-void db__limits_set(int inflight, int queued);
+void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
diff --git a/test/broker/03-publish-qos1-queued-bytes.conf b/test/broker/03-publish-qos1-queued-bytes.conf
new file mode 100644
index 00000000..6f853103
--- /dev/null
+++ b/test/broker/03-publish-qos1-queued-bytes.conf
@@ -0,0 +1,4 @@
+sys_interval 1
+max_queued_messages 0
+max_queued_bytes 400
+port 1888
diff --git a/test/broker/03-publish-qos1-queued-bytes.py b/test/broker/03-publish-qos1-queued-bytes.py
new file mode 100755
index 00000000..cf4c58dc
--- /dev/null
+++ b/test/broker/03-publish-qos1-queued-bytes.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+
+# Test whether a PUBLISH to a topic with an offline subscriber results in a queued message
+import Queue
+import random
+import string
+import subprocess
+import socket
+import threading
+import time
+
+import paho.mqtt.client
+import paho.mqtt.publish
+
+
+import inspect, os, sys
+# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
+cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
+if cmd_subfolder not in sys.path:
+ sys.path.insert(0, cmd_subfolder)
+
+import mosq_test
+
+rc = 1
+
+def registerOfflineSubscriber():
+ """Just a durable client to trigger queuing"""
+ client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False)
+ client.connect("localhost", port=1888)
+ client.subscribe("test/publish/queueing/#", 1)
+ client.loop()
+ client.disconnect()
+
+
+broker = mosq_test.start_broker(filename=os.path.basename(__file__))
+
+class BrokerMonitor(threading.Thread):
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
+ threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
+ self.rq, self.cq = args
+ self.stored = -1
+ self.stored_bytes = -1
+ self.dropped = -1
+
+ def store_count(self, client, userdata, message):
+ self.stored = int(message.payload)
+
+ def store_bytes(self, client, userdata, message):
+ self.stored_bytes = int(message.payload)
+
+ def publish_dropped(self, client, userdata, message):
+ self.dropped = int(message.payload)
+
+ def run(self):
+ client = paho.mqtt.client.Client("broker-monitor")
+ client.connect("localhost", port=1888)
+ client.message_callback_add("$SYS/broker/store/messages/count", self.store_count)
+ client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes)
+ client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped)
+ client.subscribe("$SYS/broker/store/messages/#")
+ client.subscribe("$SYS/broker/publish/messages/dropped")
+
+ while True:
+ expect_drops = cq.get()
+ self.cq.task_done()
+ if expect_drops == "quit":
+ break
+ first = time.time()
+ while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0):
+ client.loop(timeout=0.5)
+ if time.time() - 10 > first:
+ print("ABORT TIMEOUT")
+ break
+
+ if expect_drops:
+ self.rq.put((self.stored, self.stored_bytes, self.dropped))
+ else:
+ self.rq.put((self.stored, self.stored_bytes, 0))
+ self.stored = -1
+ self.stored_bytes = -1
+ self.dropped = -1
+
+ client.disconnect()
+
+rq = Queue.Queue()
+cq = Queue.Queue()
+brokerMonitor = BrokerMonitor(args=(rq,cq))
+
+class StoreCounts():
+ def __init__(self):
+ self.stored = 0
+ self.bstored = 0
+ self.drops = 0
+ self.diff_stored = 0
+ self.diff_bstored = 0
+ self.diff_drops = 0
+
+ def update(self, tup):
+ self.diff_stored = tup[0] - self.stored
+ self.stored = tup[0]
+ self.diff_bstored = tup[1] - self.bstored
+ self.bstored = tup[1]
+ self.diff_drops = tup[2] - self.drops
+ self.drops = tup[2]
+
+ def __repr__(self):
+ return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops)
+
+try:
+ registerOfflineSubscriber()
+ time.sleep(2.5) # Wait for first proper dump of stats
+ brokerMonitor.start()
+ counts = StoreCounts()
+ cq.put(True) # Expect a dropped count (of 0, initial)
+ counts.update(rq.get()) # Initial start
+ print("rq.get (INITIAL) gave us: ", counts)
+ rq.task_done()
+
+ # publish 10 short messages, should be no drops
+ print("publishing 10 short")
+ cq.put(False) # expect no updated drop count
+ msgs_short10 = [("test/publish/queueing/%d" % x,
+ ''.join(random.choice(string.hexdigits) for _ in range(10)),
+ 1, False) for x in range(1, 10 + 1)]
+ paho.mqtt.publish.multiple(msgs_short10, port=1888)
+ counts.update(rq.get()) # Initial start
+ print("rq.get (short) gave us: ", counts)
+ rq.task_done()
+ if counts.diff_stored != 10 or counts.diff_bstored < 100:
+ raise ValueError
+ if counts.diff_drops != 0:
+ raise ValueError
+
+ # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400
+ print("publishing 10 medium")
+ cq.put(True) # expect a drop count
+ msgs_medium10 = [("test/publish/queueing/%d" % x,
+ ''.join(random.choice(string.hexdigits) for _ in range(40)),
+ 1, False) for x in range(1, 10 + 1)]
+ paho.mqtt.publish.multiple(msgs_medium10, port=1888)
+ counts.update(rq.get()) # Initial start
+ print("rq.get (medium) gave us: ", counts)
+ rq.task_done()
+ if counts.diff_stored != 8 or counts.diff_bstored < 320:
+ raise ValueError
+ if counts.diff_drops != 2:
+ raise ValueError
+ rc = 0
+
+finally:
+ cq.put("quit")
+ brokerMonitor.join()
+ rq.join()
+ cq.join()
+ broker.terminate()
+ (stdo, stde) = broker.communicate()
+ if rc:
+ print(stde)
+
+exit(rc)
+
diff --git a/test/broker/Makefile b/test/broker/Makefile
index a84950e9..3206d473 100644
--- a/test/broker/Makefile
+++ b/test/broker/Makefile
@@ -48,6 +48,7 @@ endif
./03-publish-c2b-disconnect-qos2.py
./03-publish-b2c-disconnect-qos2.py
./03-pattern-matching.py
+ ./03-publish-qos1-queued-bytes.py
04 :
./04-retain-qos0.py
diff --git a/travis-install.sh b/travis-install.sh
index edc8c173..ba034832 100755
--- a/travis-install.sh
+++ b/travis-install.sh
@@ -10,3 +10,5 @@ if [ "$TRAVIS_OS_NAME" == "osx" ]; then
brew update
brew install c-ares openssl libwebsockets
fi
+
+sudo pip install paho-mqtt