From dac841a34216baa96d1a30260085a37410f021a5 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 25 Nov 2020 09:59:31 +0000 Subject: [PATCH] Better outgoing QoS 0 limiting. --- ChangeLog.txt | 2 ++ buildtest.py | 1 + lib/connect.c | 1 - lib/mosquitto_internal.h | 1 - lib/packet_mosq.c | 4 ---- lib/send_publish.c | 23 ----------------------- src/bridge.c | 1 - src/context.c | 2 -- src/database.c | 40 ++++++++++++++++++++++++++++------------ src/websockets.c | 2 -- 10 files changed, 31 insertions(+), 46 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index ddba79a6..4cf8a639 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -40,6 +40,8 @@ Breaking changes: regardless of whether the `-d` argument is used when running the broker. - The `tls_version` option now defines the *minimum* TLS protocol version to be used, rather than the exact version. Closes #1258. +- The `max_queued_messages` option has been increased from 100 to 1000 by + default, and now also applies to QoS 0 messages, when a client is connected. Broker: - When running as root, if dropping privileges to the "mosquitto" user fails, diff --git a/buildtest.py b/buildtest.py index 6e6cf87a..b28c273f 100755 --- a/buildtest.py +++ b/buildtest.py @@ -22,6 +22,7 @@ build_variants = [ 'WITH_UNIX_SOCKETS', 'WITH_WEBSOCKETS', 'WITH_WRAP', + 'WITH_XTREPORT', ] special_variants = [ diff --git a/lib/connect.c b/lib/connect.c index bcece56c..615a381c 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -267,7 +267,6 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; - mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index d0df571b..7ee7799b 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -233,7 +233,6 @@ struct mosquitto { struct mosquitto_message_all *will; struct mosquitto__alias *aliases; struct will_delay_list *will_delay_entry; - int out_packet_len; int alias_count; uint32_t maximum_packet_size; uint32_t will_delay_interval; diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 11595459..3b45bca8 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -123,7 +123,6 @@ void packet__cleanup_all(struct mosquitto *mosq) packet__cleanup(packet); mosquitto__free(packet); } - mosq->out_packet_len = 0; packet__cleanup(&mosq->in_packet); @@ -151,7 +150,6 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) mosq->out_packet = packet; } mosq->out_packet_last = packet; - mosq->out_packet_len++; pthread_mutex_unlock(&mosq->out_packet_mutex); #ifdef WITH_BROKER # ifdef WITH_WEBSOCKETS @@ -215,7 +213,6 @@ int packet__write(struct mosquitto *mosq) if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet; mosq->out_packet = mosq->out_packet->next; - mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } @@ -297,7 +294,6 @@ int packet__write(struct mosquitto *mosq) mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; - mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } diff --git a/lib/send_publish.c b/lib/send_publish.c index 00aecdcf..3fe17b68 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -63,29 +63,6 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 } #ifdef WITH_BROKER - if(qos == 0){ - /* This is a crude, incorrect limit on the number of QoS 0 PUBLISHes. - * We limit QoS 1 and 2 *messages* to max_inflight_messages+max_queued_messages. - * We don't create QoS 0 *messages* though, only *packets*. So it is - * tricky to add a correct limit on QoS 0 PUBLISHes. - * This check will drop any further outgoing QoS PUBLISHes if the queue - * of packets to be sent hits the max_queued_messages limit. It won't - * be exactly correct, but does set an upper limit on queued QoS 0 - * packets. - */ - if(mosq->out_packet_len >= db.config->max_queued_messages){ - return MOSQ_ERR_SUCCESS; - } - } - if(mosq->listener && mosq->listener->mount_point){ - len = strlen(mosq->listener->mount_point); - if(len < strlen(topic)){ - topic += len; - }else{ - /* Invalid topic string. Should never happen, but silently swallow the message anyway. */ - return MOSQ_ERR_SUCCESS; - } - } #ifdef WITH_BRIDGE if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){ for(i=0; ibridge->topic_count; i++){ diff --git a/src/bridge.c b/src/bridge.c index 4fccc59f..199a156d 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -609,7 +609,6 @@ void bridge__packet_cleanup(struct mosquitto *context) } context->out_packet = NULL; context->out_packet_last = NULL; - context->out_packet_len = 0; packet__cleanup(&(context->in_packet)); } diff --git a/src/context.c b/src/context.c index f98e4bfa..7e99ac30 100644 --- a/src/context.c +++ b/src/context.c @@ -65,7 +65,6 @@ struct mosquitto *context__init(mosq_sock_t sock) context->in_packet.payload = NULL; packet__cleanup(&context->in_packet); context->out_packet = NULL; - context->out_packet_len = 0; context->current_out_packet = NULL; context->address = NULL; @@ -156,7 +155,6 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->out_packet = context->out_packet->next; mosquitto__free(packet); } - context->out_packet_len = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ gai_cancel(context->adns); diff --git a/src/database.c b/src/database.c index 6ac9f089..637da07a 100644 --- a/src/database.c +++ b/src/database.c @@ -38,18 +38,35 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos) bool valid_bytes; bool valid_count; - if(qos == 0 || (msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0)){ + if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){ return true; } - valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes; - valid_count = msgs->inflight_quota > 0; + if(qos == 0){ + /* Deliver QoS 0 messages unless the queue is already full. + * For QoS 0 messages the choice is either "inflight" or dropped. + * There is no queueing option, unless the client is offline and + * queue_qos0_messages is enabled. + */ + valid_bytes = msgs->msg_bytes - db.config->max_inflight_bytes < db.config->max_queued_bytes; + valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages; - if(msgs->inflight_maximum == 0){ - return valid_bytes; - } - if(db.config->max_inflight_bytes == 0){ - return valid_count; + if(db.config->max_queued_messages == 0){ + return valid_bytes; + } + if(db.config->max_queued_bytes == 0){ + return valid_count; + } + }else{ + valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes; + valid_count = msgs->inflight_quota > 0; + + if(msgs->inflight_maximum == 0){ + return valid_bytes; + } + if(db.config->max_inflight_bytes == 0){ + return valid_count; + } } return valid_bytes && valid_count; @@ -78,8 +95,7 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms } if(qos == 0){ - source_bytes = msg_data->msg_bytes; - source_count = msg_data->msg_count; + return false; /* This case is handled in db__ready_for_flight() */ }else{ source_bytes = msg_data->msg_bytes12; source_count = msg_data->msg_count12; @@ -427,7 +443,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m return 1; } } - }else if(db__ready_for_queue(context, qos, msg_data)){ + }else if(qos != 0 && db__ready_for_queue(context, qos, msg_data)){ state = mosq_ms_queued; rc = 2; }else{ @@ -529,7 +545,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m util__decrement_send_quota(context); } - if(dir == mosq_md_out && update && context->current_out_packet == NULL){ + if(dir == mosq_md_out && update){ rc = db__message_write_inflight_out_latest(context); if(rc) return rc; rc = db__message_write_queued_out(context); diff --git a/src/websockets.c b/src/websockets.c index 18712833..f39214c4 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -278,7 +278,6 @@ static int callback_mqtt(struct libwebsocket_context *context, if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet; mosq->out_packet = mosq->out_packet->next; - mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } @@ -339,7 +338,6 @@ static int callback_mqtt(struct libwebsocket_context *context, mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; - mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; }