Prototype separate poll/epoll files.
This commit is contained in:
parent
c3e824ee22
commit
986bf89f47
@ -29,6 +29,7 @@ set (MOSQ_SRCS
|
||||
mosquitto.c
|
||||
mosquitto_broker.h mosquitto_broker_internal.h
|
||||
../lib/misc_mosq.c ../lib/misc_mosq.h
|
||||
mux.c mux.h mux_epoll.c mux_poll.c
|
||||
net.c
|
||||
../lib/net_mosq_ocsp.c ../lib/net_mosq.c ../lib/net_mosq.h
|
||||
../lib/packet_datatypes.c
|
||||
|
12
src/Makefile
12
src/Makefile
@ -33,6 +33,9 @@ OBJS= mosquitto.o \
|
||||
loop.o \
|
||||
memory_mosq.o \
|
||||
misc_mosq.o \
|
||||
mux.o \
|
||||
mux_epoll.o \
|
||||
mux_poll.o \
|
||||
net.o \
|
||||
net_mosq.o \
|
||||
net_mosq_ocsp.o \
|
||||
@ -153,6 +156,15 @@ memory_mosq.o : ../lib/memory_mosq.c ../lib/memory_mosq.h
|
||||
misc_mosq.o : ../lib/misc_mosq.c ../lib/misc_mosq.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
mux.o : mux.c mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
mux_epoll.o : mux_epoll.c mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
mux_poll.o : mux_poll.c mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
net.o : net.c mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
|
74
src/bridge.c
74
src/bridge.c
@ -30,10 +30,6 @@ Contributors:
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
#ifdef WITH_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
@ -520,20 +516,12 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context)
|
||||
int bridge__register_local_connections(struct mosquitto_db *db)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
struct epoll_event ev;
|
||||
struct mosquitto *context, *ctxt_tmp = NULL;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(context->bridge){
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN;
|
||||
context->events = EPOLLIN;
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if(mux__add_in(db, context)){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno));
|
||||
(void)close(db->epollfd);
|
||||
db->epollfd = 0;
|
||||
return MOSQ_ERR_UNKNOWN;
|
||||
}
|
||||
}
|
||||
@ -639,19 +627,12 @@ static void bridge__backoff_reset(struct mosquitto *context)
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
void bridge_check(struct mosquitto_db *db)
|
||||
#else
|
||||
void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index)
|
||||
#endif
|
||||
{
|
||||
static time_t last_check = 0;
|
||||
time_t now;
|
||||
struct mosquitto *context = NULL;
|
||||
socklen_t len;
|
||||
#ifdef WITH_EPOLL
|
||||
struct epoll_event ev;
|
||||
#endif
|
||||
int i;
|
||||
int rc;
|
||||
int err;
|
||||
@ -660,9 +641,6 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i
|
||||
|
||||
if(now <= last_check) return;
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
#endif
|
||||
for(i=0; i<db->bridge_count; i++){
|
||||
if(!db->bridges[i]) continue;
|
||||
|
||||
@ -736,29 +714,10 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i
|
||||
}else if(rc == 0){
|
||||
rc = bridge__connect_step2(db, context);
|
||||
if(rc == MOSQ_ERR_SUCCESS){
|
||||
#ifdef WITH_EPOLL
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN;
|
||||
rc = mux__add_in(db, context);
|
||||
if(context->current_out_packet){
|
||||
ev.events |= EPOLLOUT;
|
||||
rc = mux__add_out(db, context);
|
||||
}
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
|
||||
}
|
||||
}else{
|
||||
context->events = ev.events;
|
||||
}
|
||||
#else
|
||||
pollfds[*pollfd_index].fd = context->sock;
|
||||
pollfds[*pollfd_index].events = POLLIN;
|
||||
pollfds[*pollfd_index].revents = 0;
|
||||
if(context->current_out_packet){
|
||||
pollfds[*pollfd_index].events |= POLLOUT;
|
||||
}
|
||||
context->pollfd_index = *pollfd_index;
|
||||
(*pollfd_index)++;
|
||||
#endif
|
||||
}else if(rc == MOSQ_ERR_CONN_PENDING){
|
||||
context->bridge->restart_t = 0;
|
||||
}else{
|
||||
@ -778,10 +737,6 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i
|
||||
context->bridge->restart_t = 0;
|
||||
}
|
||||
}else{
|
||||
#ifdef WITH_EPOLL
|
||||
/* clean any events triggered in previous connection */
|
||||
context->events = 0;
|
||||
#endif
|
||||
rc = bridge__connect_step1(db, context);
|
||||
if(rc){
|
||||
context->bridge->cur_address++;
|
||||
@ -801,29 +756,10 @@ void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_i
|
||||
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
|
||||
context->bridge->primary_retry = now + 5;
|
||||
}
|
||||
#ifdef WITH_EPOLL
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN;
|
||||
rc = mux__add_in(db, context);
|
||||
if(context->current_out_packet){
|
||||
ev.events |= EPOLLOUT;
|
||||
rc = mux__add_out(db, context);
|
||||
}
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
|
||||
}
|
||||
}else{
|
||||
context->events = ev.events;
|
||||
}
|
||||
#else
|
||||
pollfds[*pollfd_index].fd = context->sock;
|
||||
pollfds[*pollfd_index].events = POLLIN;
|
||||
pollfds[*pollfd_index].revents = 0;
|
||||
if(context->current_out_packet){
|
||||
pollfds[*pollfd_index].events |= POLLOUT;
|
||||
}
|
||||
context->pollfd_index = *pollfd_index;
|
||||
(*pollfd_index)++;
|
||||
#endif
|
||||
}else{
|
||||
context->bridge->cur_address++;
|
||||
if(context->bridge->cur_address == context->bridge->address_count){
|
||||
|
372
src/loop.c
372
src/loop.c
@ -23,11 +23,6 @@ Contributors:
|
||||
|
||||
#include <assert.h>
|
||||
#ifndef WIN32
|
||||
#ifdef WITH_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#define MAX_EVENTS 1000
|
||||
#endif
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
@ -63,12 +58,6 @@ extern bool flag_db_backup;
|
||||
extern bool flag_tree_print;
|
||||
extern int run;
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events);
|
||||
#else
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
|
||||
#endif
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
static void temp__expire_websockets_clients(struct mosquitto_db *db)
|
||||
{
|
||||
@ -119,74 +108,25 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
#endif
|
||||
time_t now = 0;
|
||||
int time_count;
|
||||
int fdcount;
|
||||
struct mosquitto *context, *ctxt_tmp;
|
||||
#ifndef WIN32
|
||||
sigset_t sigblock, origsig;
|
||||
#endif
|
||||
int i;
|
||||
#ifdef WITH_EPOLL
|
||||
int j;
|
||||
struct epoll_event ev, events[MAX_EVENTS];
|
||||
#else
|
||||
struct pollfd *pollfds = NULL;
|
||||
int pollfd_index;
|
||||
int pollfd_max;
|
||||
#endif
|
||||
#ifdef WITH_BRIDGE
|
||||
int rc;
|
||||
#endif
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
int i;
|
||||
#endif
|
||||
|
||||
|
||||
#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
|
||||
memset(&sul, 0, sizeof(struct lws_sorted_usec_list));
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
sigemptyset(&sigblock);
|
||||
sigaddset(&sigblock, SIGINT);
|
||||
sigaddset(&sigblock, SIGTERM);
|
||||
sigaddset(&sigblock, SIGUSR1);
|
||||
sigaddset(&sigblock, SIGUSR2);
|
||||
sigaddset(&sigblock, SIGHUP);
|
||||
#endif
|
||||
rc = mux__init(db, listensock, listensock_count);
|
||||
if(rc) return rc;
|
||||
|
||||
#ifndef WITH_EPOLL
|
||||
#ifdef WIN32
|
||||
pollfd_max = _getmaxstdio();
|
||||
#else
|
||||
pollfd_max = sysconf(_SC_OPEN_MAX);
|
||||
#endif
|
||||
|
||||
pollfds = mosquitto__malloc(sizeof(struct pollfd)*pollfd_max);
|
||||
if(!pollfds){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
db->epollfd = 0;
|
||||
if ((db->epollfd = epoll_create(MAX_EVENTS)) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno));
|
||||
return MOSQ_ERR_UNKNOWN;
|
||||
}
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
memset(&events, 0, sizeof(struct epoll_event)*MAX_EVENTS);
|
||||
for(i=0; i<listensock_count; i++){
|
||||
ev.data.fd = listensock[i];
|
||||
ev.events = EPOLLIN;
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno));
|
||||
(void)close(db->epollfd);
|
||||
db->epollfd = 0;
|
||||
return MOSQ_ERR_UNKNOWN;
|
||||
}
|
||||
}
|
||||
# ifdef WITH_BRIDGE
|
||||
#ifdef WITH_BRIDGE
|
||||
rc = bridge__register_local_connections(db);
|
||||
if(rc) return rc;
|
||||
# endif
|
||||
#endif
|
||||
|
||||
while(run){
|
||||
@ -197,18 +137,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifndef WITH_EPOLL
|
||||
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);
|
||||
|
||||
pollfd_index = 0;
|
||||
for(i=0; i<listensock_count; i++){
|
||||
pollfds[pollfd_index].fd = listensock[i];
|
||||
pollfds[pollfd_index].events = POLLIN;
|
||||
pollfds[pollfd_index].revents = 0;
|
||||
pollfd_index++;
|
||||
}
|
||||
#endif
|
||||
|
||||
time_count = 0;
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(time_count > 0){
|
||||
@ -217,7 +145,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
time_count = 1000;
|
||||
now = mosquitto_time();
|
||||
}
|
||||
context->pollfd_index = -1;
|
||||
|
||||
if(context->sock != INVALID_SOCKET){
|
||||
/* Local bridges never time out in this fashion. */
|
||||
@ -226,43 +153,13 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
|
||||
|
||||
if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
|
||||
#ifdef WITH_EPOLL
|
||||
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
||||
if(!(context->events & EPOLLOUT)) {
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN | EPOLLOUT;
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
context->events = EPOLLIN | EPOLLOUT;
|
||||
}
|
||||
rc = mux__add_out(db, context);
|
||||
context->ws_want_write = false;
|
||||
}
|
||||
else{
|
||||
if(context->events & EPOLLOUT) {
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN;
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
context->events = EPOLLIN;
|
||||
}
|
||||
rc = mux__remove_out(db, context);
|
||||
}
|
||||
#else
|
||||
pollfds[pollfd_index].fd = context->sock;
|
||||
pollfds[pollfd_index].events = POLLIN;
|
||||
pollfds[pollfd_index].revents = 0;
|
||||
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
||||
pollfds[pollfd_index].events |= POLLOUT;
|
||||
context->ws_want_write = false;
|
||||
}
|
||||
context->pollfd_index = pollfd_index;
|
||||
pollfd_index++;
|
||||
#endif
|
||||
}else{
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
}
|
||||
@ -274,86 +171,11 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
}
|
||||
|
||||
|
||||
#ifdef WITH_BRIDGE
|
||||
# ifdef WITH_EPOLL
|
||||
bridge_check(db);
|
||||
# else
|
||||
bridge_check(db, pollfds, &pollfd_index);
|
||||
# endif
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
|
||||
#ifdef WITH_EPOLL
|
||||
fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100);
|
||||
#else
|
||||
fdcount = poll(pollfds, pollfd_index, 100);
|
||||
#endif
|
||||
sigprocmask(SIG_SETMASK, &origsig, NULL);
|
||||
#else
|
||||
fdcount = WSAPoll(pollfds, pollfd_index, 100);
|
||||
#endif
|
||||
#ifdef WITH_EPOLL
|
||||
switch(fdcount){
|
||||
case -1:
|
||||
if(errno != EINTR){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
|
||||
}
|
||||
break;
|
||||
case 0:
|
||||
break;
|
||||
default:
|
||||
for(i=0; i<fdcount; i++){
|
||||
for(j=0; j<listensock_count; j++){
|
||||
if (events[i].data.fd == listensock[j]) {
|
||||
if (events[i].events & (EPOLLIN | EPOLLPRI)){
|
||||
while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
|
||||
ev.events = EPOLLIN;
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
|
||||
}
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
|
||||
if(context){
|
||||
context->events = EPOLLIN;
|
||||
}else{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (j == listensock_count) {
|
||||
loop_handle_reads_writes(db, events[i].data.fd, events[i].events);
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
if(fdcount == -1){
|
||||
# ifdef WIN32
|
||||
if(pollfd_index == 0 && WSAGetLastError() == WSAEINVAL){
|
||||
/* WSAPoll() immediately returns an error if it is not given
|
||||
* any sockets to wait on. This can happen if we only have
|
||||
* websockets listeners. Sleep a little to prevent a busy loop.
|
||||
*/
|
||||
Sleep(10);
|
||||
}else
|
||||
# endif
|
||||
{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
|
||||
}
|
||||
}else{
|
||||
loop_handle_reads_writes(db, pollfds);
|
||||
rc = mux__handle(db, listensock, listensock_count);
|
||||
if(rc) return rc;
|
||||
|
||||
for(i=0; i<listensock_count; i++){
|
||||
if(pollfds[i].revents & (POLLIN | POLLPRI)){
|
||||
while(net__socket_accept(db, listensock[i]) != -1){
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
now = time(NULL);
|
||||
session_expiry__check(db, now);
|
||||
will_delay__check(db, now);
|
||||
@ -417,21 +239,14 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
(void) close(db->epollfd);
|
||||
db->epollfd = 0;
|
||||
#else
|
||||
mosquitto__free(pollfds);
|
||||
#endif
|
||||
mux__cleanup(db);
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reason)
|
||||
{
|
||||
char *id;
|
||||
#ifdef WITH_EPOLL
|
||||
struct epoll_event ev;
|
||||
#endif
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
bool is_duplicate = false;
|
||||
#endif
|
||||
@ -453,13 +268,8 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
|
||||
}
|
||||
if(context->sock != INVALID_SOCKET){
|
||||
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
|
||||
#ifdef WITH_EPOLL
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting websockets: %s", strerror(errno));
|
||||
}
|
||||
#endif
|
||||
mux__delete(db, context);
|
||||
context->sock = INVALID_SOCKET;
|
||||
context->pollfd_index = -1;
|
||||
}
|
||||
if(is_duplicate){
|
||||
/* This occurs if another client is taking over the same client id.
|
||||
@ -503,163 +313,9 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
|
||||
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id);
|
||||
}
|
||||
}
|
||||
#ifdef WITH_EPOLL
|
||||
if (context->sock != INVALID_SOCKET && epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1) {
|
||||
if(db->config->connection_messages == true){
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll disconnecting: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
mux__delete(db, context);
|
||||
context__disconnect(db, context);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events)
|
||||
#else
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
|
||||
#endif
|
||||
{
|
||||
struct mosquitto *context;
|
||||
#ifndef WITH_EPOLL
|
||||
struct mosquitto *ctxt_tmp;
|
||||
#endif
|
||||
int err;
|
||||
socklen_t len;
|
||||
int rc;
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
int i;
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
return;
|
||||
}
|
||||
for (i=0;i<1;i++) {
|
||||
#else
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(context->pollfd_index < 0){
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(pollfds[context->pollfd_index].fd == context->sock);
|
||||
#endif
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
struct lws_pollfd wspoll;
|
||||
#ifdef WITH_EPOLL
|
||||
wspoll.fd = context->sock;
|
||||
wspoll.events = context->events;
|
||||
wspoll.revents = events;
|
||||
#else
|
||||
wspoll.fd = pollfds[context->pollfd_index].fd;
|
||||
wspoll.events = pollfds[context->pollfd_index].events;
|
||||
wspoll.revents = pollfds[context->pollfd_index].revents;
|
||||
#endif
|
||||
#ifdef LWS_LIBRARY_VERSION_NUMBER
|
||||
lws_service_fd(lws_get_context(context->wsi), &wspoll);
|
||||
#else
|
||||
lws_service_fd(context->ws_context, &wspoll);
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
#ifdef WITH_EPOLL
|
||||
if(events & EPOLLOUT ||
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLOUT ||
|
||||
#endif
|
||||
context->want_write ||
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
#ifdef WITH_EPOLL
|
||||
if(events & EPOLLOUT){
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLOUT){
|
||||
#endif
|
||||
#endif
|
||||
if(context->state == mosq_cs_connect_pending){
|
||||
len = sizeof(int);
|
||||
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
||||
if(err == 0){
|
||||
mosquitto__set_state(context, mosq_cs_new);
|
||||
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
|
||||
if(context->bridge){
|
||||
bridge__connect_step3(db, context);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}else{
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
rc = packet__write(context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
return;
|
||||
}
|
||||
for (i=0;i<1;i++) {
|
||||
#else
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(context->pollfd_index < 0){
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
// Websocket are already handled above
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
#ifdef WITH_EPOLL
|
||||
if(events & EPOLLIN ||
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLIN ||
|
||||
#endif
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
#ifdef WITH_EPOLL
|
||||
if(events & EPOLLIN){
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLIN){
|
||||
#endif
|
||||
#endif
|
||||
do{
|
||||
rc = packet__read(db, context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}while(SSL_DATA_PENDING(context));
|
||||
}else{
|
||||
#ifdef WITH_EPOLL
|
||||
if(events & (EPOLLERR | EPOLLHUP)){
|
||||
#else
|
||||
if(context->pollfd_index >= 0 && pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL | POLLHUP)){
|
||||
#endif
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -701,16 +701,25 @@ int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context);
|
||||
void bridge__packet_cleanup(struct mosquitto *context);
|
||||
#ifdef WITH_EPOLL
|
||||
void bridge_check(struct mosquitto_db *db);
|
||||
#else
|
||||
void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index);
|
||||
#endif
|
||||
int bridge__register_local_connections(struct mosquitto_db *db);
|
||||
int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, int qos, const char *local_prefix, const char *remote_prefix);
|
||||
int bridge__remap_topic_in(struct mosquitto *context, char **topic);
|
||||
#endif
|
||||
|
||||
/* ============================================================
|
||||
* IO multiplex related functions
|
||||
* ============================================================ */
|
||||
int mux__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux__loop_prepare(void);
|
||||
int mux__add_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux__remove_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux__add_in(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux__delete(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux__wait(void);
|
||||
int mux__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux__cleanup(struct mosquitto_db *db);
|
||||
|
||||
/* ============================================================
|
||||
* Property related functions
|
||||
* ============================================================ */
|
||||
|
86
src/mux.c
Normal file
86
src/mux.c
Normal file
@ -0,0 +1,86 @@
|
||||
/*
|
||||
Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
Tatsuzo Osawa - Add epoll.
|
||||
*/
|
||||
|
||||
#include "mux.h"
|
||||
|
||||
int mux__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__init(db, listensock, listensock_count);
|
||||
#else
|
||||
return mux_poll__init(db, listensock, listensock_count);
|
||||
#endif
|
||||
}
|
||||
|
||||
int mux__add_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__add_out(db, context);
|
||||
#else
|
||||
return mux_poll__add_out(db, context);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int mux__remove_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__remove_out(db, context);
|
||||
#else
|
||||
return mux_poll__remove_out(db, context);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int mux__add_in(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__add_in(db, context);
|
||||
#else
|
||||
return mux_poll__add_in(db, context);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int mux__delete(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__delete(db, context);
|
||||
#else
|
||||
return mux_poll__delete(db, context);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int mux__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__handle(db, listensock, listensock_count);
|
||||
#else
|
||||
return mux_poll__handle(db, listensock, listensock_count);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int mux__cleanup(struct mosquitto_db *db)
|
||||
{
|
||||
#ifdef WITH_EPOLL
|
||||
return mux_epoll__cleanup(db);
|
||||
#else
|
||||
return mux_poll__cleanup(db);
|
||||
#endif
|
||||
}
|
38
src/mux.h
Normal file
38
src/mux.h
Normal file
@ -0,0 +1,38 @@
|
||||
/*
|
||||
Copyright (c) 2020 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
*/
|
||||
|
||||
#ifndef MUX_H
|
||||
#define MUX_H
|
||||
|
||||
#include "mosquitto_broker_internal.h"
|
||||
|
||||
int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux_epoll__cleanup(struct mosquitto_db *db);
|
||||
|
||||
int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux_poll__add_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_poll__remove_out(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_poll__add_in(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_poll__delete(struct mosquitto_db *db, struct mosquitto *context);
|
||||
int mux_poll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count);
|
||||
int mux_poll__cleanup(struct mosquitto_db *db);
|
||||
|
||||
#endif
|
331
src/mux_epoll.c
Normal file
331
src/mux_epoll.c
Normal file
@ -0,0 +1,331 @@
|
||||
/*
|
||||
Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
Tatsuzo Osawa - Add epoll.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#ifdef WITH_EPOLL
|
||||
|
||||
#ifndef WIN32
|
||||
# define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <assert.h>
|
||||
#ifndef WIN32
|
||||
#ifdef WITH_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#define MAX_EVENTS 1000
|
||||
#endif
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
#include <winsock2.h>
|
||||
#include <ws2tcpip.h>
|
||||
#endif
|
||||
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#ifndef WIN32
|
||||
# include <sys/socket.h>
|
||||
#endif
|
||||
#include <time.h>
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
# include <libwebsockets.h>
|
||||
#endif
|
||||
|
||||
#include "mosquitto_broker_internal.h"
|
||||
#include "memory_mosq.h"
|
||||
#include "packet_mosq.h"
|
||||
#include "send_mosq.h"
|
||||
#include "sys_tree.h"
|
||||
#include "time_mosq.h"
|
||||
#include "util_mosq.h"
|
||||
|
||||
#ifdef WIN32
|
||||
# error "epoll not supported on WIN32"
|
||||
#endif
|
||||
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events);
|
||||
|
||||
static sigset_t my_sigblock;
|
||||
static struct epoll_event ep_events[MAX_EVENTS];
|
||||
|
||||
int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
int i;
|
||||
|
||||
#ifndef WIN32
|
||||
sigemptyset(&my_sigblock);
|
||||
sigaddset(&my_sigblock, SIGINT);
|
||||
sigaddset(&my_sigblock, SIGTERM);
|
||||
sigaddset(&my_sigblock, SIGUSR1);
|
||||
sigaddset(&my_sigblock, SIGUSR2);
|
||||
sigaddset(&my_sigblock, SIGHUP);
|
||||
#endif
|
||||
|
||||
memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS);
|
||||
|
||||
db->epollfd = 0;
|
||||
if ((db->epollfd = epoll_create(MAX_EVENTS)) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno));
|
||||
return MOSQ_ERR_UNKNOWN;
|
||||
}
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
for(i=0; i<listensock_count; i++){
|
||||
ev.data.fd = listensock[i];
|
||||
ev.events = EPOLLIN;
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno));
|
||||
(void)close(db->epollfd);
|
||||
db->epollfd = 0;
|
||||
return MOSQ_ERR_UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int mux_epoll__loop_setup(void)
|
||||
{
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
if(!(context->events & EPOLLOUT)) {
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN | EPOLLOUT;
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
context->events = EPOLLIN | EPOLLOUT;
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
if(context->events & EPOLLOUT) {
|
||||
ev.data.fd = context->sock;
|
||||
ev.events = EPOLLIN;
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
||||
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
context->events = EPOLLIN;
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = context->sock;
|
||||
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
if(context->sock != INVALID_SOCKET){
|
||||
if(epoll_ctl(db->epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1){
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
int i;
|
||||
int j;
|
||||
struct epoll_event ev;
|
||||
sigset_t origsig;
|
||||
struct mosquitto *context;
|
||||
int fdcount;
|
||||
|
||||
memset(&ev, 0, sizeof(struct epoll_event));
|
||||
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig);
|
||||
fdcount = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100);
|
||||
sigprocmask(SIG_SETMASK, &origsig, NULL);
|
||||
|
||||
switch(fdcount){
|
||||
case -1:
|
||||
if(errno != EINTR){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
|
||||
}
|
||||
break;
|
||||
case 0:
|
||||
break;
|
||||
default:
|
||||
for(i=0; i<fdcount; i++){
|
||||
for(j=0; j<listensock_count; j++){
|
||||
if (ep_events[i].data.fd == listensock[j]) {
|
||||
if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){
|
||||
while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
|
||||
}
|
||||
context->events = EPOLLIN;
|
||||
mux__add_in(db, context);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (j == listensock_count) {
|
||||
loop_handle_reads_writes(db, ep_events[i].data.fd, ep_events[i].events);
|
||||
}
|
||||
}
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_epoll__cleanup(struct mosquitto_db *db)
|
||||
{
|
||||
(void)close(db->epollfd);
|
||||
db->epollfd = 0;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events)
|
||||
{
|
||||
struct mosquitto *context;
|
||||
int err;
|
||||
socklen_t len;
|
||||
int rc;
|
||||
int i;
|
||||
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
return;
|
||||
}
|
||||
for (i=0;i<1;i++) {
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
struct lws_pollfd wspoll;
|
||||
wspoll.fd = context->sock;
|
||||
wspoll.events = context->events;
|
||||
wspoll.revents = events;
|
||||
#ifdef LWS_LIBRARY_VERSION_NUMBER
|
||||
lws_service_fd(lws_get_context(context->wsi), &wspoll);
|
||||
#else
|
||||
lws_service_fd(context->ws_context, &wspoll);
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
if(events & EPOLLOUT ||
|
||||
context->want_write ||
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
if(events & EPOLLOUT){
|
||||
#endif
|
||||
if(context->state == mosq_cs_connect_pending){
|
||||
len = sizeof(int);
|
||||
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
||||
if(err == 0){
|
||||
mosquitto__set_state(context, mosq_cs_new);
|
||||
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
|
||||
if(context->bridge){
|
||||
bridge__connect_step3(db, context);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}else{
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
rc = packet__write(context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
return;
|
||||
}
|
||||
for (i=0;i<1;i++) {
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
// Websocket are already handled above
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
if(events & EPOLLIN ||
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
if(events & EPOLLIN){
|
||||
#endif
|
||||
do{
|
||||
rc = packet__read(db, context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}while(SSL_DATA_PENDING(context));
|
||||
}else{
|
||||
if(events & (EPOLLERR | EPOLLHUP)){
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
323
src/mux_poll.c
Normal file
323
src/mux_poll.c
Normal file
@ -0,0 +1,323 @@
|
||||
/*
|
||||
Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
Tatsuzo Osawa - Add epoll.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#ifndef WITH_EPOLL
|
||||
|
||||
#ifndef WIN32
|
||||
# define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <assert.h>
|
||||
#ifndef WIN32
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#else
|
||||
#include <process.h>
|
||||
#include <winsock2.h>
|
||||
#include <ws2tcpip.h>
|
||||
#endif
|
||||
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#ifndef WIN32
|
||||
# include <sys/socket.h>
|
||||
#endif
|
||||
#include <time.h>
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
# include <libwebsockets.h>
|
||||
#endif
|
||||
|
||||
#include "mosquitto_broker_internal.h"
|
||||
#include "memory_mosq.h"
|
||||
#include "packet_mosq.h"
|
||||
#include "send_mosq.h"
|
||||
#include "sys_tree.h"
|
||||
#include "time_mosq.h"
|
||||
#include "util_mosq.h"
|
||||
#include "mux.h"
|
||||
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds);
|
||||
|
||||
static struct pollfd *pollfds = NULL;
|
||||
static int pollfd_max;
|
||||
static sigset_t my_sigblock;
|
||||
|
||||
int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
int i;
|
||||
int pollfd_index = 0;
|
||||
|
||||
#ifndef WIN32
|
||||
sigemptyset(&my_sigblock);
|
||||
sigaddset(&my_sigblock, SIGINT);
|
||||
sigaddset(&my_sigblock, SIGTERM);
|
||||
sigaddset(&my_sigblock, SIGUSR1);
|
||||
sigaddset(&my_sigblock, SIGUSR2);
|
||||
sigaddset(&my_sigblock, SIGHUP);
|
||||
#endif
|
||||
|
||||
#ifdef WIN32
|
||||
pollfd_max = _getmaxstdio();
|
||||
#else
|
||||
pollfd_max = sysconf(_SC_OPEN_MAX);
|
||||
#endif
|
||||
|
||||
pollfds = mosquitto__malloc(sizeof(struct pollfd)*pollfd_max);
|
||||
if(!pollfds){
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);
|
||||
|
||||
for(i=0; i<listensock_count; i++){
|
||||
pollfds[pollfd_index].fd = listensock[i];
|
||||
pollfds[pollfd_index].events = POLLIN;
|
||||
pollfds[pollfd_index].revents = 0;
|
||||
pollfd_index++;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_poll__add_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
int i;
|
||||
|
||||
if(context->pollfd_index != -1){
|
||||
pollfds[context->pollfd_index].fd = context->sock;
|
||||
pollfds[context->pollfd_index].events = POLLIN | POLLOUT;
|
||||
pollfds[context->pollfd_index].revents = 0;
|
||||
}else{
|
||||
for(i=0; i<pollfd_max; i++){
|
||||
if(pollfds[i].fd == -1){
|
||||
pollfds[i].fd = context->sock;
|
||||
pollfds[i].events = POLLIN | POLLOUT;
|
||||
pollfds[i].revents = 0;
|
||||
context->pollfd_index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_poll__remove_out(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
return mux_poll__add_in(db, context);
|
||||
}
|
||||
|
||||
|
||||
int mux_poll__add_in(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
int i;
|
||||
|
||||
if(context->pollfd_index != -1){
|
||||
pollfds[context->pollfd_index].fd = context->sock;
|
||||
pollfds[context->pollfd_index].events = POLLIN | POLLPRI;
|
||||
pollfds[context->pollfd_index].revents = 0;
|
||||
}else{
|
||||
for(i=0; i<pollfd_max; i++){
|
||||
if(pollfds[i].fd == -1){
|
||||
pollfds[i].fd = context->sock;
|
||||
pollfds[i].events = POLLIN;
|
||||
pollfds[i].revents = 0;
|
||||
context->pollfd_index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
int mux_poll__delete(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
if(context->pollfd_index != -1){
|
||||
pollfds[context->pollfd_index].fd = -1;
|
||||
pollfds[context->pollfd_index].events = 0;
|
||||
pollfds[context->pollfd_index].revents = 0;
|
||||
context->pollfd_index = -1;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int mux_poll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
||||
{
|
||||
struct mosquitto *context;
|
||||
mosq_sock_t sock;
|
||||
int i;
|
||||
int fdcount;
|
||||
sigset_t origsig;
|
||||
|
||||
#ifndef WIN32
|
||||
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig);
|
||||
fdcount = poll(pollfds, pollfd_max, 100);
|
||||
sigprocmask(SIG_SETMASK, &origsig, NULL);
|
||||
#else
|
||||
fdcount = WSAPoll(pollfds, pollfd_max, 100);
|
||||
#endif
|
||||
if(fdcount == -1){
|
||||
# ifdef WIN32
|
||||
if(WSAGetLastError() == WSAEINVAL){
|
||||
/* WSAPoll() immediately returns an error if it is not given
|
||||
* any sockets to wait on. This can happen if we only have
|
||||
* websockets listeners. Sleep a little to prevent a busy loop.
|
||||
*/
|
||||
Sleep(10);
|
||||
}else
|
||||
# endif
|
||||
{
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
|
||||
}
|
||||
}else{
|
||||
loop_handle_reads_writes(db, pollfds);
|
||||
|
||||
for(i=0; i<listensock_count; i++){
|
||||
if(pollfds[i].revents & (POLLIN | POLLPRI)){
|
||||
while((sock = net__socket_accept(db, listensock[i])) != -1){
|
||||
context = NULL;
|
||||
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
||||
if(!context) {
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error in poll accepting: no context");
|
||||
}
|
||||
context->pollfd_index = -1;
|
||||
mux__add_in(db, context);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mux_poll__cleanup(struct mosquitto_db *db)
|
||||
{
|
||||
mosquitto__free(pollfds);
|
||||
pollfds = NULL;
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
|
||||
{
|
||||
struct mosquitto *context, *ctxt_tmp;
|
||||
int err;
|
||||
socklen_t len;
|
||||
int rc;
|
||||
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(context->pollfd_index < 0){
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(pollfds[context->pollfd_index].fd == context->sock);
|
||||
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
struct lws_pollfd wspoll;
|
||||
wspoll.fd = pollfds[context->pollfd_index].fd;
|
||||
wspoll.events = pollfds[context->pollfd_index].events;
|
||||
wspoll.revents = pollfds[context->pollfd_index].revents;
|
||||
#ifdef LWS_LIBRARY_VERSION_NUMBER
|
||||
lws_service_fd(lws_get_context(context->wsi), &wspoll);
|
||||
#else
|
||||
lws_service_fd(context->ws_context, &wspoll);
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
if(pollfds[context->pollfd_index].revents & POLLOUT ||
|
||||
context->want_write ||
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLOUT){
|
||||
#endif
|
||||
if(context->state == mosq_cs_connect_pending){
|
||||
len = sizeof(int);
|
||||
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
||||
if(err == 0){
|
||||
mosquitto__set_state(context, mosq_cs_new);
|
||||
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
|
||||
if(context->bridge){
|
||||
bridge__connect_step3(db, context);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}else{
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
rc = packet__write(context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
||||
if(context->pollfd_index < 0){
|
||||
continue;
|
||||
}
|
||||
#ifdef WITH_WEBSOCKETS
|
||||
if(context->wsi){
|
||||
// Websocket are already handled above
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef WITH_TLS
|
||||
if(pollfds[context->pollfd_index].revents & POLLIN ||
|
||||
(context->ssl && context->state == mosq_cs_new)){
|
||||
#else
|
||||
if(pollfds[context->pollfd_index].revents & POLLIN){
|
||||
#endif
|
||||
do{
|
||||
rc = packet__read(db, context);
|
||||
if(rc){
|
||||
do_disconnect(db, context, rc);
|
||||
continue;
|
||||
}
|
||||
}while(SSL_DATA_PENDING(context));
|
||||
}else{
|
||||
if(context->pollfd_index >= 0 && pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL | POLLHUP)){
|
||||
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#endif
|
@ -254,6 +254,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
||||
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
|
||||
mosq->sock = INVALID_SOCKET;
|
||||
mosq->pollfd_index = -1;
|
||||
mux__delete(db, mosq);
|
||||
}
|
||||
mosq->wsi = NULL;
|
||||
#ifdef WITH_TLS
|
||||
|
Loading…
Reference in New Issue
Block a user