broker: support byte based queueing

Limiting queued message depth purely based on message count is hard to
control for memory constrained devices.  The size of messages can vary
wildly, from a few bytes, to a few kilobytes.  Support a new
max_queued_bytes option, and drop packets when the first limit is
reached.  Option defaults to 0 (disabled) by default.
Support also a max_inflight_bytes variable, with similar behaviour.

Fixes (partof) https://github.com/eclipse/mosquitto/issues/100

This pulls up some helper routines for calculating whether to allow
inflight or queuing, resolving some inconsistences in connection
resumption.

Signed-off-by: Karl Palsson <karlp@etactica.com>
This commit is contained in:
Karl Palsson 2016-06-21 14:47:41 +00:00
parent 642e141c23
commit c6aac741c2
12 changed files with 320 additions and 15 deletions

View File

@ -31,6 +31,8 @@ Broker:
topics. topics.
- new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored) - new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored)
- new $SYS/broker/store/messages/bytes - new $SYS/broker/store/messages/bytes
- max_queued_bytes feature to limit queues by real size rather than
than just message count. Closes Eclipse #452919 or Github #100
Client library: Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period. - Outgoing messages with QoS>1 are no longer retried after a timeout period.

View File

@ -205,6 +205,8 @@ struct mosquitto {
struct mosquitto_client_msg *last_inflight_msg; struct mosquitto_client_msg *last_inflight_msg;
struct mosquitto_client_msg *queued_msgs; struct mosquitto_client_msg *queued_msgs;
struct mosquitto_client_msg *last_queued_msg; struct mosquitto_client_msg *last_queued_msg;
unsigned long msg_bytes;
unsigned long msg_bytes12;
int msg_count; int msg_count;
int msg_count12; int msg_count12;
struct mosquitto__acl_user *acl_list; struct mosquitto__acl_user *acl_list;

View File

@ -358,6 +358,16 @@
<para>Reloaded on reload signal.</para> <para>Reloaded on reload signal.</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>max_inflight_bytes</option> <replaceable>count</replaceable></term>
<listitem>
<para>QoS 1 and 2 messages will be allowed in flight until this byte
limit is reached. Defaults to 0. (No limit)
See also the <option>max_inflight_messages</option> option.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>max_inflight_messages</option> <replaceable>count</replaceable></term> <term><option>max_inflight_messages</option> <replaceable>count</replaceable></term>
<listitem> <listitem>
@ -371,6 +381,19 @@
<para>Reloaded on reload signal.</para> <para>Reloaded on reload signal.</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>max_queued_bytes</option> <replaceable>count</replaceable></term>
<listitem>
<para>QoS 1 and 2 messages above those currently in-flight will be
queued (per client) until this limit is exceeded.
Defaults to 0. (No maximum) See also the
<option>max_queued_messages</option> option.
If both max_queued_messages and max_queued_bytes are specified,
packets will be queued until the first limit is reached.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>max_queued_messages</option> <replaceable>count</replaceable></term> <term><option>max_queued_messages</option> <replaceable>count</replaceable></term>
<listitem> <listitem>
@ -378,7 +401,9 @@
queue (per client) above those messages that are currently queue (per client) above those messages that are currently
in flight. Defaults to 100. Set to 0 for no maximum (not in flight. Defaults to 100. Set to 0 for no maximum (not
recommended). See also the recommended). See also the
<option>queue_qos0_messages</option> option.</para> <option>queue_qos0_messages</option> and
<option>max_queued_bytes</option> options.
</para>
<para>Reloaded on reload signal.</para> <para>Reloaded on reload signal.</para>
</listitem> </listitem>
</varlistentry> </varlistentry>

View File

@ -46,15 +46,28 @@
# and 2 messages. # and 2 messages.
#max_inflight_messages 20 #max_inflight_messages 20
# QoS 1 and 2 messages will be allowed inflight per client until this limit
# is exceeded. Defaults to 0. (No maximum)
# See also max_inflight_messages
#max_inflight_bytes 0
# The maximum number of QoS 1 and 2 messages to hold in a queue per client # The maximum number of QoS 1 and 2 messages to hold in a queue per client
# above those that are currently in-flight. Defaults to 100. Set # above those that are currently in-flight. Defaults to 100. Set
# to 0 for no maximum (not recommended). # to 0 for no maximum (not recommended).
# See also queue_qos0_messages. # See also queue_qos0_messages.
# See also max_queued_bytes.
#max_queued_messages 100 #max_queued_messages 100
# QoS 1 and 2 messages above those currently in-flight will be queued per
# client until this limit is exceeded. Defaults to 0. (No maximum)
# See also max_queued_messages.
# If both max_queued_messages and max_queued_bytes are specified, packets will
# be queued until the first limit is reached.
#max_queued_bytes 0
# Set to true to queue messages with QoS 0 when a persistent client is # Set to true to queue messages with QoS 0 when a persistent client is
# disconnected. These messages are included in the limit imposed by # disconnected. These messages are included in the limit imposed by
# max_queued_messages. # max_queued_messages and max_queued_bytes
# Defaults to false. # Defaults to false.
# This is a non-standard option for the MQTT v3.1 spec but is allowed in # This is a non-standard option for the MQTT v3.1 spec but is allowed in
# v3.1.1. # v3.1.1.

View File

@ -50,7 +50,9 @@ struct config_recurse {
int log_dest_set; int log_dest_set;
int log_type; int log_type;
int log_type_set; int log_type_set;
unsigned long max_inflight_bytes;
int max_inflight_messages; int max_inflight_messages;
unsigned long max_queued_bytes;
int max_queued_messages; int max_queued_messages;
}; };
@ -485,7 +487,9 @@ int config__read(struct mosquitto__config *config, bool reload)
cr.log_dest_set = 0; cr.log_dest_set = 0;
cr.log_type = MOSQ_LOG_NONE; cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0; cr.log_type_set = 0;
cr.max_inflight_bytes = 0;
cr.max_inflight_messages = 20; cr.max_inflight_messages = 20;
cr.max_queued_bytes = 0;
cr.max_queued_messages = 100; cr.max_queued_messages = 100;
if(!config->config_file) return 0; if(!config->config_file) return 0;
@ -525,7 +529,7 @@ int config__read(struct mosquitto__config *config, bool reload)
config->user = "mosquitto"; config->user = "mosquitto";
} }
db__limits_set(cr.max_inflight_messages, cr.max_queued_messages); db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){ for(i=0; i<config->bridge_count; i++){
@ -1292,6 +1296,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{ }else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration."); log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
} }
}else if(!strcmp(token, "max_inflight_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_inflight_bytes = atol(token);
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
}
}else if(!strcmp(token, "max_inflight_messages")){ }else if(!strcmp(token, "max_inflight_messages")){
token = strtok_r(NULL, " ", &saveptr); token = strtok_r(NULL, " ", &saveptr);
if(token){ if(token){
@ -1300,6 +1311,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{ }else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration."); log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration.");
} }
}else if(!strcmp(token, "max_queued_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_queued_bytes = atol(token); /* 63 bits is ok right? */
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration.");
}
}else if(!strcmp(token, "max_queued_messages")){ }else if(!strcmp(token, "max_queued_messages")){
token = strtok_r(NULL, " ", &saveptr); token = strtok_r(NULL, " ", &saveptr);
if(token){ if(token){

View File

@ -74,6 +74,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL; context->last_inflight_msg = NULL;
context->queued_msgs = NULL; context->queued_msgs = NULL;
context->last_queued_msg = NULL; context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0; context->msg_count = 0;
context->msg_count12 = 0; context->msg_count12 = 0;
#ifdef WITH_TLS #ifdef WITH_TLS

View File

@ -26,7 +26,68 @@ Contributors:
#include "time_mosq.h" #include "time_mosq.h"
static int max_inflight = 20; static int max_inflight = 20;
static unsigned long max_inflight_bytes = 0;
static int max_queued = 100; static int max_queued = 100;
static unsigned long max_queued_bytes = 0;
/**
* Is this context ready to take more in flight messages right now?
* @param context the client context of interest
* @param qos qos for the packet of interest
* @return true if more in flight are allowed.
*/
static bool db__ready_for_flight(struct mosquitto *context, int qos)
{
if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){
return true;
}
bool valid_bytes = context->msg_bytes12 < max_inflight_bytes;
bool valid_count = context->msg_count12 < max_inflight;
if(max_inflight == 0){
return valid_bytes;
}
if(max_inflight_bytes == 0){
return valid_count;
}
return valid_bytes && valid_count;
}
/**
* For a given client context, are more messages allowed to be queued?
* @param context client of interest
* @return true if queuing is allowed, false if should be dropped
*/
static bool db__ready_for_queue(struct mosquitto *context)
{
if(max_queued == 0 && max_queued_bytes == 0){
return true;
}
unsigned long adjust_bytes = max_inflight_bytes;
int adjust_count = max_inflight;
/* nothing in flight for offline clients */
if(context->sock == INVALID_SOCKET){
adjust_bytes = 0;
adjust_count = 0;
}
bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes;
bool valid_count = context->msg_count12 - adjust_count < max_queued;
if(max_queued_bytes == 0){
return valid_count;
}
if(max_queued == 0){
return valid_bytes;
}
return valid_bytes && valid_count;
}
int db__open(struct mosquitto__config *config, struct mosquitto_db *db) int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
{ {
@ -169,6 +230,12 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
} }
if((*msg)->store){ if((*msg)->store){
context->msg_count--;
context->msg_bytes -= (*msg)->store->payloadlen;
if((*msg)->qos > 0){
context->msg_count12--;
context->msg_bytes12 -= (*msg)->store->payloadlen;
}
db__msg_store_deref(db, &(*msg)->store); db__msg_store_deref(db, &(*msg)->store);
} }
if(last){ if(last){
@ -182,10 +249,6 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
context->last_inflight_msg = NULL; context->last_inflight_msg = NULL;
} }
} }
context->msg_count--;
if((*msg)->qos > 0){
context->msg_count12--;
}
mosquitto__free(*msg); mosquitto__free(*msg);
if(last){ if(last){
*msg = last->next; *msg = last->next;
@ -305,7 +368,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
} }
if(context->sock != INVALID_SOCKET){ if(context->sock != INVALID_SOCKET){
if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){ if(db__ready_for_flight(context, qos)){
if(dir == mosq_md_out){ if(dir == mosq_md_out){
switch(qos){ switch(qos){
case 0: case 0:
@ -325,7 +388,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 1; return 1;
} }
} }
}else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){ }else if(db__ready_for_queue(context)){
state = mosq_ms_queued; state = mosq_ms_queued;
rc = 2; rc = 2;
}else{ }else{
@ -340,7 +403,9 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 2; return 2;
} }
}else{ }else{
if(max_queued > 0 && context->msg_count12 >= max_queued){ if (db__ready_for_queue(context, qos)){
state = mosq_ms_queued;
}else{
G_MSGS_DROPPED_INC(); G_MSGS_DROPPED_INC();
if(context->is_dropping == false){ if(context->is_dropping == false){
context->is_dropping = true; context->is_dropping = true;
@ -349,8 +414,6 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
context->id); context->id);
} }
return 2; return 2;
}else{
state = mosq_ms_queued;
} }
} }
assert(state != mosq_ms_invalid); assert(state != mosq_ms_invalid);
@ -389,8 +452,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
*last_msg = msg; *last_msg = msg;
} }
context->msg_count++; context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(qos > 0){ if(qos > 0){
context->msg_count12++; context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
} }
if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){ if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
@ -474,6 +539,8 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
} }
context->queued_msgs = NULL; context->queued_msgs = NULL;
context->last_queued_msg = NULL; context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0; context->msg_count = 0;
context->msg_count12 = 0; context->msg_count12 = 0;
@ -606,14 +673,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
struct mosquitto_client_msg *prev = NULL; struct mosquitto_client_msg *prev = NULL;
msg = context->inflight_msgs; msg = context->inflight_msgs;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0; context->msg_count = 0;
context->msg_count12 = 0; context->msg_count12 = 0;
while(msg){ while(msg){
context->last_inflight_msg = msg; context->last_inflight_msg = msg;
context->msg_count++; context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){ if(msg->qos > 0){
context->msg_count12++; context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
} }
if(msg->direction == mosq_md_out){ if(msg->direction == mosq_md_out){
@ -657,10 +728,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
context->last_queued_msg = msg; context->last_queued_msg = msg;
context->msg_count++; context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){ if(msg->qos > 0){
context->msg_count12++; context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
} }
if (max_inflight == 0 || context->msg_count <= max_inflight){ if (db__ready_for_flight(context, msg->qos)) {
switch(msg->qos){ switch(msg->qos){
case 0: case 0:
msg->state = mosq_ms_publish_qos0; msg->state = mosq_ms_publish_qos0;
@ -895,10 +968,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
void db__limits_set(int inflight, int queued) void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{ {
max_inflight = inflight; max_inflight = inflight;
max_inflight_bytes = inflight_bytes;
max_queued = queued; max_queued = queued;
max_queued_bytes = queued_bytes;
} }
void db__vacuum(void) void db__vacuum(void)

View File

@ -513,7 +513,7 @@ int db__close(struct mosquitto_db *db);
int persist__backup(struct mosquitto_db *db, bool shutdown); int persist__backup(struct mosquitto_db *db, bool shutdown);
int persist__restore(struct mosquitto_db *db); int persist__restore(struct mosquitto_db *db);
#endif #endif
void db__limits_set(int inflight, int queued); void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */ /* Return the number of in-flight messages in count. */
int db__message_count(int *count); int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);

View File

@ -0,0 +1,4 @@
sys_interval 1
max_queued_messages 0
max_queued_bytes 400
port 1888

View File

@ -0,0 +1,161 @@
#!/usr/bin/env python
# Test whether a PUBLISH to a topic with an offline subscriber results in a queued message
import Queue
import random
import string
import subprocess
import socket
import threading
import time
import paho.mqtt.client
import paho.mqtt.publish
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
def registerOfflineSubscriber():
"""Just a durable client to trigger queuing"""
client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False)
client.connect("localhost", port=1888)
client.subscribe("test/publish/queueing/#", 1)
client.loop()
client.disconnect()
broker = mosq_test.start_broker(filename=os.path.basename(__file__))
class BrokerMonitor(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
self.rq, self.cq = args
self.stored = -1
self.stored_bytes = -1
self.dropped = -1
def store_count(self, client, userdata, message):
self.stored = int(message.payload)
def store_bytes(self, client, userdata, message):
self.stored_bytes = int(message.payload)
def publish_dropped(self, client, userdata, message):
self.dropped = int(message.payload)
def run(self):
client = paho.mqtt.client.Client("broker-monitor")
client.connect("localhost", port=1888)
client.message_callback_add("$SYS/broker/store/messages/count", self.store_count)
client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes)
client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped)
client.subscribe("$SYS/broker/store/messages/#")
client.subscribe("$SYS/broker/publish/messages/dropped")
while True:
expect_drops = cq.get()
self.cq.task_done()
if expect_drops == "quit":
break
first = time.time()
while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0):
client.loop(timeout=0.5)
if time.time() - 10 > first:
print("ABORT TIMEOUT")
break
if expect_drops:
self.rq.put((self.stored, self.stored_bytes, self.dropped))
else:
self.rq.put((self.stored, self.stored_bytes, 0))
self.stored = -1
self.stored_bytes = -1
self.dropped = -1
client.disconnect()
rq = Queue.Queue()
cq = Queue.Queue()
brokerMonitor = BrokerMonitor(args=(rq,cq))
class StoreCounts():
def __init__(self):
self.stored = 0
self.bstored = 0
self.drops = 0
self.diff_stored = 0
self.diff_bstored = 0
self.diff_drops = 0
def update(self, tup):
self.diff_stored = tup[0] - self.stored
self.stored = tup[0]
self.diff_bstored = tup[1] - self.bstored
self.bstored = tup[1]
self.diff_drops = tup[2] - self.drops
self.drops = tup[2]
def __repr__(self):
return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops)
try:
registerOfflineSubscriber()
time.sleep(2.5) # Wait for first proper dump of stats
brokerMonitor.start()
counts = StoreCounts()
cq.put(True) # Expect a dropped count (of 0, initial)
counts.update(rq.get()) # Initial start
print("rq.get (INITIAL) gave us: ", counts)
rq.task_done()
# publish 10 short messages, should be no drops
print("publishing 10 short")
cq.put(False) # expect no updated drop count
msgs_short10 = [("test/publish/queueing/%d" % x,
''.join(random.choice(string.hexdigits) for _ in range(10)),
1, False) for x in range(1, 10 + 1)]
paho.mqtt.publish.multiple(msgs_short10, port=1888)
counts.update(rq.get()) # Initial start
print("rq.get (short) gave us: ", counts)
rq.task_done()
if counts.diff_stored != 10 or counts.diff_bstored < 100:
raise ValueError
if counts.diff_drops != 0:
raise ValueError
# publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400
print("publishing 10 medium")
cq.put(True) # expect a drop count
msgs_medium10 = [("test/publish/queueing/%d" % x,
''.join(random.choice(string.hexdigits) for _ in range(40)),
1, False) for x in range(1, 10 + 1)]
paho.mqtt.publish.multiple(msgs_medium10, port=1888)
counts.update(rq.get()) # Initial start
print("rq.get (medium) gave us: ", counts)
rq.task_done()
if counts.diff_stored != 8 or counts.diff_bstored < 320:
raise ValueError
if counts.diff_drops != 2:
raise ValueError
rc = 0
finally:
cq.put("quit")
brokerMonitor.join()
rq.join()
cq.join()
broker.terminate()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

View File

@ -48,6 +48,7 @@ endif
./03-publish-c2b-disconnect-qos2.py ./03-publish-c2b-disconnect-qos2.py
./03-publish-b2c-disconnect-qos2.py ./03-publish-b2c-disconnect-qos2.py
./03-pattern-matching.py ./03-pattern-matching.py
./03-publish-qos1-queued-bytes.py
04 : 04 :
./04-retain-qos0.py ./04-retain-qos0.py

View File

@ -10,3 +10,5 @@ if [ "$TRAVIS_OS_NAME" == "osx" ]; then
brew update brew update
brew install c-ares openssl libwebsockets brew install c-ares openssl libwebsockets
fi fi
sudo pip install paho-mqtt