Better outgoing QoS 0 limiting.

This commit is contained in:
Roger A. Light 2020-11-25 09:59:31 +00:00
parent fdd624cec4
commit dac841a342
10 changed files with 31 additions and 46 deletions

View File

@ -40,6 +40,8 @@ Breaking changes:
regardless of whether the `-d` argument is used when running the broker. regardless of whether the `-d` argument is used when running the broker.
- The `tls_version` option now defines the *minimum* TLS protocol version to - The `tls_version` option now defines the *minimum* TLS protocol version to
be used, rather than the exact version. Closes #1258. be used, rather than the exact version. Closes #1258.
- The `max_queued_messages` option has been increased from 100 to 1000 by
default, and now also applies to QoS 0 messages, when a client is connected.
Broker: Broker:
- When running as root, if dropping privileges to the "mosquitto" user fails, - When running as root, if dropping privileges to the "mosquitto" user fails,

View File

@ -22,6 +22,7 @@ build_variants = [
'WITH_UNIX_SOCKETS', 'WITH_UNIX_SOCKETS',
'WITH_WEBSOCKETS', 'WITH_WEBSOCKETS',
'WITH_WRAP', 'WITH_WRAP',
'WITH_XTREPORT',
] ]
special_variants = [ special_variants = [

View File

@ -267,7 +267,6 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit
mosq->current_out_packet = mosq->out_packet; mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){ if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }

View File

@ -233,7 +233,6 @@ struct mosquitto {
struct mosquitto_message_all *will; struct mosquitto_message_all *will;
struct mosquitto__alias *aliases; struct mosquitto__alias *aliases;
struct will_delay_list *will_delay_entry; struct will_delay_list *will_delay_entry;
int out_packet_len;
int alias_count; int alias_count;
uint32_t maximum_packet_size; uint32_t maximum_packet_size;
uint32_t will_delay_interval; uint32_t will_delay_interval;

View File

@ -123,7 +123,6 @@ void packet__cleanup_all(struct mosquitto *mosq)
packet__cleanup(packet); packet__cleanup(packet);
mosquitto__free(packet); mosquitto__free(packet);
} }
mosq->out_packet_len = 0;
packet__cleanup(&mosq->in_packet); packet__cleanup(&mosq->in_packet);
@ -151,7 +150,6 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
mosq->out_packet = packet; mosq->out_packet = packet;
} }
mosq->out_packet_last = packet; mosq->out_packet_last = packet;
mosq->out_packet_len++;
pthread_mutex_unlock(&mosq->out_packet_mutex); pthread_mutex_unlock(&mosq->out_packet_mutex);
#ifdef WITH_BROKER #ifdef WITH_BROKER
# ifdef WITH_WEBSOCKETS # ifdef WITH_WEBSOCKETS
@ -215,7 +213,6 @@ int packet__write(struct mosquitto *mosq)
if(mosq->out_packet && !mosq->current_out_packet){ if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet; mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }
@ -297,7 +294,6 @@ int packet__write(struct mosquitto *mosq)
mosq->current_out_packet = mosq->out_packet; mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){ if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }

View File

@ -63,29 +63,6 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
} }
#ifdef WITH_BROKER #ifdef WITH_BROKER
if(qos == 0){
/* This is a crude, incorrect limit on the number of QoS 0 PUBLISHes.
* We limit QoS 1 and 2 *messages* to max_inflight_messages+max_queued_messages.
* We don't create QoS 0 *messages* though, only *packets*. So it is
* tricky to add a correct limit on QoS 0 PUBLISHes.
* This check will drop any further outgoing QoS PUBLISHes if the queue
* of packets to be sent hits the max_queued_messages limit. It won't
* be exactly correct, but does set an upper limit on queued QoS 0
* packets.
*/
if(mosq->out_packet_len >= db.config->max_queued_messages){
return MOSQ_ERR_SUCCESS;
}
}
if(mosq->listener && mosq->listener->mount_point){
len = strlen(mosq->listener->mount_point);
if(len < strlen(topic)){
topic += len;
}else{
/* Invalid topic string. Should never happen, but silently swallow the message anyway. */
return MOSQ_ERR_SUCCESS;
}
}
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){ if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
for(i=0; i<mosq->bridge->topic_count; i++){ for(i=0; i<mosq->bridge->topic_count; i++){

View File

@ -609,7 +609,6 @@ void bridge__packet_cleanup(struct mosquitto *context)
} }
context->out_packet = NULL; context->out_packet = NULL;
context->out_packet_last = NULL; context->out_packet_last = NULL;
context->out_packet_len = 0;
packet__cleanup(&(context->in_packet)); packet__cleanup(&(context->in_packet));
} }

