diff --git a/ChangeLog.txt b/ChangeLog.txt index d3d3da02..d94258c6 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -10,6 +10,8 @@ Broker: - Fix support for openssl 3.0. - Fix check when loading persistence file of a different version than the native version. Closes #1684. +- Fix possible assert crash associated with bridge reconnecting when compiled + without epoll support. Closes #1700. Client library: - Don't treat an unexpected PUBACK, PUBREL, or PUBCOMP as a fatal error. diff --git a/src/loop.c b/src/loop.c index 36fa3114..823ffa6f 100644 --- a/src/loop.c +++ b/src/loop.c @@ -222,114 +222,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } #endif - time_count = 0; - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(time_count > 0){ - time_count--; - }else{ - time_count = 1000; - now = mosquitto_time(); - } - context->pollfd_index = -1; - - if(context->sock != INVALID_SOCKET){ -#ifdef WITH_BRIDGE - if(context->bridge){ - mosquitto__check_keepalive(db, context); - if(context->bridge->round_robin == false - && context->bridge->cur_address != 0 - && context->bridge->primary_retry - && now > context->bridge->primary_retry){ - - if(context->bridge->primary_retry_sock == INVALID_SOCKET){ - rc = net__try_connect(context->bridge->addresses[0].address, - context->bridge->addresses[0].port, - &context->bridge->primary_retry_sock, NULL, false); - - if(rc == 0){ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = 0; - net__socket_close(db, context); - context->bridge->cur_address = 0; - } - }else{ - len = sizeof(int); - if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ - if(err == 0){ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = 0; - net__socket_close(db, context); - context->bridge->cur_address = context->bridge->address_count-1; - }else{ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; - } - }else{ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; - } - } - } - } -#endif - - /* Local bridges never time out in this fashion. */ - if(!(context->keepalive) - || context->bridge - || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ - - if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){ -#ifdef WITH_EPOLL - if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ - if(!(context->events & EPOLLOUT)) { - ev.data.fd = context->sock; - ev.events = EPOLLIN | EPOLLOUT; - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno)); - } - } - context->events = EPOLLIN | EPOLLOUT; - } - context->ws_want_write = false; - } - else{ - if(context->events & EPOLLOUT) { - ev.data.fd = context->sock; - ev.events = EPOLLIN; - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno)); - } - } - context->events = EPOLLIN; - } - } -#else - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ - pollfds[pollfd_index].events |= POLLOUT; - context->ws_want_write = false; - } - context->pollfd_index = pollfd_index; - pollfd_index++; -#endif - }else{ - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); - } - }else{ - /* Client has exceeded keepalive*1.5 */ - do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); - } - } - } - #ifdef WITH_BRIDGE time_count = 0; for(i=0; ibridge_count; i++){ @@ -466,6 +358,115 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } #endif + time_count = 0; + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(time_count > 0){ + time_count--; + }else{ + time_count = 1000; + now = mosquitto_time(); + } + context->pollfd_index = -1; + + if(context->sock != INVALID_SOCKET){ +#ifdef WITH_BRIDGE + if(context->bridge){ + mosquitto__check_keepalive(db, context); + if(context->bridge->round_robin == false + && context->bridge->cur_address != 0 + && context->bridge->primary_retry + && now > context->bridge->primary_retry){ + + if(context->bridge->primary_retry_sock == INVALID_SOCKET){ + rc = net__try_connect(context->bridge->addresses[0].address, + context->bridge->addresses[0].port, + &context->bridge->primary_retry_sock, NULL, false); + + if(rc == 0){ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = 0; + net__socket_close(db, context); + context->bridge->cur_address = 0; + } + }else{ + len = sizeof(int); + if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if(err == 0){ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = 0; + net__socket_close(db, context); + context->bridge->cur_address = context->bridge->address_count-1; + }else{ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = now+5; + } + }else{ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = now+5; + } + } + } + } +#endif + + /* Local bridges never time out in this fashion. */ + if(!(context->keepalive) + || context->bridge + || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ + + if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){ +#ifdef WITH_EPOLL + if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ + if(!(context->events & EPOLLOUT)) { + ev.data.fd = context->sock; + ev.events = EPOLLIN | EPOLLOUT; + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno)); + } + } + context->events = EPOLLIN | EPOLLOUT; + } + context->ws_want_write = false; + } + else{ + if(context->events & EPOLLOUT) { + ev.data.fd = context->sock; + ev.events = EPOLLIN; + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno)); + } + } + context->events = EPOLLIN; + } + } +#else + pollfds[pollfd_index].fd = context->sock; + pollfds[pollfd_index].events = POLLIN; + pollfds[pollfd_index].revents = 0; + if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ + pollfds[pollfd_index].events |= POLLOUT; + context->ws_want_write = false; + } + context->pollfd_index = pollfd_index; + pollfd_index++; +#endif + }else{ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + } + }else{ + /* Client has exceeded keepalive*1.5 */ + do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); + } + } + } + + #ifndef WIN32 sigprocmask(SIG_SETMASK, &sigblock, &origsig); #ifdef WITH_EPOLL