diff --git a/ChangeLog.txt b/ChangeLog.txt index 3fe8c667..dd7ec248 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -7,6 +7,7 @@ Broker: reconnected to the broker. Closes #2167. - Fix bridge not reconnectng if the first reconnection attempt fails. Closes #2207. +- Improve QoS 0 outgoing packet queueing. Clients: - If sending mosquitto_sub output to a pipe, mosquitto_sub will now detect diff --git a/lib/connect.c b/lib/connect.c index a82c1283..6b801471 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -276,6 +276,7 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit if(!mosq->out_packet){ mosq->out_packet_last = NULL; } + mosq->out_packet_count--; } pthread_mutex_unlock(&mosq->out_packet_mutex); diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 0d53f2e5..8c0dbeeb 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -169,6 +169,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st mosq->in_packet.payload = NULL; packet__cleanup(&mosq->in_packet); mosq->out_packet = NULL; + mosq->out_packet_count = 0; mosq->current_out_packet = NULL; mosq->last_msg_in = mosquitto_time(); mosq->next_msg_out = mosquitto_time() + mosq->keepalive; diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 02008af1..1651c860 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -237,6 +237,7 @@ struct mosquitto { struct mosquitto__alias *aliases; struct will_delay_list *will_delay_entry; int alias_count; + int out_packet_count; uint32_t will_delay_interval; time_t will_delay_time; #ifdef WITH_TLS diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 91c82d7e..d9647182 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -122,6 +122,7 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq) packet__cleanup(packet); mosquitto__free(packet); } + mosq->out_packet_count = 0; packet__cleanup(&mosq->in_packet); } @@ -157,6 +158,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) mosq->out_packet = packet; } mosq->out_packet_last = packet; + mosq->out_packet_count++; pthread_mutex_unlock(&mosq->out_packet_mutex); #ifdef WITH_BROKER # ifdef WITH_WEBSOCKETS @@ -223,6 +225,7 @@ int packet__write(struct mosquitto *mosq) if(!mosq->out_packet){ mosq->out_packet_last = NULL; } + mosq->out_packet_count--; } pthread_mutex_unlock(&mosq->out_packet_mutex); @@ -312,6 +315,7 @@ int packet__write(struct mosquitto *mosq) if(!mosq->out_packet){ mosq->out_packet_last = NULL; } + mosq->out_packet_count--; } pthread_mutex_unlock(&mosq->out_packet_mutex); diff --git a/src/bridge.c b/src/bridge.c index 961b6a72..3fc0dce8 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -641,6 +641,7 @@ void bridge__packet_cleanup(struct mosquitto *context) } context->out_packet = NULL; context->out_packet_last = NULL; + context->out_packet_count = 0; packet__cleanup(&(context->in_packet)); } diff --git a/src/context.c b/src/context.c index d0f4d218..fc686920 100644 --- a/src/context.c +++ b/src/context.c @@ -69,6 +69,7 @@ struct mosquitto *context__init(mosq_sock_t sock) packet__cleanup(&context->in_packet); context->out_packet = NULL; context->current_out_packet = NULL; + context->out_packet_count = 0; context->address = NULL; if((int)sock >= 0){ @@ -158,6 +159,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->out_packet = context->out_packet->next; mosquitto__free(packet); } + context->out_packet_count = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ gai_cancel(context->adns); diff --git a/src/database.c b/src/database.c index fc2c123d..4dfceb26 100644 --- a/src/database.c +++ b/src/database.c @@ -35,11 +35,18 @@ Contributors: * @param qos qos for the packet of interest * @return true if more in flight are allowed. */ -bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos) +bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos) { + struct mosquitto_msg_data *msgs; bool valid_bytes; bool valid_count; + if(dir == mosq_md_out){ + msgs = &context->msgs_out; + }else{ + msgs = &context->msgs_in; + } + if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){ return true; } @@ -54,7 +61,11 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos) return true; } valid_bytes = ((msgs->msg_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes); - valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages; + if(dir == mosq_md_out){ + valid_count = context->out_packet_count < db.config->max_queued_messages; + }else{ + valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages; + } if(db.config->max_queued_messages == 0){ return valid_bytes; @@ -429,7 +440,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m } if(context->sock != INVALID_SOCKET){ - if(db__ready_for_flight(msg_data, qos)){ + if(db__ready_for_flight(context, dir, qos)){ if(dir == mosq_md_out){ switch(qos){ case 0: @@ -799,7 +810,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) context->msgs_out.msg_count12++; context->msgs_out.msg_bytes12 += msg->store->payloadlen; } - if(db__ready_for_flight(&context->msgs_out, msg->qos)){ + if(db__ready_for_flight(context, mosq_md_out, msg->qos)){ switch(msg->qos){ case 0: msg->state = mosq_ms_publish_qos0; @@ -862,7 +873,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) context->msgs_in.msg_count12++; context->msgs_in.msg_bytes12 += msg->store->payloadlen; } - if(db__ready_for_flight(&context->msgs_in, msg->qos)){ + if(db__ready_for_flight(context, mosq_md_in, msg->qos)){ switch(msg->qos){ case 0: msg->state = mosq_ms_publish_qos0; diff --git a/src/handle_publish.c b/src/handle_publish.c index 4686d350..0656536d 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -299,7 +299,7 @@ int handle__publish(struct mosquitto *context) if(!stored){ if(msg->qos == 0 - || db__ready_for_flight(&context->msgs_in, msg->qos) + || db__ready_for_flight(context, mosq_md_in, msg->qos) || db__ready_for_queue(context, msg->qos, &context->msgs_in)){ dup = 0; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index c66126c8..22fc0677 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -659,7 +659,7 @@ void db__msg_store_clean(void); void db__msg_store_compact(void); void db__msg_store_free(struct mosquitto_msg_store *store); int db__message_reconnect_reset(struct mosquitto *context); -bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos); +bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos); bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data); void sys_tree__init(void); void sys_tree__update(int interval, time_t start_time); diff --git a/src/websockets.c b/src/websockets.c index f03d8107..74e36d31 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -217,6 +217,7 @@ static int callback_mqtt( if(!mosq->out_packet){ mosq->out_packet_last = NULL; } + mosq->out_packet_count--; } while(mosq->current_out_packet && !lws_send_pipe_choked(mosq->wsi)){ @@ -272,6 +273,7 @@ static int callback_mqtt( if(!mosq->out_packet){ mosq->out_packet_last = NULL; } + mosq->out_packet_count--; } packet__cleanup(packet);