diff --git a/src/bridge.c b/src/bridge.c index 7b46daf6..1c06de10 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -29,6 +29,18 @@ Contributors: #include #endif +#ifndef WIN32 +#ifdef WITH_EPOLL +#include +#endif +#include +#include +#else +#include +#include +#include +#endif + #include "mqtt_protocol.h" #include "mosquitto.h" #include "mosquitto_broker_internal.h" @@ -425,6 +437,67 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) #endif +int bridge__register_local_connections(struct mosquitto_db *db) +{ +#ifdef WITH_EPOLL + struct epoll_event ev; + struct mosquitto *context, *ctxt_tmp = NULL; + + memset(&ev, 0, sizeof(struct epoll_event)); + + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(context->bridge){ + ev.data.fd = context->sock; + ev.events = EPOLLIN; + context->events = EPOLLIN; + if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno)); + (void)close(db->epollfd); + db->epollfd = 0; + return MOSQ_ERR_UNKNOWN; + } + } + } +#endif + return MOSQ_ERR_SUCCESS; +} + + +void bridge__cleanup(struct mosquitto_db *db, struct mosquitto *context) +{ + int i; + + for(i=0; ibridge_count; i++){ + if(db->bridges[i] == context){ + db->bridges[i] = NULL; + } + } + mosquitto__free(context->bridge->local_clientid); + context->bridge->local_clientid = NULL; + + mosquitto__free(context->bridge->local_username); + context->bridge->local_username = NULL; + + mosquitto__free(context->bridge->local_password); + context->bridge->local_password = NULL; + + if(context->bridge->remote_clientid != context->id){ + mosquitto__free(context->bridge->remote_clientid); + } + context->bridge->remote_clientid = NULL; + + if(context->bridge->remote_username != context->username){ + mosquitto__free(context->bridge->remote_username); + } + context->bridge->remote_username = NULL; + + if(context->bridge->remote_password != context->password){ + mosquitto__free(context->bridge->remote_password); + } + context->bridge->remote_password = NULL; +} + + void bridge__packet_cleanup(struct mosquitto *context) { struct mosquitto__packet *packet; @@ -486,4 +559,262 @@ static void bridge__backoff_reset(struct mosquitto *context) } } +#ifdef WITH_EPOLL +void bridge_check(struct mosquitto_db *db) +#else +void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index) +#endif +{ + static time_t last_check = 0; + time_t now; + struct mosquitto *context = NULL; + socklen_t len; +#ifdef WITH_EPOLL + struct epoll_event ev; +#endif + int i; + int rc; + int err; + + now = mosquitto_time(); + + if(now <= last_check) return; + +#ifdef WITH_EPOLL + memset(&ev, 0, sizeof(struct epoll_event)); +#endif + for(i=0; ibridge_count; i++){ + if(!db->bridges[i]) continue; + + context = db->bridges[i]; + + if(context->sock != INVALID_SOCKET){ + mosquitto__check_keepalive(db, context); + + /* Check for bridges that are not round robin and not currently + * connected to their primary broker. */ + if(context->bridge->round_robin == false + && context->bridge->cur_address != 0 + && context->bridge->primary_retry + && now > context->bridge->primary_retry){ + + if(context->bridge->primary_retry_sock == INVALID_SOCKET){ + rc = net__try_connect(context->bridge->addresses[0].address, + context->bridge->addresses[0].port, + &context->bridge->primary_retry_sock, NULL, false); + + if(rc == 0){ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = 0; + net__socket_close(db, context); + context->bridge->cur_address = 0; + } + }else{ + len = sizeof(int); + if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if(err == 0){ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = 0; + net__socket_close(db, context); + context->bridge->cur_address = context->bridge->address_count-1; + }else{ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = now+5; + } + }else{ + COMPAT_CLOSE(context->bridge->primary_retry_sock); + context->bridge->primary_retry_sock = INVALID_SOCKET; + context->bridge->primary_retry = now+5; + } + } + } + } + + + + if(context->sock == INVALID_SOCKET){ + /* Want to try to restart the bridge connection */ + if(!context->bridge->restart_t){ + context->bridge->restart_t = now+context->bridge->restart_timeout; + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } + }else{ + if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) + || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){ + +#if defined(__GLIBC__) && defined(WITH_ADNS) + if(context->adns){ + /* Connection attempted, waiting on DNS lookup */ + rc = gai_error(context->adns); + if(rc == EAI_INPROGRESS){ + /* Just keep on waiting */ + }else if(rc == 0){ + rc = bridge__connect_step2(db, context); + if(rc == MOSQ_ERR_SUCCESS){ +#ifdef WITH_EPOLL + ev.data.fd = context->sock; + ev.events = EPOLLIN; + if(context->current_out_packet){ + ev.events |= EPOLLOUT; + } + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); + } + }else{ + context->events = ev.events; + } +#else + pollfds[*pollfd_index].fd = context->sock; + pollfds[*pollfd_index].events = POLLIN; + pollfds[*pollfd_index].revents = 0; + if(context->current_out_packet){ + pollfds[*pollfd_index].events |= POLLOUT; + } + context->pollfd_index = *pollfd_index; + (*pollfd_index)++; +#endif + }else if(rc == MOSQ_ERR_CONN_PENDING){ + context->bridge->restart_t = 0; + }else{ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } + context->bridge->restart_t = 0; + } + }else{ + /* Need to retry */ + if(context->adns->ar_result){ + freeaddrinfo(context->adns->ar_result); + } + mosquitto__free(context->adns); + context->adns = NULL; + context->bridge->restart_t = 0; + } + }else{ +#ifdef WITH_EPOLL + /* clean any events triggered in previous connection */ + context->events = 0; +#endif + rc = bridge__connect_step1(db, context); + if(rc){ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } + }else{ + /* Short wait for ADNS lookup */ + context->bridge->restart_t = 1; + } + } +#else + { + rc = bridge__connect(db, context); + context->bridge->restart_t = 0; + if(rc == MOSQ_ERR_SUCCESS){ + if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ + context->bridge->primary_retry = now + 5; + } +#ifdef WITH_EPOLL + ev.data.fd = context->sock; + ev.events = EPOLLIN; + if(context->current_out_packet){ + ev.events |= EPOLLOUT; + } + if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { + if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { + log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); + } + }else{ + context->events = ev.events; + } +#else + pollfds[*pollfd_index].fd = context->sock; + pollfds[*pollfd_index].events = POLLIN; + pollfds[*pollfd_index].revents = 0; + if(context->current_out_packet){ + pollfds[*pollfd_index].events |= POLLOUT; + } + context->pollfd_index = *pollfd_index; + (*pollfd_index)++; +#endif + }else{ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } + } + } +#endif + } + } + } + } +} + + +int bridge__remap_topic(struct mosquitto *context, char **topic) +{ + struct mosquitto__bridge_topic *cur_topic; + char *topic_temp; + int i; + int len; + int rc; + bool match; + + if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){ + for(i=0; ibridge->topic_count; i++){ + cur_topic = &context->bridge->topics[i]; + if((cur_topic->direction == bd_both || cur_topic->direction == bd_in) + && (cur_topic->remote_prefix || cur_topic->local_prefix)){ + + /* Topic mapping required on this topic if the message matches */ + + rc = mosquitto_topic_matches_sub(cur_topic->remote_topic, *topic, &match); + if(rc){ + mosquitto__free(*topic); + return rc; + } + if(match){ + if(cur_topic->remote_prefix){ + /* This prefix needs removing. */ + if(!strncmp(cur_topic->remote_prefix, *topic, strlen(cur_topic->remote_prefix))){ + topic_temp = mosquitto__strdup((*topic)+strlen(cur_topic->remote_prefix)); + if(!topic_temp){ + mosquitto__free(*topic); + return MOSQ_ERR_NOMEM; + } + mosquitto__free(*topic); + *topic = topic_temp; + } + } + + if(cur_topic->local_prefix){ + /* This prefix needs adding. */ + len = strlen(*topic) + strlen(cur_topic->local_prefix)+1; + topic_temp = mosquitto__malloc(len+1); + if(!topic_temp){ + mosquitto__free(*topic); + return MOSQ_ERR_NOMEM; + } + snprintf(topic_temp, len, "%s%s", cur_topic->local_prefix, *topic); + topic_temp[len] = '\0'; + + mosquitto__free(*topic); + *topic = topic_temp; + } + break; + } + } + } + } + + return MOSQ_ERR_SUCCESS; +} #endif diff --git a/src/context.c b/src/context.c index a78d9f84..96792f5a 100644 --- a/src/context.c +++ b/src/context.c @@ -97,42 +97,12 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool do_free) { struct mosquitto__packet *packet; -#ifdef WITH_BRIDGE - int i; -#endif if(!context) return; #ifdef WITH_BRIDGE if(context->bridge){ - for(i=0; ibridge_count; i++){ - if(db->bridges[i] == context){ - db->bridges[i] = NULL; - } - } - mosquitto__free(context->bridge->local_clientid); - context->bridge->local_clientid = NULL; - - mosquitto__free(context->bridge->local_username); - context->bridge->local_username = NULL; - - mosquitto__free(context->bridge->local_password); - context->bridge->local_password = NULL; - - if(context->bridge->remote_clientid != context->id){ - mosquitto__free(context->bridge->remote_clientid); - } - context->bridge->remote_clientid = NULL; - - if(context->bridge->remote_username != context->username){ - mosquitto__free(context->bridge->remote_username); - } - context->bridge->remote_username = NULL; - - if(context->bridge->remote_password != context->password){ - mosquitto__free(context->bridge->remote_password); - } - context->bridge->remote_password = NULL; + bridge__cleanup(db, context); } #endif @@ -235,11 +205,7 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context) if(context->session_expiry_interval == 0){ context__send_will(db, context); -#ifdef WITH_BRIDGE - if(!context->bridge) -#endif - { - + if(context->bridge == NULL){ if(context->will_delay_interval == 0){ /* This will be done later, after the will is published for delay>0. */ context__add_to_disused(db, context); diff --git a/src/handle_publish.c b/src/handle_publish.c index 8b76fd5e..dae67bba 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -54,13 +54,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) int topic_alias = -1; uint8_t reason_code = 0; -#ifdef WITH_BRIDGE - char *topic_temp; - int i; - struct mosquitto__bridge_topic *cur_topic; - bool match; -#endif - if(context->state != mosq_cs_connected){ return MOSQ_ERR_PROTOCOL; } @@ -191,52 +184,9 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) } #ifdef WITH_BRIDGE - if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){ - for(i=0; ibridge->topic_count; i++){ - cur_topic = &context->bridge->topics[i]; - if((cur_topic->direction == bd_both || cur_topic->direction == bd_in) - && (cur_topic->remote_prefix || cur_topic->local_prefix)){ + rc = bridge__remap_topic(context, &topic); + if(rc) return rc; - /* Topic mapping required on this topic if the message matches */ - - rc = mosquitto_topic_matches_sub(cur_topic->remote_topic, topic, &match); - if(rc){ - mosquitto__free(topic); - return rc; - } - if(match){ - if(cur_topic->remote_prefix){ - /* This prefix needs removing. */ - if(!strncmp(cur_topic->remote_prefix, topic, strlen(cur_topic->remote_prefix))){ - topic_temp = mosquitto__strdup(topic+strlen(cur_topic->remote_prefix)); - if(!topic_temp){ - mosquitto__free(topic); - return MOSQ_ERR_NOMEM; - } - mosquitto__free(topic); - topic = topic_temp; - } - } - - if(cur_topic->local_prefix){ - /* This prefix needs adding. */ - len = strlen(topic) + strlen(cur_topic->local_prefix)+1; - topic_temp = mosquitto__malloc(len+1); - if(!topic_temp){ - mosquitto__free(topic); - return MOSQ_ERR_NOMEM; - } - snprintf(topic_temp, len, "%s%s", cur_topic->local_prefix, topic); - topic_temp[len] = '\0'; - - mosquitto__free(topic); - topic = topic_temp; - } - break; - } - } - } - } #endif if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){ /* Invalid publish topic, just swallow it. */ diff --git a/src/loop.c b/src/loop.c index fc18399e..b2661345 100644 --- a/src/loop.c +++ b/src/loop.c @@ -118,17 +118,13 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li #endif int i; #ifdef WITH_EPOLL + int rc; int j; struct epoll_event ev, events[MAX_EVENTS]; #else struct pollfd *pollfds = NULL; int pollfd_index; int pollfd_max; -#endif -#ifdef WITH_BRIDGE - int rc; - int err; - socklen_t len; #endif time_t expiration_check_time = 0; char *id; @@ -178,21 +174,10 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li return MOSQ_ERR_UNKNOWN; } } -#ifdef WITH_BRIDGE - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(context->bridge){ - ev.data.fd = context->sock; - ev.events = EPOLLIN; - context->events = EPOLLIN; - if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno)); - (void)close(db->epollfd); - db->epollfd = 0; - return MOSQ_ERR_UNKNOWN; - } - } - } -#endif +# ifdef WITH_BRIDGE + rc = bridge__register_local_connections(db); + if(rc) return rc; +# endif #endif while(run){ @@ -226,50 +211,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li context->pollfd_index = -1; if(context->sock != INVALID_SOCKET){ -#ifdef WITH_BRIDGE - if(context->bridge){ - mosquitto__check_keepalive(db, context); - if(context->bridge->round_robin == false - && context->bridge->cur_address != 0 - && context->bridge->primary_retry - && now > context->bridge->primary_retry){ - - if(context->bridge->primary_retry_sock == INVALID_SOCKET){ - rc = net__try_connect(context->bridge->addresses[0].address, - context->bridge->addresses[0].port, - &context->bridge->primary_retry_sock, NULL, false); - - if(rc == 0){ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = 0; - net__socket_close(db, context); - context->bridge->cur_address = 0; - } - }else{ - len = sizeof(int); - if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ - if(err == 0){ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = 0; - net__socket_close(db, context); - context->bridge->cur_address = context->bridge->address_count-1; - }else{ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; - } - }else{ - COMPAT_CLOSE(context->bridge->primary_retry_sock); - context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; - } - } - } - } -#endif - /* Local bridges never time out in this fashion. */ if(!(context->keepalive) || context->bridge @@ -323,141 +264,15 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } } + #ifdef WITH_BRIDGE - time_count = 0; - for(i=0; ibridge_count; i++){ - if(!db->bridges[i]) continue; +# ifdef WITH_EPOLL + bridge_check(db); +# else + bridge_check(db, pollfds, &pollfd_index); +# endif +#endif - context = db->bridges[i]; - - if(context->sock == INVALID_SOCKET){ - if(time_count > 0){ - time_count--; - }else{ - time_count = 1000; - now = mosquitto_time(); - } - /* Want to try to restart the bridge connection */ - if(!context->bridge->restart_t){ - context->bridge->restart_t = now+context->bridge->restart_timeout; - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; - } - }else{ - if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) - || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){ - -#if defined(__GLIBC__) && defined(WITH_ADNS) - if(context->adns){ - /* Connection attempted, waiting on DNS lookup */ - rc = gai_error(context->adns); - if(rc == EAI_INPROGRESS){ - /* Just keep on waiting */ - }else if(rc == 0){ - rc = bridge__connect_step2(db, context); - if(rc == MOSQ_ERR_SUCCESS){ -#ifdef WITH_EPOLL - ev.data.fd = context->sock; - ev.events = EPOLLIN; - if(context->current_out_packet){ - ev.events |= EPOLLOUT; - } - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); - } - }else{ - context->events = ev.events; - } -#else - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[pollfd_index].events |= POLLOUT; - } - context->pollfd_index = pollfd_index; - pollfd_index++; -#endif - }else if(rc == MOSQ_ERR_CONN_PENDING){ - context->bridge->restart_t = 0; - }else{ - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; - } - context->bridge->restart_t = 0; - } - }else{ - /* Need to retry */ - if(context->adns->ar_result){ - freeaddrinfo(context->adns->ar_result); - } - mosquitto__free(context->adns); - context->adns = NULL; - context->bridge->restart_t = 0; - } - }else{ -#ifdef WITH_EPOLL - /* clean any events triggered in previous connection */ - context->events = 0; -#endif - rc = bridge__connect_step1(db, context); - if(rc){ - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; - } - }else{ - /* Short wait for ADNS lookup */ - context->bridge->restart_t = 1; - } - } -#else - { - rc = bridge__connect(db, context); - context->bridge->restart_t = 0; - if(rc == MOSQ_ERR_SUCCESS){ - if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ - context->bridge->primary_retry = now + 5; - } -#ifdef WITH_EPOLL - ev.data.fd = context->sock; - ev.events = EPOLLIN; - if(context->current_out_packet){ - ev.events |= EPOLLOUT; - } - if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { - if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { - log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); - } - }else{ - context->events = ev.events; - } -#else - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[pollfd_index].events |= POLLOUT; - } - context->pollfd_index = pollfd_index; - pollfd_index++; -#endif - }else{ - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; - } - } - } -#endif - } - } - } - } -#endif now = time(NULL); if(db->config->persistent_client_expiration > 0 && now > expiration_check_time){ HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 38cc1360..7d4ba9ef 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -44,6 +44,10 @@ Contributors: # endif #endif +#ifdef WITH_BRIDGE +#include +#endif + #include "mosquitto_internal.h" #include "mosquitto_broker.h" #include "mosquitto_plugin.h" @@ -664,11 +668,19 @@ void log__internal(const char *fmt, ...) __attribute__((format(printf, 1, 2))); * ============================================================ */ #ifdef WITH_BRIDGE int bridge__new(struct mosquitto_db *db, struct mosquitto__bridge *bridge); +void bridge__cleanup(struct mosquitto_db *db, struct mosquitto *context); int bridge__connect(struct mosquitto_db *db, struct mosquitto *context); int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context); int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context); int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context); void bridge__packet_cleanup(struct mosquitto *context); +#ifdef WITH_EPOLL +void bridge_check(struct mosquitto_db *db); +#else +void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index); +#endif +int bridge__register_local_connections(struct mosquitto_db *db); +int bridge__remap_topic(struct mosquitto *context, char **topic); #endif /* ============================================================