Fix client library keepalive handling.

Fix the case where a message received just before the keepalive timer
expired would cause the client to miss the keepalive timer.

Thanks to Graham Benton.
This commit is contained in:
Roger A. Light 2016-02-28 17:24:43 +00:00
parent 5cca6b4239
commit af995d211d
7 changed files with 36 additions and 27 deletions

View File

@ -1,4 +1,12 @@
1.4.8 - 20150214
1.4.9 - 20160xxx
================
Client library:
- Fix the case where a message received just before the keepalive timer
expired would cause the client to miss the keepalive timer.
1.4.8 - 20160214
================
Broker:

View File

@ -174,7 +174,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se
mosq->out_packet = NULL;
mosq->current_out_packet = NULL;
mosq->last_msg_in = mosquitto_time();
mosq->last_msg_out = mosquitto_time();
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
mosq->ping_t = 0;
mosq->last_mid = 0;
mosq->state = mosq_cs_new;
@ -489,7 +489,7 @@ static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
mosq->last_msg_out = mosquitto_time();
mosq->next_msg_out = mosq->last_msg_in + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
mosq->ping_t = 0;
@ -841,6 +841,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
int rc;
char pairbuf;
int maxfd = 0;
time_t now;
if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
#ifndef WIN32
@ -902,22 +903,22 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
}
}
if(timeout >= 0){
local_timeout.tv_sec = timeout/1000;
#ifdef HAVE_PSELECT
local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
#else
local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
#endif
}else{
local_timeout.tv_sec = 1;
#ifdef HAVE_PSELECT
local_timeout.tv_nsec = 0;
#else
local_timeout.tv_usec = 0;
#endif
if(timeout < 0){
timeout = 1000;
}
now = mosquitto_time();
if(mosq->next_msg_out && now + timeout/1000 > mosq->next_msg_out){
timeout = (mosq->next_msg_out - now)*1000;
}
local_timeout.tv_sec = timeout/1000;
#ifdef HAVE_PSELECT
local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
#else
local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
#endif
#ifdef HAVE_PSELECT
fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
#else

View File

@ -155,7 +155,7 @@ struct mosquitto {
uint16_t last_mid;
enum mosquitto_client_state state;
time_t last_msg_in;
time_t last_msg_out;
time_t next_msg_out;
time_t ping_t;
struct _mosquitto_packet in_packet;
struct _mosquitto_packet *current_out_packet;

View File

@ -841,7 +841,7 @@ int _mosquitto_packet_write(struct mosquitto *mosq)
_mosquitto_free(packet);
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_out = mosquitto_time();
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
/* End of duplicate, possibly unnecessary code */
@ -872,7 +872,7 @@ int _mosquitto_packet_write(struct mosquitto *mosq)
_mosquitto_free(packet);
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_out = mosquitto_time();
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
}
pthread_mutex_unlock(&mosq->current_out_packet_mutex);

View File

@ -83,7 +83,7 @@ void _mosquitto_check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq)
void _mosquitto_check_keepalive(struct mosquitto *mosq)
#endif
{
time_t last_msg_out;
time_t next_msg_out;
time_t last_msg_in;
time_t now = mosquitto_time();
#ifndef WITH_BROKER
@ -95,7 +95,7 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq)
/* Check if a lazy bridge should be timed out due to idle. */
if(mosq->bridge && mosq->bridge->start_type == bst_lazy
&& mosq->sock != INVALID_SOCKET
&& now - mosq->last_msg_out >= mosq->bridge->idle_timeout){
&& now - mosq->next_msg_out - mosq->keepalive >= mosq->bridge->idle_timeout){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id);
_mosquitto_socket_close(db, mosq);
@ -103,18 +103,18 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq)
}
#endif
pthread_mutex_lock(&mosq->msgtime_mutex);
last_msg_out = mosq->last_msg_out;
next_msg_out = mosq->next_msg_out;
last_msg_in = mosq->last_msg_in;
pthread_mutex_unlock(&mosq->msgtime_mutex);
if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
(now - last_msg_out >= mosq->keepalive || now - last_msg_in >= mosq->keepalive)){
(now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){
if(mosq->state == mosq_cs_connected && mosq->ping_t == 0){
_mosquitto_send_pingreq(mosq);
/* Reset last msg times to give the server time to send a pingresp */
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = now;
mosq->last_msg_out = now;
mosq->next_msg_out = now + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
}else{
#ifdef WITH_BROKER

View File

@ -147,7 +147,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
context->state = mosq_cs_new;
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->last_msg_out = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
context->keepalive = context->bridge->keepalive;
context->clean_session = context->bridge->clean_session;
context->in_packet.payload = NULL;

View File

@ -36,7 +36,7 @@ struct mosquitto *mqtt3_context_init(struct mosquitto_db *db, mosq_sock_t sock)
context->state = mosq_cs_new;
context->sock = sock;
context->last_msg_in = mosquitto_time();
context->last_msg_out = mosquitto_time();
context->next_msg_out = mosquitto_time() + 60;
context->keepalive = 60; /* Default to 60s */
context->clean_session = true;
context->disconnect_t = 0;