diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8095e1eb..202a7da1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,6 +29,7 @@ set (MOSQ_SRCS mosquitto.c mosquitto_broker.h mosquitto_broker_internal.h ../lib/misc_mosq.c ../lib/misc_mosq.h + mux.c mux.h mux_epoll.c mux_poll.c net.c ../lib/net_mosq_ocsp.c ../lib/net_mosq.c ../lib/net_mosq.h ../lib/packet_datatypes.c diff --git a/src/Makefile b/src/Makefile index 205ecfb8..f5433ac7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -33,6 +33,9 @@ OBJS= mosquitto.o \ loop.o \ memory_mosq.o \ misc_mosq.o \ + mux.o \ + mux_epoll.o \ + mux_poll.o \ net.o \ net_mosq.o \ net_mosq_ocsp.o \ @@ -153,6 +156,15 @@ memory_mosq.o : ../lib/memory_mosq.c ../lib/memory_mosq.h misc_mosq.o : ../lib/misc_mosq.c ../lib/misc_mosq.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +mux.o : mux.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + +mux_epoll.o : mux_epoll.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + +mux_poll.o : mux_poll.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + net.o : net.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/bridge.c b/src/bridge.c index f0736b39..f1d08900 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -30,10 +30,6 @@ Contributors: #endif #ifndef WIN32 -#ifdef WITH_EPOLL -#include -#endif -#include #include #else #include @@ -520,20 +516,12 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context) int bridge__register_local_connections(struct mosquitto_db *db) { #ifdef WITH_EPOLL - struct epoll_event ev; struct mosquitto *context, *ctxt_tmp = NULL; - memset(&ev, 0, sizeof(struct epoll_event)); - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ if(context->bridge){ - ev.data.fd = context->sock; - ev.events = EPOLLIN; - context->events = EPOLLIN; - if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if(mux__add_in(db, context)){ log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno)); - (void)close(db->epollfd); - db->epollfd = 0; return MOSQ_ERR_UNKNOWN; } } @@ -639,19 +627,12 @@ static void bridge__backoff_reset(struct mosquitto *context) } } -#ifdef WITH_EPOLL void bridge_check(struct mosquitto_db *db) -#else -void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index) -#endif { static time_t last_check = 0; time_t now; struct mosquitto *context = NULL; socklen_t len; -#ifdef WITH_EPOLL - struct epoll_event ev; -#endif int i; int rc; int err; @@ -660,9 +641,6 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i if(now <= last_check) return; -#ifdef WITH_EPOLL - memset(&ev, 0, sizeof(struct epoll_event)); -#endif for(i=0; ibridge_count; i++){ if(!db->bridges[i]) continue; @@ -736,29 +714,10 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i }else if(rc == 0){ rc = bridge__connect_step2(db, context); if(rc == MOSQ_ERR_SUCCESS){ -#ifdef WITH_EPOLL - ev.data.fd = context->sock; - ev.events = EPOLLIN; + rc = mux__add_in(db, context); if(context->current_out_packet){ - ev.events |= EPOLLOUT; + rc = mux__add_out(db, context); } - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); - } - }else{ - context->events = ev.events; - } -#else - pollfds[*pollfd_index].fd = context->sock; - pollfds[*pollfd_index].events = POLLIN; - pollfds[*pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[*pollfd_index].events |= POLLOUT; - } - context->pollfd_index = *pollfd_index; - (*pollfd_index)++; -#endif }else if(rc == MOSQ_ERR_CONN_PENDING){ context->bridge->restart_t = 0; }else{ @@ -778,10 +737,6 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i context->bridge->restart_t = 0; } }else{ -#ifdef WITH_EPOLL - /* clean any events triggered in previous connection */ - context->events = 0; -#endif rc = bridge__connect_step1(db, context); if(rc){ context->bridge->cur_address++; @@ -801,29 +756,10 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ context->bridge->primary_retry = now + 5; } -#ifdef WITH_EPOLL - ev.data.fd = context->sock; - ev.events = EPOLLIN; + rc = mux__add_in(db, context); if(context->current_out_packet){ - ev.events |= EPOLLOUT; + rc = mux__add_out(db, context); } - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); - } - }else{ - context->events = ev.events; - } -#else - pollfds[*pollfd_index].fd = context->sock; - pollfds[*pollfd_index].events = POLLIN; - pollfds[*pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[*pollfd_index].events |= POLLOUT; - } - context->pollfd_index = *pollfd_index; - (*pollfd_index)++; -#endif }else{ context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ diff --git a/src/loop.c b/src/loop.c index 256437d9..942fecae 100644 --- a/src/loop.c +++ b/src/loop.c @@ -23,11 +23,6 @@ Contributors: #include #ifndef WIN32 -#ifdef WITH_EPOLL -#include -#define MAX_EVENTS 1000 -#endif -#include #include #else #include @@ -63,12 +58,6 @@ extern bool flag_db_backup; extern bool flag_tree_print; extern int run; -#ifdef WITH_EPOLL -static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events); -#else -static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds); -#endif - #ifdef WITH_WEBSOCKETS static void temp__expire_websockets_clients(struct mosquitto_db *db) { @@ -119,74 +108,25 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li #endif time_t now = 0; int time_count; - int fdcount; struct mosquitto *context, *ctxt_tmp; -#ifndef WIN32 - sigset_t sigblock, origsig; -#endif - int i; -#ifdef WITH_EPOLL - int j; - struct epoll_event ev, events[MAX_EVENTS]; -#else - struct pollfd *pollfds = NULL; - int pollfd_index; - int pollfd_max; -#endif #ifdef WITH_BRIDGE int rc; #endif +#ifdef WITH_WEBSOCKETS + int i; +#endif #if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000 memset(&sul, 0, sizeof(struct lws_sorted_usec_list)); #endif -#ifndef WIN32 - sigemptyset(&sigblock); - sigaddset(&sigblock, SIGINT); - sigaddset(&sigblock, SIGTERM); - sigaddset(&sigblock, SIGUSR1); - sigaddset(&sigblock, SIGUSR2); - sigaddset(&sigblock, SIGHUP); -#endif + rc = mux__init(db, listensock, listensock_count); + if(rc) return rc; -#ifndef WITH_EPOLL -#ifdef WIN32 - pollfd_max = _getmaxstdio(); -#else - pollfd_max = sysconf(_SC_OPEN_MAX); -#endif - - pollfds = mosquitto__malloc(sizeof(struct pollfd)*pollfd_max); - if(!pollfds){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } -#endif - -#ifdef WITH_EPOLL - db->epollfd = 0; - if ((db->epollfd = epoll_create(MAX_EVENTS)) == -1) { - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno)); - return MOSQ_ERR_UNKNOWN; - } - memset(&ev, 0, sizeof(struct epoll_event)); - memset(&events, 0, sizeof(struct epoll_event)*MAX_EVENTS); - for(i=0; iepollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) { - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno)); - (void)close(db->epollfd); - db->epollfd = 0; - return MOSQ_ERR_UNKNOWN; - } - } -# ifdef WITH_BRIDGE +#ifdef WITH_BRIDGE rc = bridge__register_local_connections(db); if(rc) return rc; -# endif #endif while(run){ @@ -197,18 +137,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } #endif -#ifndef WITH_EPOLL - memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max); - - pollfd_index = 0; - for(i=0; icontexts_by_sock, context, ctxt_tmp){ if(time_count > 0){ @@ -217,7 +145,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li time_count = 1000; now = mosquitto_time(); } - context->pollfd_index = -1; if(context->sock != INVALID_SOCKET){ /* Local bridges never time out in this fashion. */ @@ -226,43 +153,13 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){ -#ifdef WITH_EPOLL if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ - if(!(context->events & EPOLLOUT)) { - ev.data.fd = context->sock; - ev.events = EPOLLIN | EPOLLOUT; - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno)); - } - } - context->events = EPOLLIN | EPOLLOUT; - } + rc = mux__add_out(db, context); context->ws_want_write = false; } else{ - if(context->events & EPOLLOUT) { - ev.data.fd = context->sock; - ev.events = EPOLLIN; - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno)); - } - } - context->events = EPOLLIN; - } + rc = mux__remove_out(db, context); } -#else - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ - pollfds[pollfd_index].events |= POLLOUT; - context->ws_want_write = false; - } - context->pollfd_index = pollfd_index; - pollfd_index++; -#endif }else{ do_disconnect(db, context, MOSQ_ERR_CONN_LOST); } @@ -274,86 +171,11 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } -#ifdef WITH_BRIDGE -# ifdef WITH_EPOLL bridge_check(db); -# else - bridge_check(db, pollfds, &pollfd_index); -# endif -#endif -#ifndef WIN32 - sigprocmask(SIG_SETMASK, &sigblock, &origsig); -#ifdef WITH_EPOLL - fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100); -#else - fdcount = poll(pollfds, pollfd_index, 100); -#endif - sigprocmask(SIG_SETMASK, &origsig, NULL); -#else - fdcount = WSAPoll(pollfds, pollfd_index, 100); -#endif -#ifdef WITH_EPOLL - switch(fdcount){ - case -1: - if(errno != EINTR){ - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno)); - } - break; - case 0: - break; - default: - for(i=0; iepollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) { - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno)); - } - context = NULL; - HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context); - if(context){ - context->events = EPOLLIN; - }else{ - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context"); - } - } - } - break; - } - } - if (j == listensock_count) { - loop_handle_reads_writes(db, events[i].data.fd, events[i].events); - } - } - } -#else - if(fdcount == -1){ -# ifdef WIN32 - if(pollfd_index == 0 && WSAGetLastError() == WSAEINVAL){ - /* WSAPoll() immediately returns an error if it is not given - * any sockets to wait on. This can happen if we only have - * websockets listeners. Sleep a little to prevent a busy loop. - */ - Sleep(10); - }else -# endif - { - log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno)); - } - }else{ - loop_handle_reads_writes(db, pollfds); + rc = mux__handle(db, listensock, listensock_count); + if(rc) return rc; - for(i=0; iepollfd); - db->epollfd = 0; -#else - mosquitto__free(pollfds); -#endif + mux__cleanup(db); + return MOSQ_ERR_SUCCESS; } void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reason) { char *id; -#ifdef WITH_EPOLL - struct epoll_event ev; -#endif #ifdef WITH_WEBSOCKETS bool is_duplicate = false; #endif @@ -453,13 +268,8 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso } if(context->sock != INVALID_SOCKET){ HASH_DELETE(hh_sock, db->contexts_by_sock, context); -#ifdef WITH_EPOLL - if (epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting websockets: %s", strerror(errno)); - } -#endif + mux__delete(db, context); context->sock = INVALID_SOCKET; - context->pollfd_index = -1; } if(is_duplicate){ /* This occurs if another client is taking over the same client id. @@ -503,163 +313,9 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id); } } -#ifdef WITH_EPOLL - if (context->sock != INVALID_SOCKET && epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) { - if(db->config->connection_messages == true){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting: %s", strerror(errno)); - } - } -#endif + mux__delete(db, context); context__disconnect(db, context); } } -#ifdef WITH_EPOLL -static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events) -#else -static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds) -#endif -{ - struct mosquitto *context; -#ifndef WITH_EPOLL - struct mosquitto *ctxt_tmp; -#endif - int err; - socklen_t len; - int rc; - -#ifdef WITH_EPOLL - int i; - context = NULL; - HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); - if(!context) { - return; - } - for (i=0;i<1;i++) { -#else - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(context->pollfd_index < 0){ - continue; - } - - assert(pollfds[context->pollfd_index].fd == context->sock); -#endif - -#ifdef WITH_WEBSOCKETS - if(context->wsi){ - struct lws_pollfd wspoll; -#ifdef WITH_EPOLL - wspoll.fd = context->sock; - wspoll.events = context->events; - wspoll.revents = events; -#else - wspoll.fd = pollfds[context->pollfd_index].fd; - wspoll.events = pollfds[context->pollfd_index].events; - wspoll.revents = pollfds[context->pollfd_index].revents; -#endif -#ifdef LWS_LIBRARY_VERSION_NUMBER - lws_service_fd(lws_get_context(context->wsi), &wspoll); -#else - lws_service_fd(context->ws_context, &wspoll); -#endif - continue; - } -#endif - -#ifdef WITH_TLS -#ifdef WITH_EPOLL - if(events & EPOLLOUT || -#else - if(pollfds[context->pollfd_index].revents & POLLOUT || -#endif - context->want_write || - (context->ssl && context->state == mosq_cs_new)){ -#else -#ifdef WITH_EPOLL - if(events & EPOLLOUT){ -#else - if(pollfds[context->pollfd_index].revents & POLLOUT){ -#endif -#endif - if(context->state == mosq_cs_connect_pending){ - len = sizeof(int); - if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ - if(err == 0){ - mosquitto__set_state(context, mosq_cs_new); -#if defined(WITH_ADNS) && defined(WITH_BRIDGE) - if(context->bridge){ - bridge__connect_step3(db, context); - continue; - } -#endif - } - }else{ - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); - continue; - } - } - rc = packet__write(context); - if(rc){ - do_disconnect(db, context, rc); - continue; - } - } - } - -#ifdef WITH_EPOLL - context = NULL; - HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); - if(!context) { - return; - } - for (i=0;i<1;i++) { -#else - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(context->pollfd_index < 0){ - continue; - } -#endif -#ifdef WITH_WEBSOCKETS - if(context->wsi){ - // Websocket are already handled above - continue; - } -#endif - -#ifdef WITH_TLS -#ifdef WITH_EPOLL - if(events & EPOLLIN || -#else - if(pollfds[context->pollfd_index].revents & POLLIN || -#endif - (context->ssl && context->state == mosq_cs_new)){ -#else -#ifdef WITH_EPOLL - if(events & EPOLLIN){ -#else - if(pollfds[context->pollfd_index].revents & POLLIN){ -#endif -#endif - do{ - rc = packet__read(db, context); - if(rc){ - do_disconnect(db, context, rc); - continue; - } - }while(SSL_DATA_PENDING(context)); - }else{ -#ifdef WITH_EPOLL - if(events & (EPOLLERR | EPOLLHUP)){ -#else - if(context->pollfd_index >= 0 && pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL | POLLHUP)){ -#endif - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); - continue; - } - } - } -} - - - diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 1e198d8a..4d722220 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -701,16 +701,25 @@ int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context); int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context); int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context); void bridge__packet_cleanup(struct mosquitto *context); -#ifdef WITH_EPOLL void bridge_check(struct mosquitto_db *db); -#else -void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index); -#endif int bridge__register_local_connections(struct mosquitto_db *db); int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, int qos, const char *local_prefix, const char *remote_prefix); int bridge__remap_topic_in(struct mosquitto *context, char **topic); #endif +/* ============================================================ + * IO multiplex related functions + * ============================================================ */ +int mux__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux__loop_prepare(void); +int mux__add_out(struct mosquitto_db *db, struct mosquitto *context); +int mux__remove_out(struct mosquitto_db *db, struct mosquitto *context); +int mux__add_in(struct mosquitto_db *db, struct mosquitto *context); +int mux__delete(struct mosquitto_db *db, struct mosquitto *context); +int mux__wait(void); +int mux__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux__cleanup(struct mosquitto_db *db); + /* ============================================================ * Property related functions * ============================================================ */ diff --git a/src/mux.c b/src/mux.c new file mode 100644 index 00000000..02316ca2 --- /dev/null +++ b/src/mux.c @@ -0,0 +1,86 @@ +/* +Copyright (c) 2009-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. + Tatsuzo Osawa - Add epoll. +*/ + +#include "mux.h" + +int mux__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ +#ifdef WITH_EPOLL + return mux_epoll__init(db, listensock, listensock_count); +#else + return mux_poll__init(db, listensock, listensock_count); +#endif +} + +int mux__add_out(struct mosquitto_db *db, struct mosquitto *context) +{ +#ifdef WITH_EPOLL + return mux_epoll__add_out(db, context); +#else + return mux_poll__add_out(db, context); +#endif +} + + +int mux__remove_out(struct mosquitto_db *db, struct mosquitto *context) +{ +#ifdef WITH_EPOLL + return mux_epoll__remove_out(db, context); +#else + return mux_poll__remove_out(db, context); +#endif +} + + +int mux__add_in(struct mosquitto_db *db, struct mosquitto *context) +{ +#ifdef WITH_EPOLL + return mux_epoll__add_in(db, context); +#else + return mux_poll__add_in(db, context); +#endif +} + + +int mux__delete(struct mosquitto_db *db, struct mosquitto *context) +{ +#ifdef WITH_EPOLL + return mux_epoll__delete(db, context); +#else + return mux_poll__delete(db, context); +#endif +} + + +int mux__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ +#ifdef WITH_EPOLL + return mux_epoll__handle(db, listensock, listensock_count); +#else + return mux_poll__handle(db, listensock, listensock_count); +#endif +} + + +int mux__cleanup(struct mosquitto_db *db) +{ +#ifdef WITH_EPOLL + return mux_epoll__cleanup(db); +#else + return mux_poll__cleanup(db); +#endif +} diff --git a/src/mux.h b/src/mux.h new file mode 100644 index 00000000..c2194245 --- /dev/null +++ b/src/mux.h @@ -0,0 +1,38 @@ +/* +Copyright (c) 2020 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#ifndef MUX_H +#define MUX_H + +#include "mosquitto_broker_internal.h" + +int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context); +int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context); +int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context); +int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context); +int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux_epoll__cleanup(struct mosquitto_db *db); + +int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux_poll__add_out(struct mosquitto_db *db, struct mosquitto *context); +int mux_poll__remove_out(struct mosquitto_db *db, struct mosquitto *context); +int mux_poll__add_in(struct mosquitto_db *db, struct mosquitto *context); +int mux_poll__delete(struct mosquitto_db *db, struct mosquitto *context); +int mux_poll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); +int mux_poll__cleanup(struct mosquitto_db *db); + +#endif diff --git a/src/mux_epoll.c b/src/mux_epoll.c new file mode 100644 index 00000000..1b3a6a58 --- /dev/null +++ b/src/mux_epoll.c @@ -0,0 +1,331 @@ +/* +Copyright (c) 2009-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. + Tatsuzo Osawa - Add epoll. +*/ + +#include "config.h" + +#ifdef WITH_EPOLL + +#ifndef WIN32 +# define _GNU_SOURCE +#endif + +#include +#ifndef WIN32 +#ifdef WITH_EPOLL +#include +#define MAX_EVENTS 1000 +#endif +#include +#include +#else +#include +#include +#include +#endif + +#include +#include +#include +#include +#ifndef WIN32 +# include +#endif +#include + +#ifdef WITH_WEBSOCKETS +# include +#endif + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "packet_mosq.h" +#include "send_mosq.h" +#include "sys_tree.h" +#include "time_mosq.h" +#include "util_mosq.h" + +#ifdef WIN32 +# error "epoll not supported on WIN32" +#endif + +static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events); + +static sigset_t my_sigblock; +static struct epoll_event ep_events[MAX_EVENTS]; + +int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ + struct epoll_event ev; + int i; + +#ifndef WIN32 + sigemptyset(&my_sigblock); + sigaddset(&my_sigblock, SIGINT); + sigaddset(&my_sigblock, SIGTERM); + sigaddset(&my_sigblock, SIGUSR1); + sigaddset(&my_sigblock, SIGUSR2); + sigaddset(&my_sigblock, SIGHUP); +#endif + + memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS); + + db->epollfd = 0; + if ((db->epollfd = epoll_create(MAX_EVENTS)) == -1) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno)); + return MOSQ_ERR_UNKNOWN; + } + memset(&ev, 0, sizeof(struct epoll_event)); + for(i=0; iepollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno)); + (void)close(db->epollfd); + db->epollfd = 0; + return MOSQ_ERR_UNKNOWN; + } + } + + return MOSQ_ERR_SUCCESS; +} + +int mux_epoll__loop_setup(void) +{ + return MOSQ_ERR_SUCCESS; +} + + +int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context) +{ + struct epoll_event ev; + + memset(&ev, 0, sizeof(struct epoll_event)); + if(!(context->events & EPOLLOUT)) { + ev.data.fd = context->sock; + ev.events = EPOLLIN | EPOLLOUT; + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno)); + } + } + context->events = EPOLLIN | EPOLLOUT; + } + return MOSQ_ERR_SUCCESS; +} + + +int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context) +{ + struct epoll_event ev; + + memset(&ev, 0, sizeof(struct epoll_event)); + if(context->events & EPOLLOUT) { + ev.data.fd = context->sock; + ev.events = EPOLLIN; + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno)); + } + } + context->events = EPOLLIN; + } + return MOSQ_ERR_SUCCESS; +} + + +int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context) +{ + struct epoll_event ev; + + memset(&ev, 0, sizeof(struct epoll_event)); + ev.events = EPOLLIN; + ev.data.fd = context->sock; + if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno)); + } + return MOSQ_ERR_SUCCESS; +} + + +int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context) +{ + struct epoll_event ev; + + memset(&ev, 0, sizeof(struct epoll_event)); + if(context->sock != INVALID_SOCKET){ + if(epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1){ + return 1; + } + } + return 0; +} + + +int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ + int i; + int j; + struct epoll_event ev; + sigset_t origsig; + struct mosquitto *context; + int fdcount; + + memset(&ev, 0, sizeof(struct epoll_event)); + sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); + fdcount = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100); + sigprocmask(SIG_SETMASK, &origsig, NULL); + + switch(fdcount){ + case -1: + if(errno != EINTR){ + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno)); + } + break; + case 0: + break; + default: + for(i=0; icontexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context); + if(!context) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context"); + } + context->events = EPOLLIN; + mux__add_in(db, context); + } + } + break; + } + } + if (j == listensock_count) { + loop_handle_reads_writes(db, ep_events[i].data.fd, ep_events[i].events); + } + } + } + return MOSQ_ERR_SUCCESS; +} + + +int mux_epoll__cleanup(struct mosquitto_db *db) +{ + (void)close(db->epollfd); + db->epollfd = 0; + return MOSQ_ERR_SUCCESS; +} + + +static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events) +{ + struct mosquitto *context; + int err; + socklen_t len; + int rc; + int i; + + context = NULL; + HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); + if(!context) { + return; + } + for (i=0;i<1;i++) { + +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + struct lws_pollfd wspoll; + wspoll.fd = context->sock; + wspoll.events = context->events; + wspoll.revents = events; +#ifdef LWS_LIBRARY_VERSION_NUMBER + lws_service_fd(lws_get_context(context->wsi), &wspoll); +#else + lws_service_fd(context->ws_context, &wspoll); +#endif + continue; + } +#endif + +#ifdef WITH_TLS + if(events & EPOLLOUT || + context->want_write || + (context->ssl && context->state == mosq_cs_new)){ +#else + if(events & EPOLLOUT){ +#endif + if(context->state == mosq_cs_connect_pending){ + len = sizeof(int); + if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if(err == 0){ + mosquitto__set_state(context, mosq_cs_new); +#if defined(WITH_ADNS) && defined(WITH_BRIDGE) + if(context->bridge){ + bridge__connect_step3(db, context); + continue; + } +#endif + } + }else{ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + continue; + } + } + rc = packet__write(context); + if(rc){ + do_disconnect(db, context, rc); + continue; + } + } + } + + context = NULL; + HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); + if(!context) { + return; + } + for (i=0;i<1;i++) { +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + // Websocket are already handled above + continue; + } +#endif + +#ifdef WITH_TLS + if(events & EPOLLIN || + (context->ssl && context->state == mosq_cs_new)){ +#else + if(events & EPOLLIN){ +#endif + do{ + rc = packet__read(db, context); + if(rc){ + do_disconnect(db, context, rc); + continue; + } + }while(SSL_DATA_PENDING(context)); + }else{ + if(events & (EPOLLERR | EPOLLHUP)){ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + continue; + } + } + } +} +#endif diff --git a/src/mux_poll.c b/src/mux_poll.c new file mode 100644 index 00000000..da1ee6ba --- /dev/null +++ b/src/mux_poll.c @@ -0,0 +1,323 @@ +/* +Copyright (c) 2009-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. + Tatsuzo Osawa - Add epoll. +*/ + +#include "config.h" + +#ifndef WITH_EPOLL + +#ifndef WIN32 +# define _GNU_SOURCE +#endif + +#include +#ifndef WIN32 +#include +#include +#else +#include +#include +#include +#endif + +#include +#include +#include +#include +#ifndef WIN32 +# include +#endif +#include + +#ifdef WITH_WEBSOCKETS +# include +#endif + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "packet_mosq.h" +#include "send_mosq.h" +#include "sys_tree.h" +#include "time_mosq.h" +#include "util_mosq.h" +#include "mux.h" + +static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds); + +static struct pollfd *pollfds = NULL; +static int pollfd_max; +static sigset_t my_sigblock; + +int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ + int i; + int pollfd_index = 0; + +#ifndef WIN32 + sigemptyset(&my_sigblock); + sigaddset(&my_sigblock, SIGINT); + sigaddset(&my_sigblock, SIGTERM); + sigaddset(&my_sigblock, SIGUSR1); + sigaddset(&my_sigblock, SIGUSR2); + sigaddset(&my_sigblock, SIGHUP); +#endif + +#ifdef WIN32 + pollfd_max = _getmaxstdio(); +#else + pollfd_max = sysconf(_SC_OPEN_MAX); +#endif + + pollfds = mosquitto__malloc(sizeof(struct pollfd)*pollfd_max); + if(!pollfds){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max); + + for(i=0; ipollfd_index != -1){ + pollfds[context->pollfd_index].fd = context->sock; + pollfds[context->pollfd_index].events = POLLIN | POLLOUT; + pollfds[context->pollfd_index].revents = 0; + }else{ + for(i=0; isock; + pollfds[i].events = POLLIN | POLLOUT; + pollfds[i].revents = 0; + context->pollfd_index = i; + break; + } + } + } + + return MOSQ_ERR_SUCCESS; +} + + +int mux_poll__remove_out(struct mosquitto_db *db, struct mosquitto *context) +{ + return mux_poll__add_in(db, context); +} + + +int mux_poll__add_in(struct mosquitto_db *db, struct mosquitto *context) +{ + int i; + + if(context->pollfd_index != -1){ + pollfds[context->pollfd_index].fd = context->sock; + pollfds[context->pollfd_index].events = POLLIN | POLLPRI; + pollfds[context->pollfd_index].revents = 0; + }else{ + for(i=0; isock; + pollfds[i].events = POLLIN; + pollfds[i].revents = 0; + context->pollfd_index = i; + break; + } + } + } + + return MOSQ_ERR_SUCCESS; +} + +int mux_poll__delete(struct mosquitto_db *db, struct mosquitto *context) +{ + if(context->pollfd_index != -1){ + pollfds[context->pollfd_index].fd = -1; + pollfds[context->pollfd_index].events = 0; + pollfds[context->pollfd_index].revents = 0; + context->pollfd_index = -1; + } + + return MOSQ_ERR_SUCCESS; +} + + + + +int mux_poll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +{ + struct mosquitto *context; + mosq_sock_t sock; + int i; + int fdcount; + sigset_t origsig; + +#ifndef WIN32 + sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); + fdcount = poll(pollfds, pollfd_max, 100); + sigprocmask(SIG_SETMASK, &origsig, NULL); +#else + fdcount = WSAPoll(pollfds, pollfd_max, 100); +#endif + if(fdcount == -1){ +# ifdef WIN32 + if(WSAGetLastError() == WSAEINVAL){ + /* WSAPoll() immediately returns an error if it is not given + * any sockets to wait on. This can happen if we only have + * websockets listeners. Sleep a little to prevent a busy loop. + */ + Sleep(10); + }else +# endif + { + log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno)); + } + }else{ + loop_handle_reads_writes(db, pollfds); + + for(i=0; icontexts_by_sock, &sock, sizeof(mosq_sock_t), context); + if(!context) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in poll accepting: no context"); + } + context->pollfd_index = -1; + mux__add_in(db, context); + } + } + } + } + return MOSQ_ERR_SUCCESS; +} + + +int mux_poll__cleanup(struct mosquitto_db *db) +{ + mosquitto__free(pollfds); + pollfds = NULL; + + return MOSQ_ERR_SUCCESS; +} + + +static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds) +{ + struct mosquitto *context, *ctxt_tmp; + int err; + socklen_t len; + int rc; + + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(context->pollfd_index < 0){ + continue; + } + + assert(pollfds[context->pollfd_index].fd == context->sock); + +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + struct lws_pollfd wspoll; + wspoll.fd = pollfds[context->pollfd_index].fd; + wspoll.events = pollfds[context->pollfd_index].events; + wspoll.revents = pollfds[context->pollfd_index].revents; +#ifdef LWS_LIBRARY_VERSION_NUMBER + lws_service_fd(lws_get_context(context->wsi), &wspoll); +#else + lws_service_fd(context->ws_context, &wspoll); +#endif + continue; + } +#endif + +#ifdef WITH_TLS + if(pollfds[context->pollfd_index].revents & POLLOUT || + context->want_write || + (context->ssl && context->state == mosq_cs_new)){ +#else + if(pollfds[context->pollfd_index].revents & POLLOUT){ +#endif + if(context->state == mosq_cs_connect_pending){ + len = sizeof(int); + if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if(err == 0){ + mosquitto__set_state(context, mosq_cs_new); +#if defined(WITH_ADNS) && defined(WITH_BRIDGE) + if(context->bridge){ + bridge__connect_step3(db, context); + continue; + } +#endif + } + }else{ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + continue; + } + } + rc = packet__write(context); + if(rc){ + do_disconnect(db, context, rc); + continue; + } + } + } + + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(context->pollfd_index < 0){ + continue; + } +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + // Websocket are already handled above + continue; + } +#endif + +#ifdef WITH_TLS + if(pollfds[context->pollfd_index].revents & POLLIN || + (context->ssl && context->state == mosq_cs_new)){ +#else + if(pollfds[context->pollfd_index].revents & POLLIN){ +#endif + do{ + rc = packet__read(db, context); + if(rc){ + do_disconnect(db, context, rc); + continue; + } + }while(SSL_DATA_PENDING(context)); + }else{ + if(context->pollfd_index >= 0 && pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL | POLLHUP)){ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + continue; + } + } + } +} + + +#endif diff --git a/src/websockets.c b/src/websockets.c index fd2330b0..9ca03416 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -254,6 +254,7 @@ static int callback_mqtt(struct libwebsocket_context *context, HASH_DELETE(hh_sock, db->contexts_by_sock, mosq); mosq->sock = INVALID_SOCKET; mosq->pollfd_index = -1; + mux__delete(db, mosq); } mosq->wsi = NULL; #ifdef WITH_TLS