Refactoring bridge code

This separates out a good amount from the main loop code.
This commit is contained in:
Roger A. Light 2019-07-11 14:43:32 +01:00
parent 995f90dbaf
commit 13ac1080a0
5 changed files with 359 additions and 285 deletions

View File

@ -29,6 +29,18 @@ Contributors:
#include <ws2tcpip.h>
#endif
#ifndef WIN32
#ifdef WITH_EPOLL
#include <sys/epoll.h>
#endif
#include <poll.h>
#include <unistd.h>
#else
#include <process.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
#include "mqtt_protocol.h"
#include "mosquitto.h"
#include "mosquitto_broker_internal.h"
@ -425,6 +437,67 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
#endif
int bridge__register_local_connections(struct mosquitto_db *db)
{
#ifdef WITH_EPOLL
struct epoll_event ev;
struct mosquitto *context, *ctxt_tmp = NULL;
memset(&ev, 0, sizeof(struct epoll_event));
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->bridge){
ev.data.fd = context->sock;
ev.events = EPOLLIN;
context->events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno));
(void)close(db->epollfd);
db->epollfd = 0;
return MOSQ_ERR_UNKNOWN;
}
}
}
#endif
return MOSQ_ERR_SUCCESS;
}
void bridge__cleanup(struct mosquitto_db *db, struct mosquitto *context)
{
int i;
for(i=0; i<db->bridge_count; i++){
if(db->bridges[i] == context){
db->bridges[i] = NULL;
}
}
mosquitto__free(context->bridge->local_clientid);
context->bridge->local_clientid = NULL;
mosquitto__free(context->bridge->local_username);
context->bridge->local_username = NULL;
mosquitto__free(context->bridge->local_password);
context->bridge->local_password = NULL;
if(context->bridge->remote_clientid != context->id){
mosquitto__free(context->bridge->remote_clientid);
}
context->bridge->remote_clientid = NULL;
if(context->bridge->remote_username != context->username){
mosquitto__free(context->bridge->remote_username);
}
context->bridge->remote_username = NULL;
if(context->bridge->remote_password != context->password){
mosquitto__free(context->bridge->remote_password);
}
context->bridge->remote_password = NULL;
}
void bridge__packet_cleanup(struct mosquitto *context)
{
struct mosquitto__packet *packet;
@ -486,4 +559,262 @@ static void bridge__backoff_reset(struct mosquitto *context)
}
}
#ifdef WITH_EPOLL
void bridge_check(struct mosquitto_db *db)
#else
void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index)
#endif
{
static time_t last_check = 0;
time_t now;
struct mosquitto *context = NULL;
socklen_t len;
#ifdef WITH_EPOLL
struct epoll_event ev;
#endif
int i;
int rc;
int err;
now = mosquitto_time();
if(now <= last_check) return;
#ifdef WITH_EPOLL
memset(&ev, 0, sizeof(struct epoll_event));
#endif
for(i=0; i<db->bridge_count; i++){
if(!db->bridges[i]) continue;
context = db->bridges[i];
if(context->sock != INVALID_SOCKET){
mosquitto__check_keepalive(db, context);
/* Check for bridges that are not round robin and not currently
* connected to their primary broker. */
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);
if(rc == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
if(context->sock == INVALID_SOCKET){
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
/* Connection attempted, waiting on DNS lookup */
rc = gai_error(context->adns);
if(rc == EAI_INPROGRESS){
/* Just keep on waiting */
}else if(rc == 0){
rc = bridge__connect_step2(db, context);
if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[*pollfd_index].fd = context->sock;
pollfds[*pollfd_index].events = POLLIN;
pollfds[*pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[*pollfd_index].events |= POLLOUT;
}
context->pollfd_index = *pollfd_index;
(*pollfd_index)++;
#endif
}else if(rc == MOSQ_ERR_CONN_PENDING){
context->bridge->restart_t = 0;
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
context->bridge->restart_t = 0;
}
}else{
/* Need to retry */
if(context->adns->ar_result){
freeaddrinfo(context->adns->ar_result);
}
mosquitto__free(context->adns);
context->adns = NULL;
context->bridge->restart_t = 0;
}
}else{
#ifdef WITH_EPOLL
/* clean any events triggered in previous connection */
context->events = 0;
#endif
rc = bridge__connect_step1(db, context);
if(rc){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
/* Short wait for ADNS lookup */
context->bridge->restart_t = 1;
}
}
#else
{
rc = bridge__connect(db, context);
context->bridge->restart_t = 0;
if(rc == MOSQ_ERR_SUCCESS){
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[*pollfd_index].fd = context->sock;
pollfds[*pollfd_index].events = POLLIN;
pollfds[*pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[*pollfd_index].events |= POLLOUT;
}
context->pollfd_index = *pollfd_index;
(*pollfd_index)++;
#endif
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#endif
}
}
}
}
}
int bridge__remap_topic(struct mosquitto *context, char **topic)
{
struct mosquitto__bridge_topic *cur_topic;
char *topic_temp;
int i;
int len;
int rc;
bool match;
if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){
for(i=0; i<context->bridge->topic_count; i++){
cur_topic = &context->bridge->topics[i];
if((cur_topic->direction == bd_both || cur_topic->direction == bd_in)
&& (cur_topic->remote_prefix || cur_topic->local_prefix)){
/* Topic mapping required on this topic if the message matches */
rc = mosquitto_topic_matches_sub(cur_topic->remote_topic, *topic, &match);
if(rc){
mosquitto__free(*topic);
return rc;
}
if(match){
if(cur_topic->remote_prefix){
/* This prefix needs removing. */
if(!strncmp(cur_topic->remote_prefix, *topic, strlen(cur_topic->remote_prefix))){
topic_temp = mosquitto__strdup((*topic)+strlen(cur_topic->remote_prefix));
if(!topic_temp){
mosquitto__free(*topic);
return MOSQ_ERR_NOMEM;
}
mosquitto__free(*topic);
*topic = topic_temp;
}
}
if(cur_topic->local_prefix){
/* This prefix needs adding. */
len = strlen(*topic) + strlen(cur_topic->local_prefix)+1;
topic_temp = mosquitto__malloc(len+1);
if(!topic_temp){
mosquitto__free(*topic);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_temp, len, "%s%s", cur_topic->local_prefix, *topic);
topic_temp[len] = '\0';
mosquitto__free(*topic);
*topic = topic_temp;
}
break;
}
}
}
}
return MOSQ_ERR_SUCCESS;
}
#endif

