Apply limits to QoS 0 outgoing messages.

This commit is contained in:
Roger A. Light 2020-11-24 10:13:14 +00:00
parent 4667c9d5bc
commit 2755fe3c4c
11 changed files with 53 additions and 70 deletions

View File

@ -267,6 +267,7 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}

View File

@ -224,7 +224,6 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
void mosquitto__destroy(struct mosquitto *mosq)
{
struct mosquitto__packet *packet;
if(!mosq) return;
#ifdef WITH_THREADING
@ -295,22 +294,7 @@ void mosquitto__destroy(struct mosquitto *mosq)
mosquitto_property_free_all(&mosq->connect_properties);
/* Out packet cleanup */
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
}
while(mosq->current_out_packet){
packet = mosq->current_out_packet;
/* Free data and reset values */
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
}
packet__cleanup(packet);
mosquitto__free(packet);
}
packet__cleanup_all(mosq);
packet__cleanup(&mosq->in_packet);
if(mosq->sockpairR != INVALID_SOCKET){

View File

@ -233,8 +233,9 @@ struct mosquitto {
struct mosquitto_message_all *will;
struct mosquitto__alias *aliases;
struct will_delay_list *will_delay_entry;
uint32_t maximum_packet_size;
int out_packet_len;
int alias_count;
uint32_t maximum_packet_size;
uint32_t will_delay_interval;
time_t will_delay_time;
#ifdef WITH_TLS

View File

@ -123,6 +123,7 @@ void packet__cleanup_all(struct mosquitto *mosq)
packet__cleanup(packet);
mosquitto__free(packet);
}
mosq->out_packet_len = 0;
packet__cleanup(&mosq->in_packet);
@ -150,6 +151,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
mosq->out_packet = packet;
}
mosq->out_packet_last = packet;
mosq->out_packet_len++;
pthread_mutex_unlock(&mosq->out_packet_mutex);
#ifdef WITH_BROKER
# ifdef WITH_WEBSOCKETS
@ -213,6 +215,7 @@ int packet__write(struct mosquitto *mosq)
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
@ -294,6 +297,7 @@ int packet__write(struct mosquitto *mosq)
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}

View File

@ -63,6 +63,20 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
#ifdef WITH_BROKER
if(qos == 0){
/* This is a crude, incorrect limit on the number of QoS 0 PUBLISHes.
* We limit QoS 1 and 2 *messages* to max_inflight_messages+max_queued_messages.
* We don't create QoS 0 *messages* though, only *packets*. So it is
* tricky to add a correct limit on QoS 0 PUBLISHes.
* This check will drop any further outgoing QoS PUBLISHes if the queue
* of packets to be sent hits the max_queued_messages limit. It won't
* be exactly correct, but does set an upper limit on queued QoS 0
* packets.
*/
if(mosq->out_packet_len >= db.config->max_queued_messages){
return MOSQ_ERR_SUCCESS;
}
}
if(mosq->listener && mosq->listener->mount_point){
len = strlen(mosq->listener->mount_point);
if(len < strlen(topic)){

View File

@ -609,6 +609,7 @@ void bridge__packet_cleanup(struct mosquitto *context)
}
context->out_packet = NULL;
context->out_packet_last = NULL;
context->out_packet_len = 0;
packet__cleanup(&(context->in_packet));
}

View File

