Add Epoll. (#495)

Signed-off-by: Tatsuzo Osawa <tatsuzo.osawa@gmail.com>
This commit is contained in:
toast-uz 2017-07-28 02:43:09 +09:00 committed by Roger Light
parent d5e8217d20
commit df9ad5f0bd
4 changed files with 243 additions and 1 deletions

View File

@ -89,6 +89,9 @@ WITH_STATIC_LIBRARIES:=no
# Build with async dns lookup support for bridges (temporary). Requires glibc.
#WITH_ADNS:=yes
# Build with epoll support.
#WITH_EPOLL:=yes
# =============================================================================
# End of user configuration
# =============================================================================
@ -272,3 +275,10 @@ STRIP?=strip
ifeq ($(WITH_STRIP),yes)
STRIP_OPTS:=-s --strip-program=${CROSS_COMPILE}${STRIP}
endif
ifeq ($(WITH_EPOLL),yes)
ifeq ($(UNAME),Linux)
BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_EPOLL
endif
endif

View File

@ -12,6 +12,7 @@ and the Eclipse Distribution License is available at
Contributors:
Roger Light - initial implementation and documentation.
Tatsuzo Osawa - Add epoll.
*/
#ifndef MOSQUITTO_INTERNAL_H
@ -274,8 +275,12 @@ struct mosquitto {
UT_hash_handle hh_sock;
struct mosquitto *for_free_next;
#endif
#ifdef WITH_EPOLL
uint32_t events;
#endif
};
#define STREMPTY(str) (str[0] == '\0')
#endif

View File

@ -12,6 +12,7 @@ and the Eclipse Distribution License is available at
Contributors:
Roger Light - initial implementation and documentation.
Tatsuzo Osawa - Add epoll.
*/
#define _GNU_SOURCE
@ -20,6 +21,10 @@ Contributors:
#include <assert.h>
#ifndef WIN32
#ifdef WITH_EPOLL
#include <sys/epoll.h>
#define MAX_EVENTS 1000
#endif
#include <poll.h>
#include <unistd.h>
#else
@ -56,7 +61,11 @@ extern bool flag_db_backup;
extern bool flag_tree_print;
extern int run;
#ifdef WITH_EPOLL
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events);
#else
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
#endif
#ifdef WITH_WEBSOCKETS
static void temp__expire_websockets_clients(struct mosquitto_db *db)
@ -105,9 +114,14 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
sigset_t sigblock, origsig;
#endif
int i;
#ifdef WITH_EPOLL
int j;
struct epoll_event ev, events[MAX_EVENTS];
#else
struct pollfd *pollfds = NULL;
int pollfd_index;
int pollfd_max;
#endif
#ifdef WITH_BRIDGE
mosq_sock_t bridge_sock;
int rc;
@ -124,6 +138,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
sigaddset(&sigblock, SIGHUP);
#endif
#ifndef WITH_EPOLL
#ifdef WIN32
pollfd_max = _getmaxstdio();
#else
@ -135,11 +150,47 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
#endif
if(db->config->persistent_client_expiration > 0){
expiration_check_time = time(NULL) + 3600;
}
#ifdef WITH_EPOLL
db->epollfd = 0;
if ((db->epollfd = epoll_create(MAX_EVENTS)) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno));
return MOSQ_ERR_UNKNOWN;
}
memset(&ev, 0, sizeof(struct epoll_event));
memset(&events, 0, sizeof(struct epoll_event)*MAX_EVENTS);
for(i=0; i<listensock_count; i++){
ev.data.fd = listensock[i];
ev.events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno));
(void)close(db->epollfd);
db->epollfd = 0;
return MOSQ_ERR_UNKNOWN;
}
}
#ifdef WITH_BRIDGE
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->bridge){
ev.data.fd = context->sock;
ev.events = EPOLLIN;
context->events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno));
(void)close(db->epollfd);
db->epollfd = 0;
return MOSQ_ERR_UNKNOWN;
}
}
}
#endif
#endif
while(run){
context__free_disused(db);
#ifdef WITH_SYS_TREE
@ -148,6 +199,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
#endif
#ifndef WITH_EPOLL
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);
pollfd_index = 0;
@ -157,6 +209,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
pollfds[pollfd_index].revents = 0;
pollfd_index++;
}
#endif
now_time = time(NULL);
@ -193,6 +246,33 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|| now - context->last_msg_in < (time_t)(context->keepalive)*3/2){
if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
if(!(context->events & EPOLLOUT)) {
ev.data.fd = context->sock;
ev.events = EPOLLIN | EPOLLOUT;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
}
}
context->events = EPOLLIN | EPOLLOUT;
}
context->ws_want_write = false;
}
else{
if(context->events & EPOLLOUT) {
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
}
}
context->events = EPOLLIN;
}
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
@ -202,6 +282,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
do_disconnect(db, context);
}
@ -257,6 +338,20 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}else if(rc == 0){
rc = bridge__connect_step2(db, context);
if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
@ -265,6 +360,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
@ -292,6 +388,20 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
{
rc = bridge__connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
@ -300,6 +410,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
@ -341,11 +452,49 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
#ifndef WIN32
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
#ifdef WITH_EPOLL
fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100);
#else
fdcount = poll(pollfds, pollfd_index, 100);
#endif
sigprocmask(SIG_SETMASK, &origsig, NULL);
#else
fdcount = WSAPoll(pollfds, pollfd_index, 100);
#endif
#ifdef WITH_EPOLL
switch(fdcount){
case -1:
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
break;
case 0:
break;
default:
for(i=0; i<fdcount; i++){
for(j=0; j<listensock_count; j++){
if (events[i].data.fd == listensock[j]) {
if (events[i].events & (EPOLLIN | EPOLLPRI)){
while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
ev.events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
}
context = NULL;
HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
if(!context) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
}
context->events = EPOLLIN;
}
}
break;
}
}
if (j == listensock_count) {
loop_handle_reads_writes(db, events[i].data.fd, events[i].events);
}
}
}
#else
if(fdcount == -1){
log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
}else{
@ -358,6 +507,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
}
}
#endif
#ifdef WITH_PERSISTENCE
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
@ -410,13 +560,21 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
#endif
}
#ifdef WITH_EPOLL
(void) close(db->epollfd);
db->epollfd = 0;
#else
mosquitto__free(pollfds);
#endif
return MOSQ_ERR_SUCCESS;
}
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
{
char *id;
#ifdef WITH_EPOLL
struct epoll_event ev;
#endif
if(context->state == mosq_cs_disconnected){
return;
@ -431,6 +589,11 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
}
if(context->sock != INVALID_SOCKET){
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
#ifdef WITH_EPOLL
if (epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting websockets: %s", strerror(errno));
}
#endif
context->sock = INVALID_SOCKET;
context->pollfd_index = -1;
}
@ -449,6 +612,11 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id);
}
}
#ifdef WITH_EPOLL
if (epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting: %s", strerror(errno));
}
#endif
context__disconnect(db, context);
#ifdef WITH_BRIDGE
if(context->clean_session && !context->bridge){
@ -467,36 +635,67 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
}
#ifdef WITH_EPOLL
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events)
#else
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
#endif
{
struct mosquitto *context, *ctxt_tmp;
struct mosquitto *context;
#ifndef WITH_EPOLL
struct mosquitto *ctxt_tmp;
#endif
int err;
socklen_t len;
#ifdef WITH_EPOLL
int i;
context = NULL;
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
if(!context) {
return;
}
for (i=0;i<1;i++) {
#else
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->pollfd_index < 0){
continue;
}
assert(pollfds[context->pollfd_index].fd == context->sock);
#endif
#ifdef WITH_WEBSOCKETS
if(context->wsi){
struct lws_pollfd wspoll;
#ifdef WITH_EPOLL
wspoll.fd = context->sock;
wspoll.events = context->events;
wspoll.revents = events;
#else
wspoll.fd = pollfds[context->pollfd_index].fd;
wspoll.events = pollfds[context->pollfd_index].events;
wspoll.revents = pollfds[context->pollfd_index].revents;
#endif
lws_service_fd(lws_get_context(context->wsi), &wspoll);
continue;
}
#endif
#ifdef WITH_TLS
#ifdef WITH_EPOLL
if(events & EPOLLOUT ||
#else
if(pollfds[context->pollfd_index].revents & POLLOUT ||
#endif
context->want_write ||
(context->ssl && context->state == mosq_cs_new)){
#else
#ifdef WITH_EPOLL
if(events & EPOLLOUT){
#else
if(pollfds[context->pollfd_index].revents & POLLOUT){
#endif
#endif
if(context->state == mosq_cs_connect_pending){
len = sizeof(int);
@ -516,10 +715,19 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
}
}
#ifdef WITH_EPOLL
context = NULL;
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
if(!context) {
return;
}
for (i=0;i<1;i++) {
#else
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->pollfd_index < 0){
continue;
}
#endif
#ifdef WITH_WEBSOCKETS
if(context->wsi){
// Websocket are already handled above
@ -528,10 +736,18 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
#endif
#ifdef WITH_TLS
#ifdef WITH_EPOLL
if(events & EPOLLIN ||
#else
if(pollfds[context->pollfd_index].revents & POLLIN ||
#endif
(context->ssl && context->state == mosq_cs_new)){
#else
#ifdef WITH_EPOLL
if(events & EPOLLIN){
#else
if(pollfds[context->pollfd_index].revents & POLLIN){
#endif
#endif
do{
if(packet__read(db, context)){
@ -540,10 +756,16 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
}
}while(SSL_DATA_PENDING(context));
}
#ifdef WITH_EPOLL
if(events & (EPOLLERR | EPOLLHUP)){
#else
if(context->pollfd_index >= 0 && pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL | POLLHUP)){
#endif
do_disconnect(db, context);
continue;
}
}
}

View File

@ -12,6 +12,7 @@ and the Eclipse Distribution License is available at
Contributors:
Roger Light - initial implementation and documentation.
Tatsuzo Osawa - Add epoll.
*/
#ifndef MOSQUITTO_BROKER_INTERNAL_H
@ -371,6 +372,9 @@ struct mosquitto_db{
int retained_count;
#endif
struct mosquitto *ll_for_free;
#ifdef WITH_EPOLL
int epollfd;
#endif
};
enum mosquitto__bridge_direction{
@ -620,3 +624,4 @@ struct libwebsocket_context *mosq_websockets_init(struct mosquitto__listener *li
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context);
#endif