View File

@ -97,42 +97,12 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool do_free)
{
struct mosquitto__packet *packet;
#ifdef WITH_BRIDGE
int i;
#endif
if(!context) return;
#ifdef WITH_BRIDGE
if(context->bridge){
for(i=0; i<db->bridge_count; i++){
if(db->bridges[i] == context){
db->bridges[i] = NULL;
}
}
mosquitto__free(context->bridge->local_clientid);
context->bridge->local_clientid = NULL;
mosquitto__free(context->bridge->local_username);
context->bridge->local_username = NULL;
mosquitto__free(context->bridge->local_password);
context->bridge->local_password = NULL;
if(context->bridge->remote_clientid != context->id){
mosquitto__free(context->bridge->remote_clientid);
}
context->bridge->remote_clientid = NULL;
if(context->bridge->remote_username != context->username){
mosquitto__free(context->bridge->remote_username);
}
context->bridge->remote_username = NULL;
if(context->bridge->remote_password != context->password){
mosquitto__free(context->bridge->remote_password);
}
context->bridge->remote_password = NULL;
bridge__cleanup(db, context);
}
#endif
@ -235,11 +205,7 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context)
if(context->session_expiry_interval == 0){
context__send_will(db, context);
#ifdef WITH_BRIDGE
if(!context->bridge)
#endif
{
if(context->bridge == NULL){
if(context->will_delay_interval == 0){
/* This will be done later, after the will is published for delay>0. */
context__add_to_disused(db, context);

View File

@ -54,13 +54,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
int topic_alias = -1;
uint8_t reason_code = 0;
#ifdef WITH_BRIDGE
char *topic_temp;
int i;
struct mosquitto__bridge_topic *cur_topic;
bool match;
#endif
if(context->state != mosq_cs_connected){
return MOSQ_ERR_PROTOCOL;
}
@ -191,52 +184,9 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
#ifdef WITH_BRIDGE
if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){
for(i=0; i<context->bridge->topic_count; i++){
cur_topic = &context->bridge->topics[i];
if((cur_topic->direction == bd_both || cur_topic->direction == bd_in)
&& (cur_topic->remote_prefix || cur_topic->local_prefix)){
rc = bridge__remap_topic(context, &topic);
if(rc) return rc;
/* Topic mapping required on this topic if the message matches */
rc = mosquitto_topic_matches_sub(cur_topic->remote_topic, topic, &match);
if(rc){
mosquitto__free(topic);
return rc;
}
if(match){
if(cur_topic->remote_prefix){
/* This prefix needs removing. */
if(!strncmp(cur_topic->remote_prefix, topic, strlen(cur_topic->remote_prefix))){
topic_temp = mosquitto__strdup(topic+strlen(cur_topic->remote_prefix));
if(!topic_temp){
mosquitto__free(topic);
return MOSQ_ERR_NOMEM;
}
mosquitto__free(topic);
topic = topic_temp;
}
}
if(cur_topic->local_prefix){
/* This prefix needs adding. */
len = strlen(topic) + strlen(cur_topic->local_prefix)+1;
topic_temp = mosquitto__malloc(len+1);
if(!topic_temp){
mosquitto__free(topic);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_temp, len, "%s%s", cur_topic->local_prefix, topic);
topic_temp[len] = '\0';
mosquitto__free(topic);
topic = topic_temp;
}
break;
}
}
}
}
#endif
if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
/* Invalid publish topic, just swallow it. */

View File

@ -118,17 +118,13 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
#endif
int i;
#ifdef WITH_EPOLL
int rc;
int j;
struct epoll_event ev, events[MAX_EVENTS];
#else
struct pollfd *pollfds = NULL;
int pollfd_index;
int pollfd_max;
#endif
#ifdef WITH_BRIDGE
int rc;
int err;
socklen_t len;
#endif
time_t expiration_check_time = 0;
char *id;
@ -178,21 +174,10 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
return MOSQ_ERR_UNKNOWN;
}
}
#ifdef WITH_BRIDGE
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->bridge){
ev.data.fd = context->sock;
ev.events = EPOLLIN;
context->events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering bridge: %s", strerror(errno));
(void)close(db->epollfd);
db->epollfd = 0;
return MOSQ_ERR_UNKNOWN;
}
}
}
#endif
# ifdef WITH_BRIDGE
rc = bridge__register_local_connections(db);
if(rc) return rc;
# endif
#endif
while(run){
@ -226,50 +211,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
context->pollfd_index = -1;
if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(context->bridge){
mosquitto__check_keepalive(db, context);
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);
if(rc == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
#endif
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
@ -323,141 +264,15 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
}
#ifdef WITH_BRIDGE
time_count = 0;
for(i=0; i<db->bridge_count; i++){
if(!db->bridges[i]) continue;
# ifdef WITH_EPOLL
bridge_check(db);
# else
bridge_check(db, pollfds, &pollfd_index);
# endif
#endif
context = db->bridges[i];
if(context->sock == INVALID_SOCKET){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
/* Connection attempted, waiting on DNS lookup */
rc = gai_error(context->adns);
if(rc == EAI_INPROGRESS){
/* Just keep on waiting */
}else if(rc == 0){
rc = bridge__connect_step2(db, context);
if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else if(rc == MOSQ_ERR_CONN_PENDING){
context->bridge->restart_t = 0;
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
context->bridge->restart_t = 0;
}
}else{
/* Need to retry */
if(context->adns->ar_result){
freeaddrinfo(context->adns->ar_result);
}
mosquitto__free(context->adns);
context->adns = NULL;
context->bridge->restart_t = 0;
}
}else{
#ifdef WITH_EPOLL
/* clean any events triggered in previous connection */
context->events = 0;
#endif
rc = bridge__connect_step1(db, context);
if(rc){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
/* Short wait for ADNS lookup */
context->bridge->restart_t = 1;
}
}
#else
{
rc = bridge__connect(db, context);
context->bridge->restart_t = 0;
if(rc == MOSQ_ERR_SUCCESS){
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
context->events = ev.events;
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#endif
}
}
}
}
#endif
now = time(NULL);
if(db->config->persistent_client_expiration > 0 && now > expiration_check_time){
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){

View File

@ -44,6 +44,10 @@ Contributors:
# endif
#endif
#ifdef WITH_BRIDGE
#include <poll.h>
#endif
#include "mosquitto_internal.h"
#include "mosquitto_broker.h"
#include "mosquitto_plugin.h"
@ -664,11 +668,19 @@ void log__internal(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
* ============================================================ */
#ifdef WITH_BRIDGE
int bridge__new(struct mosquitto_db *db, struct mosquitto__bridge *bridge);
void bridge__cleanup(struct mosquitto_db *db, struct mosquitto *context);
int bridge__connect(struct mosquitto_db *db, struct mosquitto *context);
int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context);
int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context);
int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context);
void bridge__packet_cleanup(struct mosquitto *context);
#ifdef WITH_EPOLL
void bridge_check(struct mosquitto_db *db);
#else
void bridge_check(struct mosquitto_db *db, struct pollfd *pollfds, int *pollfd_index);
#endif
int bridge__register_local_connections(struct mosquitto_db *db);
int bridge__remap_topic(struct mosquitto *context, char **topic);
#endif
/* ============================================================