Improve QoS 0 outgoing packet queueing.

This commit is contained in:
Roger A. Light 2021-05-18 15:35:10 +01:00
parent c317891df1
commit 16fb0025a7
11 changed files with 31 additions and 7 deletions

View File

@ -7,6 +7,7 @@ Broker:
reconnected to the broker. Closes #2167.
- Fix bridge not reconnectng if the first reconnection attempt fails.
Closes #2207.
- Improve QoS 0 outgoing packet queueing.
Clients:
- If sending mosquitto_sub output to a pipe, mosquitto_sub will now detect

View File

@ -276,6 +276,7 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
mosq->out_packet_count--;
}
pthread_mutex_unlock(&mosq->out_packet_mutex);

View File

@ -169,6 +169,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->in_packet.payload = NULL;
packet__cleanup(&mosq->in_packet);
mosq->out_packet = NULL;
mosq->out_packet_count = 0;
mosq->current_out_packet = NULL;
mosq->last_msg_in = mosquitto_time();
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;

View File

@ -237,6 +237,7 @@ struct mosquitto {
struct mosquitto__alias *aliases;
struct will_delay_list *will_delay_entry;
int alias_count;
int out_packet_count;
uint32_t will_delay_interval;
time_t will_delay_time;
#ifdef WITH_TLS

View File

@ -122,6 +122,7 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq)
packet__cleanup(packet);
mosquitto__free(packet);
}
mosq->out_packet_count = 0;
packet__cleanup(&mosq->in_packet);
}
@ -157,6 +158,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
mosq->out_packet = packet;
}
mosq->out_packet_last = packet;
mosq->out_packet_count++;
pthread_mutex_unlock(&mosq->out_packet_mutex);
#ifdef WITH_BROKER
# ifdef WITH_WEBSOCKETS
@ -223,6 +225,7 @@ int packet__write(struct mosquitto *mosq)
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
mosq->out_packet_count--;
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
@ -312,6 +315,7 @@ int packet__write(struct mosquitto *mosq)
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
mosq->out_packet_count--;
}
pthread_mutex_unlock(&mosq->out_packet_mutex);

View File

@ -641,6 +641,7 @@ void bridge__packet_cleanup(struct mosquitto *context)
}
context->out_packet = NULL;
context->out_packet_last = NULL;
context->out_packet_count = 0;
packet__cleanup(&(context->in_packet));
}

View File

@ -69,6 +69,7 @@ struct mosquitto *context__init(mosq_sock_t sock)
packet__cleanup(&context->in_packet);
context->out_packet = NULL;
context->current_out_packet = NULL;
context->out_packet_count = 0;
context->address = NULL;
if((int)sock >= 0){
@ -158,6 +159,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next;
mosquitto__free(packet);
}
context->out_packet_count = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
gai_cancel(context->adns);

View File

@ -35,11 +35,18 @@ Contributors:
* @param qos qos for the packet of interest
* @return true if more in flight are allowed.
*/
bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos)
{
struct mosquitto_msg_data *msgs;
bool valid_bytes;
bool valid_count;
if(dir == mosq_md_out){
msgs = &context->msgs_out;
}else{
msgs = &context->msgs_in;
}
if(msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0){
return true;
}
@ -54,7 +61,11 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
return true;
}
valid_bytes = ((msgs->msg_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes);
valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages;
if(dir == mosq_md_out){
valid_count = context->out_packet_count < db.config->max_queued_messages;
}else{
valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages;
}
if(db.config->max_queued_messages == 0){
return valid_bytes;
@ -429,7 +440,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
}
if(context->sock != INVALID_SOCKET){
if(db__ready_for_flight(msg_data, qos)){
if(db__ready_for_flight(context, dir, qos)){
if(dir == mosq_md_out){
switch(qos){
case 0:
@ -799,7 +810,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
context->msgs_out.msg_count12++;
context->msgs_out.msg_bytes12 += msg->store->payloadlen;
}
if(db__ready_for_flight(&context->msgs_out, msg->qos)){
if(db__ready_for_flight(context, mosq_md_out, msg->qos)){
switch(msg->qos){
case 0:
msg->state = mosq_ms_publish_qos0;
@ -862,7 +873,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
context->msgs_in.msg_count12++;
context->msgs_in.msg_bytes12 += msg->store->payloadlen;
}
if(db__ready_for_flight(&context->msgs_in, msg->qos)){
if(db__ready_for_flight(context, mosq_md_in, msg->qos)){
switch(msg->qos){
case 0:
msg->state = mosq_ms_publish_qos0;

View File

@ -299,7 +299,7 @@ int handle__publish(struct mosquitto *context)
if(!stored){
if(msg->qos == 0
|| db__ready_for_flight(&context->msgs_in, msg->qos)
|| db__ready_for_flight(context, mosq_md_in, msg->qos)
|| db__ready_for_queue(context, msg->qos, &context->msgs_in)){
dup = 0;

View File

@ -659,7 +659,7 @@ void db__msg_store_clean(void);
void db__msg_store_compact(void);
void db__msg_store_free(struct mosquitto_msg_store *store);
int db__message_reconnect_reset(struct mosquitto *context);
bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos);
bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos);
bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data);
void sys_tree__init(void);
void sys_tree__update(int interval, time_t start_time);

View File

@ -217,6 +217,7 @@ static int callback_mqtt(
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
mosq->out_packet_count--;
}
while(mosq->current_out_packet && !lws_send_pipe_choked(mosq->wsi)){
@ -272,6 +273,7 @@ static int callback_mqtt(
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
mosq->out_packet_count--;
}
packet__cleanup(packet);