From c07ba2a3da7ced1335978ce2623530726f362f39 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Sun, 18 Jun 2017 12:52:59 +0100 Subject: [PATCH] Experimental fix for poor websockets performance. --- ChangeLog.txt | 3 +++ lib/mosquitto_internal.h | 1 + src/loop.c | 45 +++++++++++++++++++++++++++++----------- src/websockets.c | 18 ++++++++++++++++ 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index e505f825..b98c5513 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,3 +1,6 @@ +Broker: +- Fix for poor websockets performance. + Clients: - Don't use / in auto-generated client ids. diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index ac925738..8d3014dd 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -226,6 +226,7 @@ struct mosquitto { struct libwebsocket *wsi; # endif # endif + bool ws_want_write; #else # ifdef WITH_SOCKS char *socks5_host; diff --git a/src/loop.c b/src/loop.c index 6e9de59f..2c90e249 100644 --- a/src/loop.c +++ b/src/loop.c @@ -108,6 +108,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li struct pollfd *pollfds = NULL; int pollfd_count = 0; int pollfd_index; + int pollfd_max; #ifdef WITH_BRIDGE mosq_sock_t bridge_sock; int rc; @@ -122,6 +123,18 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li sigaddset(&sigblock, SIGINT); #endif +#ifdef WIN32 + pollfd_max = _getmaxstdio(); +#else + pollfd_max = getdtablesize(); +#endif + + pollfds = _mosquitto_malloc(sizeof(struct pollfd)*pollfd_max); + if(!pollfds){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + if(db->config->persistent_client_expiration > 0){ expiration_check_time = time(NULL) + 3600; } @@ -139,16 +152,8 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li context_count += db->bridge_count; #endif - if(listensock_count + context_count > pollfd_count || !pollfds){ - pollfd_count = listensock_count + context_count; - pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count); - if(!pollfds){ - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - } - - memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count); + pollfd_count = listensock_count + context_count; + memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max); pollfd_index = 0; for(i=0; isock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; - if(context->current_out_packet || context->state == mosq_cs_connect_pending){ + if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ pollfds[pollfd_index].events |= POLLOUT; + context->ws_want_write = false; } context->pollfd_index = pollfd_index; pollfd_index++; @@ -436,7 +442,10 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) if(context->wsi){ libwebsocket_callback_on_writable(context->ws_context, context->wsi); } - context->sock = INVALID_SOCKET; + if(context->sock != INVALID_SOCKET){ + HASH_DELETE(hh_sock, db->contexts_by_sock, context); + context->sock = INVALID_SOCKET; + } }else #endif { @@ -482,6 +491,18 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol } assert(pollfds[context->pollfd_index].fd == context->sock); + +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + struct lws_pollfd wspoll; + wspoll.fd = pollfds[context->pollfd_index].fd; + wspoll.events = pollfds[context->pollfd_index].events; + wspoll.revents = pollfds[context->pollfd_index].revents; + lws_service_fd(lws_get_context(context->wsi), &wspoll); + continue; + } +#endif + #ifdef WITH_TLS if(pollfds[context->pollfd_index].revents & POLLOUT || context->want_write || diff --git a/src/websockets.c b/src/websockets.c index b44943d3..84e5089b 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -218,6 +218,8 @@ static int callback_mqtt(struct libwebsocket_context *context, u->mosq = NULL; return -1; } + mosq->sock = libwebsocket_get_socket_fd(wsi); + HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(mosq->sock), mosq); break; case LWS_CALLBACK_CLOSED: @@ -226,6 +228,10 @@ static int callback_mqtt(struct libwebsocket_context *context, } mosq = u->mosq; if(mosq){ + if(mosq->sock > 0){ + HASH_DELETE(hh_sock, db->contexts_by_sock, mosq); + mosq->sock = INVALID_SOCKET; + } mosq->wsi = NULL; do_disconnect(db, mosq); } @@ -412,6 +418,9 @@ static int callback_http(struct libwebsocket_context *context, char *filename, *filename_canonical; unsigned char buf[4096]; struct stat filestat; + struct mosquitto_db *db = &int_db; + struct mosquitto *mosq; + struct lws_pollargs *pollargs = (struct lws_pollargs *)in; /* FIXME - ssl cert verification is done here. */ @@ -583,6 +592,15 @@ static int callback_http(struct libwebsocket_context *context, break; #endif + case LWS_CALLBACK_ADD_POLL_FD: + case LWS_CALLBACK_DEL_POLL_FD: + case LWS_CALLBACK_CHANGE_MODE_POLL_FD: + HASH_FIND(hh_sock, db->contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq); + if(mosq){ + mosq->ws_want_write = true; + } + break; + default: return 0; }