From 84660e1cbef07ef856de0527186fecd36ced63ce Mon Sep 17 00:00:00 2001 From: Roger Light Date: Tue, 8 Jan 2019 18:38:47 +0000 Subject: [PATCH] Send maximum limits for QoS>0. This needs more work on the broker front to simplify the design. --- lib/connect.c | 1 + lib/handle_connack.c | 1 + lib/messages_mosq.c | 30 +++++++++++++----------------- lib/mosquitto.c | 2 +- lib/mosquitto.h | 13 +++++++++++++ lib/mosquitto_internal.h | 6 +++--- lib/options.c | 7 +++++++ lib/util_mosq.c | 7 +++++++ lib/util_mosq.h | 1 + src/context.c | 3 ++- src/database.c | 14 +++++++------- src/property_broker.c | 7 ++----- 12 files changed, 58 insertions(+), 34 deletions(-) diff --git a/lib/connect.c b/lib/connect.c index a08f7902..f2804002 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -69,6 +69,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int mosq->keepalive = keepalive; mosq->receive_quota = mosq->receive_maximum; + mosq->send_quota = mosq->send_maximum; if(mosq->sockpairR != INVALID_SOCKET){ COMPAT_CLOSE(mosq->sockpairR); diff --git a/lib/handle_connack.c b/lib/handle_connack.c index 5e5fa86d..135e9b4c 100644 --- a/lib/handle_connack.c +++ b/lib/handle_connack.c @@ -62,6 +62,7 @@ int handle__connack(struct mosquitto *mosq) } mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false); + mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false); log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, reason_code); pthread_mutex_lock(&mosq->callback_mutex); diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 9d494788..51f48550 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -26,6 +26,7 @@ Contributors: #include "messages_mosq.h" #include "send_mosq.h" #include "time_mosq.h" +#include "util_mosq.h" void message__cleanup(struct mosquitto_message_all **message) { @@ -123,6 +124,7 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message /* mosq->*_message_mutex should be locked before entering this function */ assert(mosq); assert(message); + assert(message->msg.qos != 0); if(dir == mosq_md_out){ mosq->out_queue_len++; @@ -133,12 +135,10 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message mosq->out_messages = message; } mosq->out_messages_last = message; - if(message->msg.qos > 0){ - if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ - mosq->inflight_messages++; - }else{ - rc = 1; - } + if(mosq->send_quota > 0){ + mosq->send_quota--; + }else{ + rc = 1; } }else{ mosq->in_queue_len++; @@ -187,17 +187,15 @@ void message__reconnect_reset(struct mosquitto *mosq) pthread_mutex_lock(&mosq->out_message_mutex); - mosq->inflight_messages = 0; + mosq->send_quota = mosq->send_maximum; message = mosq->out_messages; mosq->out_queue_len = 0; while(message){ mosq->out_queue_len++; message->timestamp = 0; - if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ - if(message->msg.qos > 0){ - mosq->inflight_messages++; - } + if(mosq->send_quota > 0){ + mosq->send_quota--; if(message->msg.qos == 1){ message->state = mosq_ms_publish_qos1; }else if(message->msg.qos == 2){ @@ -243,9 +241,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir }else if(!mosq->out_messages){ mosq->out_messages_last = NULL; } - if(cur->msg.qos > 0){ - mosq->inflight_messages--; - } + util__increment_send_quota(mosq); found = true; break; } @@ -256,9 +252,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir if(found){ cur = mosq->out_messages; while(cur){ - if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ + if(mosq->send_quota > 0){ if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){ - mosq->inflight_messages++; + mosq->send_quota--; if(cur->msg.qos == 1){ cur->state = mosq_ms_wait_for_puback; }else if(cur->msg.qos == 2){ @@ -394,7 +390,7 @@ int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max { if(!mosq) return MOSQ_ERR_INVAL; - mosq->max_inflight_messages = max_inflight_messages; + mosq->send_maximum = max_inflight_messages; return MOSQ_ERR_SUCCESS; } diff --git a/lib/mosquitto.c b/lib/mosquitto.c index b07c69f8..c5bb056e 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -146,8 +146,8 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st mosq->in_messages_last = NULL; mosq->out_messages = NULL; mosq->out_messages_last = NULL; - mosq->max_inflight_messages = 20; mosq->receive_maximum = 20; + mosq->send_maximum = 20; mosq->will = NULL; mosq->on_connect = NULL; mosq->on_publish = NULL; diff --git a/lib/mosquitto.h b/lib/mosquitto.h index cadb2ee0..81fc6929 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -100,6 +100,7 @@ enum mosq_opt_t { MOSQ_OPT_SSL_CTX = 2, MOSQ_OPT_SSL_CTX_WITH_DEFAULTS = 3, MOSQ_OPT_RECEIVE_MAXIMUM = 4, + MOSQ_OPT_SEND_MAXIMUM = 5, }; /* MQTT specification restricts client ids to a maximum of 23 characters */ @@ -1375,6 +1376,15 @@ libmosq_EXPORT int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t op * will override this option. Using this option is the recommended * method however. * + * MOSQ_OPT_SEND_MAXIMUM + * Value can be set between 1 and 65535 inclusive, and represents + * the maximum number of outgoing QoS 1 and QoS 2 messages that this + * client will attempt to have "in flight" at once. Defaults to 20. + * This option is not valid for MQTT v3.1 or v3.1.1 clients. + * Note that if the broker being connected to sends a + * MQTT_PROP_RECEIVE_MAXIMUM property that has a lower value than + * this option, then the broker provided value will be used. + * * MOSQ_OPT_SSL_CTX_WITH_DEFAULTS * If value is set to a non zero value, then the user specified * SSL_CTX passed in using MOSQ_OPT_SSL_CTX will have the default @@ -1450,6 +1460,9 @@ libmosq_EXPORT int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigne /* * Function: mosquitto_max_inflight_messages_set * + * This function is deprected. Use the function with the + * MOSQ_OPT_SEND_MAXIMUM option instead. + * * Set the number of QoS 1 and 2 messages that can be "in flight" at one time. * An in flight message is part way through its delivery flow. Attempts to send * further messages with will result in the messages being diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index f62cf3d3..49a8829b 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -278,14 +278,14 @@ struct mosquitto { bool reconnect_exponential_backoff; char threaded; struct mosquitto__packet *out_packet_last; - int inflight_messages; # ifdef WITH_SRV ares_channel achan; # endif #endif + int send_quota; int receive_quota; - int receive_maximum; - int max_inflight_messages; + uint16_t send_maximum; + uint16_t receive_maximum; #ifdef WITH_BROKER UT_hash_handle hh_id; diff --git a/lib/options.c b/lib/options.c index 053a1e4b..fd31d57d 100644 --- a/lib/options.c +++ b/lib/options.c @@ -322,6 +322,13 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val mosq->receive_maximum = value; break; + case MOSQ_OPT_SEND_MAXIMUM: + if(value < 0 || value > 65535){ + return MOSQ_ERR_INVAL; + } + mosq->send_maximum = value; + break; + case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS: #if defined(WITH_TLS) && OPENSSL_VERSION_NUMBER >= 0x10100000L if(value){ diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 1683a2e9..ca2b5077 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -261,3 +261,10 @@ void util__increment_receive_quota(struct mosquitto *mosq) } } } + +void util__increment_send_quota(struct mosquitto *mosq) +{ + if(mosq->send_quota < mosq->send_maximum){ + mosq->send_quota++; + } +} \ No newline at end of file diff --git a/lib/util_mosq.h b/lib/util_mosq.h index 62a14f2b..6642a697 100644 --- a/lib/util_mosq.h +++ b/lib/util_mosq.h @@ -38,4 +38,5 @@ int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len); #endif void util__increment_receive_quota(struct mosquitto *mosq); +void util__increment_send_quota(struct mosquitto *mosq); #endif diff --git a/src/context.c b/src/context.c index a93b8e30..a6a5f499 100644 --- a/src/context.c +++ b/src/context.c @@ -76,7 +76,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context->last_inflight_msg = NULL; context->queued_msgs = NULL; context->last_queued_msg = NULL; - context->max_inflight_messages = db->config->max_inflight_messages; + context->receive_maximum = db->config->max_inflight_messages; + context->send_maximum = db->config->max_inflight_messages; context->msg_bytes = 0; context->msg_bytes12 = 0; context->msg_count = 0; diff --git a/src/database.c b/src/database.c index d93d3c08..6fc2dc23 100644 --- a/src/database.c +++ b/src/database.c @@ -37,14 +37,14 @@ static unsigned long max_queued_bytes = 0; */ static bool db__ready_for_flight(struct mosquitto *context, int qos) { - if(qos == 0 || (context->max_inflight_messages == 0 && max_inflight_bytes == 0)){ + if(qos == 0 || (context->send_maximum == 0 && max_inflight_bytes == 0)){ return true; } bool valid_bytes = context->msg_bytes12 < max_inflight_bytes; - bool valid_count = context->msg_count12 < context->max_inflight_messages; + bool valid_count = context->msg_count12 < context->send_maximum; - if(context->max_inflight_messages == 0){ + if(context->send_maximum == 0){ return valid_bytes; } if(max_inflight_bytes == 0){ @@ -72,7 +72,7 @@ static bool db__ready_for_queue(struct mosquitto *context, int qos) unsigned long source_bytes = context->msg_bytes12; int source_count = context->msg_count12; unsigned long adjust_bytes = max_inflight_bytes; - int adjust_count = context->max_inflight_messages; + int adjust_count = context->send_maximum; /* nothing in flight for offline clients */ if(context->sock == INVALID_SOCKET){ @@ -306,7 +306,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 tail = tail->next; } } - while (context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ + while (context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){ msg_index++; tail = context->queued_msgs; tail->timestamp = mosquitto_time(); @@ -837,7 +837,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint } } - while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ + while(context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){ msg_index++; tail = context->queued_msgs; tail->timestamp = mosquitto_time(); @@ -988,7 +988,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) } } - while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_count < context->max_inflight_messages)){ + while(context->queued_msgs && (context->send_maximum == 0 || msg_count < context->send_maximum)){ msg_count++; tail = context->queued_msgs; if(tail->direction == mosq_md_out){ diff --git a/src/property_broker.c b/src/property_broker.c index 57d17c9c..ee76b2c4 100644 --- a/src/property_broker.c +++ b/src/property_broker.c @@ -40,11 +40,8 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro return MOSQ_ERR_PROTOCOL; } - if(p->value.i16 == 65535){ - context->max_inflight_messages = 0; - }else{ - context->max_inflight_messages = p->value.i16; - } + context->send_maximum = p->value.i16; + context->send_quota = context->send_maximum; } p = p->next; }