fix: replace sleep with (p)select in loop_forever
sleep was blocking loop_stop(force=false) since it was uniteruptible Signed-off-by: Christian Schneider <cschneider@radiodata.biz>
This commit is contained in:
parent
c8789180f3
commit
a3ebeff9d7
@ -78,20 +78,6 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
|
|||||||
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
|
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
|
||||||
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
|
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
|
||||||
|
|
||||||
if(mosq->sockpairR != INVALID_SOCKET){
|
|
||||||
COMPAT_CLOSE(mosq->sockpairR);
|
|
||||||
mosq->sockpairR = INVALID_SOCKET;
|
|
||||||
}
|
|
||||||
if(mosq->sockpairW != INVALID_SOCKET){
|
|
||||||
COMPAT_CLOSE(mosq->sockpairW);
|
|
||||||
mosq->sockpairW = INVALID_SOCKET;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
|
|
||||||
log__printf(mosq, MOSQ_LOG_WARNING,
|
|
||||||
"Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
|
|
||||||
}
|
|
||||||
|
|
||||||
return MOSQ_ERR_SUCCESS;
|
return MOSQ_ERR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
50
lib/loop.c
50
lib/loop.c
@ -193,6 +193,15 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
|
|||||||
|
|
||||||
int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
|
int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
|
||||||
{
|
{
|
||||||
|
#ifdef HAVE_PSELECT
|
||||||
|
struct timespec local_timeout;
|
||||||
|
#else
|
||||||
|
struct timeval local_timeout;
|
||||||
|
#endif
|
||||||
|
fd_set readfds;
|
||||||
|
int fdcount;
|
||||||
|
char pairbuf;
|
||||||
|
int maxfd = 0;
|
||||||
int run = 1;
|
int run = 1;
|
||||||
int rc;
|
int rc;
|
||||||
unsigned long reconnect_delay;
|
unsigned long reconnect_delay;
|
||||||
@ -252,15 +261,42 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
|
|||||||
mosq->reconnects++;
|
mosq->reconnects++;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WIN32
|
local_timeout.tv_sec = reconnect_delay;
|
||||||
Sleep(reconnect_delay*1000);
|
#ifdef HAVE_PSELECT
|
||||||
|
local_timeout.tv_nsec = 0;
|
||||||
#else
|
#else
|
||||||
req.tv_sec = reconnect_delay;
|
local_timeout.tv_usec = 0;
|
||||||
req.tv_nsec = 0;
|
|
||||||
while(nanosleep(&req, &rem) == -1 && errno == EINTR){
|
|
||||||
req = rem;
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
FD_ZERO(&readfds);
|
||||||
|
maxfd = 0;
|
||||||
|
if(mosq->sockpairR != INVALID_SOCKET){
|
||||||
|
/* sockpairR is used to break out of select() before the
|
||||||
|
* timeout, when mosquitto_loop_stop() is called */
|
||||||
|
FD_SET(mosq->sockpairR, &readfds);
|
||||||
|
maxfd = mosq->sockpairR;
|
||||||
|
}
|
||||||
|
#ifdef HAVE_PSELECT
|
||||||
|
fdcount = pselect(maxfd+1, &readfds, NULL, NULL, &local_timeout, NULL);
|
||||||
|
#else
|
||||||
|
fdcount = select(maxfd+1, &readfds, NULL, NULL, &local_timeout);
|
||||||
|
#endif
|
||||||
|
if(fdcount == -1){
|
||||||
|
#ifdef WIN32
|
||||||
|
errno = WSAGetLastError();
|
||||||
|
#endif
|
||||||
|
if(errno == EINTR){
|
||||||
|
return MOSQ_ERR_SUCCESS;
|
||||||
|
}else{
|
||||||
|
return MOSQ_ERR_ERRNO;
|
||||||
|
}
|
||||||
|
}else if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
|
||||||
|
#ifndef WIN32
|
||||||
|
if(read(mosq->sockpairR, &pairbuf, 1) == 0){
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
recv(mosq->sockpairR, &pairbuf, 1, 0);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
state = mosquitto__get_state(mosq);
|
state = mosquitto__get_state(mosq);
|
||||||
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
|
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
|
||||||
|
@ -92,8 +92,10 @@ struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata
|
|||||||
mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto));
|
mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto));
|
||||||
if(mosq){
|
if(mosq){
|
||||||
mosq->sock = INVALID_SOCKET;
|
mosq->sock = INVALID_SOCKET;
|
||||||
mosq->sockpairR = INVALID_SOCKET;
|
if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
|
||||||
mosq->sockpairW = INVALID_SOCKET;
|
log__printf(mosq, MOSQ_LOG_WARNING,
|
||||||
|
"Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
|
||||||
|
}
|
||||||
#ifdef WITH_THREADING
|
#ifdef WITH_THREADING
|
||||||
mosq->thread_id = pthread_self();
|
mosq->thread_id = pthread_self();
|
||||||
#endif
|
#endif
|
||||||
@ -131,8 +133,10 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
|
|||||||
}
|
}
|
||||||
mosq->protocol = mosq_p_mqtt311;
|
mosq->protocol = mosq_p_mqtt311;
|
||||||
mosq->sock = INVALID_SOCKET;
|
mosq->sock = INVALID_SOCKET;
|
||||||
mosq->sockpairR = INVALID_SOCKET;
|
if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
|
||||||
mosq->sockpairW = INVALID_SOCKET;
|
log__printf(mosq, MOSQ_LOG_WARNING,
|
||||||
|
"Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
|
||||||
|
}
|
||||||
mosq->keepalive = 60;
|
mosq->keepalive = 60;
|
||||||
mosq->clean_start = clean_start;
|
mosq->clean_start = clean_start;
|
||||||
if(id){
|
if(id){
|
||||||
|
@ -1079,6 +1079,9 @@ int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW)
|
|||||||
#else
|
#else
|
||||||
int sv[2];
|
int sv[2];
|
||||||
|
|
||||||
|
*pairR = INVALID_SOCKET;
|
||||||
|
*pairW = INVALID_SOCKET;
|
||||||
|
|
||||||
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
|
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
|
||||||
return MOSQ_ERR_ERRNO;
|
return MOSQ_ERR_ERRNO;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user