View File

@ -65,7 +65,6 @@ struct mosquitto *context__init(mosq_sock_t sock)
context->in_packet.payload = NULL; context->in_packet.payload = NULL;
packet__cleanup(&context->in_packet); packet__cleanup(&context->in_packet);
context->out_packet = NULL; context->out_packet = NULL;
context->out_packet_len = 0;
context->current_out_packet = NULL; context->current_out_packet = NULL;
context->address = NULL; context->address = NULL;
@ -156,7 +155,6 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next; context->out_packet = context->out_packet->next;
mosquitto__free(packet); mosquitto__free(packet);
} }
context->out_packet_len = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){ if(context->adns){
gai_cancel(context->adns); gai_cancel(context->adns);

View File

@ -38,10 +38,26 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
bool valid_bytes; bool valid_bytes;
bool valid_count; bool valid_count;
if(qos == 0 || (msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0)){ if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){
return true; return true;
} }
if(qos == 0){
/* Deliver QoS 0 messages unless the queue is already full.
* For QoS 0 messages the choice is either "inflight" or dropped.
* There is no queueing option, unless the client is offline and
* queue_qos0_messages is enabled.
*/
valid_bytes = msgs->msg_bytes - db.config->max_inflight_bytes < db.config->max_queued_bytes;
valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages;
if(db.config->max_queued_messages == 0){
return valid_bytes;
}
if(db.config->max_queued_bytes == 0){
return valid_count;
}
}else{
valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes; valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes;
valid_count = msgs->inflight_quota > 0; valid_count = msgs->inflight_quota > 0;
@ -51,6 +67,7 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
if(db.config->max_inflight_bytes == 0){ if(db.config->max_inflight_bytes == 0){
return valid_count; return valid_count;
} }
}
return valid_bytes && valid_count; return valid_bytes && valid_count;
} }
@ -78,8 +95,7 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
} }
if(qos == 0){ if(qos == 0){
source_bytes = msg_data->msg_bytes; return false; /* This case is handled in db__ready_for_flight() */
source_count = msg_data->msg_count;
}else{ }else{
source_bytes = msg_data->msg_bytes12; source_bytes = msg_data->msg_bytes12;
source_count = msg_data->msg_count12; source_count = msg_data->msg_count12;
@ -427,7 +443,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
return 1; return 1;
} }
} }
}else if(db__ready_for_queue(context, qos, msg_data)){ }else if(qos != 0 && db__ready_for_queue(context, qos, msg_data)){
state = mosq_ms_queued; state = mosq_ms_queued;
rc = 2; rc = 2;
}else{ }else{
@ -529,7 +545,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
util__decrement_send_quota(context); util__decrement_send_quota(context);
} }
if(dir == mosq_md_out && update && context->current_out_packet == NULL){ if(dir == mosq_md_out && update){
rc = db__message_write_inflight_out_latest(context); rc = db__message_write_inflight_out_latest(context);
if(rc) return rc; if(rc) return rc;
rc = db__message_write_queued_out(context); rc = db__message_write_queued_out(context);

View File

@ -278,7 +278,6 @@ static int callback_mqtt(struct libwebsocket_context *context,
if(mosq->out_packet && !mosq->current_out_packet){ if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet; mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }
@ -339,7 +338,6 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->current_out_packet = mosq->out_packet; mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){ if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }