[344] Fix non-async case.
This commit is contained in:
parent
f0485d1398
commit
565c9c3432
111
src/bridge.c
111
src/bridge.c
@ -139,6 +139,7 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(__GLIBC__) && defined(WITH_ADNS)
|
||||||
int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context)
|
int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
@ -288,17 +289,119 @@ int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *contex
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
|
||||||
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
|
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
int i;
|
||||||
|
char *notification_topic;
|
||||||
|
int notification_topic_len;
|
||||||
|
uint8_t notification_payload;
|
||||||
|
|
||||||
rc = mqtt3_bridge_connect_step1(db, context);
|
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
|
||||||
if(rc) return rc;
|
|
||||||
|
|
||||||
return mqtt3_bridge_connect_step2(db, context);
|
context->state = mosq_cs_new;
|
||||||
|
context->sock = INVALID_SOCKET;
|
||||||
|
context->last_msg_in = mosquitto_time();
|
||||||
|
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
|
||||||
|
context->keepalive = context->bridge->keepalive;
|
||||||
|
context->clean_session = context->bridge->clean_session;
|
||||||
|
context->in_packet.payload = NULL;
|
||||||
|
context->ping_t = 0;
|
||||||
|
context->bridge->lazy_reconnect = false;
|
||||||
|
mqtt3_bridge_packet_cleanup(context);
|
||||||
|
mqtt3_db_message_reconnect_reset(db, context);
|
||||||
|
|
||||||
|
if(context->clean_session){
|
||||||
|
mqtt3_db_messages_delete(db, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Delete all local subscriptions even for clean_session==false. We don't
|
||||||
|
* remove any messages and the next loop carries out the resubscription
|
||||||
|
* anyway. This means any unwanted subs will be removed.
|
||||||
|
*/
|
||||||
|
mqtt3_subs_clean_session(db, context);
|
||||||
|
|
||||||
|
for(i=0; i<context->bridge->topic_count; i++){
|
||||||
|
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
|
||||||
|
if(mqtt3_sub_add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(context->bridge->notifications){
|
||||||
|
if(context->bridge->notification_topic){
|
||||||
|
if(!context->bridge->initial_notification_done){
|
||||||
|
notification_payload = '0';
|
||||||
|
mqtt3_db_messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, ¬ification_payload, 1);
|
||||||
|
context->bridge->initial_notification_done = true;
|
||||||
|
}
|
||||||
|
notification_payload = '0';
|
||||||
|
rc = _mosquitto_will_set(context, context->bridge->notification_topic, 1, ¬ification_payload, 1, true);
|
||||||
|
if(rc != MOSQ_ERR_SUCCESS){
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
|
||||||
|
notification_topic = _mosquitto_malloc(sizeof(char)*(notification_topic_len+1));
|
||||||
|
if(!notification_topic) return MOSQ_ERR_NOMEM;
|
||||||
|
|
||||||
|
snprintf(notification_topic, notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
|
||||||
|
|
||||||
|
if(!context->bridge->initial_notification_done){
|
||||||
|
notification_payload = '0';
|
||||||
|
mqtt3_db_messages_easy_queue(db, context, notification_topic, 1, 1, ¬ification_payload, 1);
|
||||||
|
context->bridge->initial_notification_done = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
notification_payload = '0';
|
||||||
|
rc = _mosquitto_will_set(context, notification_topic, 1, ¬ification_payload, 1, true);
|
||||||
|
_mosquitto_free(notification_topic);
|
||||||
|
if(rc != MOSQ_ERR_SUCCESS){
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
|
||||||
|
rc = _mosquitto_socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false);
|
||||||
|
if(rc > 0 ){
|
||||||
|
if(rc == MOSQ_ERR_TLS){
|
||||||
|
_mosquitto_socket_close(db, context);
|
||||||
|
return rc; /* Error already printed */
|
||||||
|
}else if(rc == MOSQ_ERR_ERRNO){
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
|
||||||
|
}else if(rc == MOSQ_ERR_EAI){
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
|
||||||
|
|
||||||
|
if(rc == MOSQ_ERR_CONN_PENDING){
|
||||||
|
context->state = mosq_cs_connect_pending;
|
||||||
|
}
|
||||||
|
rc = _mosquitto_send_connect(context, context->keepalive, context->clean_session);
|
||||||
|
if(rc == MOSQ_ERR_SUCCESS){
|
||||||
|
return MOSQ_ERR_SUCCESS;
|
||||||
|
}else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
|
||||||
|
return MOSQ_ERR_SUCCESS;
|
||||||
|
}else{
|
||||||
|
if(rc == MOSQ_ERR_TLS){
|
||||||
|
return rc; /* Error already printed */
|
||||||
|
}else if(rc == MOSQ_ERR_ERRNO){
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
|
||||||
|
}else if(rc == MOSQ_ERR_EAI){
|
||||||
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
|
||||||
|
}
|
||||||
|
_mosquitto_socket_close(db, context);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
void mqtt3_bridge_packet_cleanup(struct mosquitto *context)
|
void mqtt3_bridge_packet_cleanup(struct mosquitto *context)
|
||||||
|
Loading…
Reference in New Issue
Block a user