@ -52,9 +52,6 @@ struct config_recurse {
int log_dest_set;
unsigned int log_type;
int log_type_set;
unsigned long max_inflight_bytes;
unsigned long max_queued_bytes;
int max_queued_messages;
};
#if defined(WIN32) || defined(__CYGWIN__)
@ -191,6 +188,9 @@ static void config__init_reload(struct mosquitto__config *config)
config->max_keepalive = 65535;
config->max_packet_size = 0;
config->max_inflight_messages = 20;
config->max_queued_messages = 1000;
config->max_inflight_bytes = 0;
config->max_queued_bytes = 0;
config->persistence = false;
mosquitto__free(config->persistence_location);
config->persistence_location = NULL;
@ -607,9 +607,6 @@ int config__read(struct mosquitto__config *config, bool reload)
cr.log_dest_set = 0;
cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0;
cr.max_inflight_bytes = 0;
cr.max_queued_bytes = 0;
cr.max_queued_messages = 1000;
if(!db.config_file) return 0;
@ -676,8 +673,6 @@ int config__read(struct mosquitto__config *config, bool reload)
config->user = mosquitto__strdup("mosquitto");
}
db__limits_set(cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);
#ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){
if(!config->bridges[i].name){
@ -1629,12 +1624,9 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
cur_listener->maximum_qos = (uint8_t)tmp_int;
}else if(!strcmp(token, "max_inflight_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_inflight_bytes = (unsigned long)atol(token);
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
}
if(conf__parse_int(&token, "max_inflight_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0) tmp_int = 0;
config->max_inflight_bytes = (size_t)tmp_int;
}else if(!strcmp(token, "max_inflight_messages")){
if(conf__parse_int(&token, "max_inflight_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0 || tmp_int == UINT16_MAX){
@ -1659,20 +1651,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
config->max_packet_size = (uint32_t)tmp_int;
}else if(!strcmp(token, "max_queued_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_queued_bytes = (unsigned long)atol(token); /* 63 bits is ok right? */
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration.");
}
if(conf__parse_int(&token, "max_queued_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0) tmp_int = 0;
config->max_queued_bytes = (size_t)tmp_int;
}else if(!strcmp(token, "max_queued_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_queued_messages = atoi(token);
if(cr->max_queued_messages < 0) cr->max_queued_messages = 0;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_messages value in configuration.");
}
if(conf__parse_int(&token, "max_queued_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0) tmp_int = 0;
config->max_queued_messages = tmp_int;
}else if(!strcmp(token, "memory_limit")){
ssize_t lim;
if(conf__parse_ssize_t(&token, "memory_limit", &lim, saveptr)) return MOSQ_ERR_INVAL;

View File

@ -65,6 +65,7 @@ struct mosquitto *context__init(mosq_sock_t sock)
context->in_packet.payload = NULL;
packet__cleanup(&context->in_packet);
context->out_packet = NULL;
context->out_packet_len = 0;
context->current_out_packet = NULL;
context->address = NULL;
@ -155,6 +156,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next;
mosquitto__free(packet);
}
context->out_packet_len = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
gai_cancel(context->adns);

View File

@ -27,10 +27,6 @@ Contributors:
#include "time_mosq.h"
#include "util_mosq.h"
static unsigned long max_inflight_bytes = 0;
static int max_queued = 100;
static unsigned long max_queued_bytes = 0;
/**
* Is this context ready to take more in flight messages right now?
* @param context the client context of interest
@ -42,17 +38,17 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
bool valid_bytes;
bool valid_count;
if(qos == 0 || (msgs->inflight_maximum == 0 && max_inflight_bytes == 0)){
if(qos == 0 || (msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0)){
return true;
}
valid_bytes = msgs->msg_bytes12 < max_inflight_bytes;
valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes;
valid_count = msgs->inflight_quota > 0;
if(msgs->inflight_maximum == 0){
return valid_bytes;
}
if(max_inflight_bytes == 0){
if(db.config->max_inflight_bytes == 0){
return valid_count;
}
@ -73,11 +69,11 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
int source_count;
int adjust_count;
unsigned long source_bytes;
unsigned long adjust_bytes = max_inflight_bytes;
unsigned long adjust_bytes = db.config->max_inflight_bytes;
bool valid_bytes;
bool valid_count;
if(max_queued == 0 && max_queued_bytes == 0){
if(db.config->max_queued_messages == 0 && db.config->max_queued_bytes == 0){
return true;
}
@ -96,13 +92,13 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
adjust_count = 0;
}
valid_bytes = source_bytes - adjust_bytes < max_queued_bytes;
valid_count = source_count - adjust_count < max_queued;
valid_bytes = source_bytes - adjust_bytes < db.config->max_queued_bytes;
valid_count = source_count - adjust_count < db.config->max_queued_messages;
if(max_queued_bytes == 0){
if(db.config->max_queued_bytes == 0){
return valid_count;
}
if(max_queued == 0){
if(db.config->max_queued_messages == 0){
return valid_bytes;
}
@ -1204,12 +1200,3 @@ int db__message_write_queued_out(struct mosquitto *context)
}
return MOSQ_ERR_SUCCESS;
}
void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight_bytes = inflight_bytes;
max_queued = queued;
max_queued_bytes = queued_bytes;
}

View File

@ -347,10 +347,13 @@ struct mosquitto__config {
char *log_timestamp_format;
char *log_file;
FILE *log_fptr;
uint16_t max_inflight_messages;
uint16_t max_keepalive;
size_t max_inflight_bytes;
size_t max_queued_bytes;
int max_queued_messages;
uint32_t max_packet_size;
uint32_t message_size_limit;
uint16_t max_inflight_messages;
uint16_t max_keepalive;
bool persistence;
char *persistence_location;
char *persistence_file;
@ -708,7 +711,6 @@ int db__close(void);
int persist__backup(bool shutdown);
int persist__restore(void);
#endif
void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos);

View File

@ -278,6 +278,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
@ -338,6 +339,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
mosq->out_packet_len--;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}