From 565c9c34321e5d82815b4fc83156543f59b73a08 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 9 Feb 2017 16:41:48 +0000 Subject: [PATCH] [344] Fix non-async case. --- src/bridge.c | 111 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 4 deletions(-) diff --git a/src/bridge.c b/src/bridge.c index 869b3db1..dc5f9c44 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -139,6 +139,7 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge) #endif } +#if defined(__GLIBC__) && defined(WITH_ADNS) int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context) { int rc; @@ -288,17 +289,119 @@ int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *contex return rc; } } - +#else int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) { int rc; + int i; + char *notification_topic; + int notification_topic_len; + uint8_t notification_payload; - rc = mqtt3_bridge_connect_step1(db, context); - if(rc) return rc; + if(!context || !context->bridge) return MOSQ_ERR_INVAL; - return mqtt3_bridge_connect_step2(db, context); + context->state = mosq_cs_new; + context->sock = INVALID_SOCKET; + context->last_msg_in = mosquitto_time(); + context->next_msg_out = mosquitto_time() + context->bridge->keepalive; + context->keepalive = context->bridge->keepalive; + context->clean_session = context->bridge->clean_session; + context->in_packet.payload = NULL; + context->ping_t = 0; + context->bridge->lazy_reconnect = false; + mqtt3_bridge_packet_cleanup(context); + mqtt3_db_message_reconnect_reset(db, context); + + if(context->clean_session){ + mqtt3_db_messages_delete(db, context); + } + + /* Delete all local subscriptions even for clean_session==false. We don't + * remove any messages and the next loop carries out the resubscription + * anyway. This means any unwanted subs will be removed. + */ + mqtt3_subs_clean_session(db, context); + + for(i=0; ibridge->topic_count; i++){ + if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ + _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); + if(mqtt3_sub_add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; + } + } + + if(context->bridge->notifications){ + if(context->bridge->notification_topic){ + if(!context->bridge->initial_notification_done){ + notification_payload = '0'; + mqtt3_db_messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, ¬ification_payload, 1); + context->bridge->initial_notification_done = true; + } + notification_payload = '0'; + rc = _mosquitto_will_set(context, context->bridge->notification_topic, 1, ¬ification_payload, 1, true); + if(rc != MOSQ_ERR_SUCCESS){ + return rc; + } + }else{ + notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state"); + notification_topic = _mosquitto_malloc(sizeof(char)*(notification_topic_len+1)); + if(!notification_topic) return MOSQ_ERR_NOMEM; + + snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid); + + if(!context->bridge->initial_notification_done){ + notification_payload = '0'; + mqtt3_db_messages_easy_queue(db, context, notification_topic, 1, 1, ¬ification_payload, 1); + context->bridge->initial_notification_done = true; + } + + notification_payload = '0'; + rc = _mosquitto_will_set(context, notification_topic, 1, ¬ification_payload, 1, true); + _mosquitto_free(notification_topic); + if(rc != MOSQ_ERR_SUCCESS){ + return rc; + } + } + } + + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port); + rc = _mosquitto_socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); + if(rc > 0 ){ + if(rc == MOSQ_ERR_TLS){ + _mosquitto_socket_close(db, context); + return rc; /* Error already printed */ + }else if(rc == MOSQ_ERR_ERRNO){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno)); + }else if(rc == MOSQ_ERR_EAI){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); + } + + return rc; + } + + HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context); + + if(rc == MOSQ_ERR_CONN_PENDING){ + context->state = mosq_cs_connect_pending; + } + rc = _mosquitto_send_connect(context, context->keepalive, context->clean_session); + if(rc == MOSQ_ERR_SUCCESS){ + return MOSQ_ERR_SUCCESS; + }else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){ + return MOSQ_ERR_SUCCESS; + }else{ + if(rc == MOSQ_ERR_TLS){ + return rc; /* Error already printed */ + }else if(rc == MOSQ_ERR_ERRNO){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno)); + }else if(rc == MOSQ_ERR_EAI){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); + } + _mosquitto_socket_close(db, context); + return rc; + } } +#endif void mqtt3_bridge_packet_cleanup(struct mosquitto *context)