Start of fix for [344].

This commit is contained in:
Roger A. Light 2017-02-06 22:39:39 +00:00
parent e2edba054e
commit e13af18ed9
7 changed files with 220 additions and 62 deletions

View File

@ -156,6 +156,10 @@ ifeq ($(UNAME),QNX)
LIB_LIBS:=$(LIB_LIBS) -lsocket
endif
ifeq ($(UNAME),Linux)
BROKER_LIBS:=$(BROKER_LIBS) -lanl
endif
ifeq ($(WITH_WRAP),yes)
BROKER_LIBS:=$(BROKER_LIBS) -lwrap
BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_WRAP

View File

@ -56,6 +56,9 @@ Contributors:
#include "mosquitto.h"
#include "time_mosq.h"
#ifdef WITH_BROKER
# ifdef __linux__
# include <netdb.h>
# endif
# include "uthash.h"
struct mosquitto_client_msg;
#endif
@ -151,6 +154,9 @@ struct mosquitto {
mosq_sock_t sock;
#ifndef WITH_BROKER
mosq_sock_t sockpairR, sockpairW;
#endif
#ifdef __linux__
struct gaicb *adns; /* For getaddrinfo_a */
#endif
enum _mosquitto_protocol protocol;
char *address;

View File

@ -14,12 +14,15 @@ Contributors:
Roger Light - initial implementation and documentation.
*/
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#ifndef WIN32
#define _GNU_SOURCE
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
@ -268,6 +271,88 @@ static unsigned int psk_client_callback(SSL *ssl, const char *hint,
}
#endif
#if defined(WITH_BROKER) && defined(__linux__)
/* Async connect, part 1 (dns lookup) */
int _mosquitto_try_connect_step1(struct mosquitto *mosq, const char *host)
{
int s;
void *sevp = NULL;
if(mosq->adns){
_mosquitto_free(mosq->adns);
}
mosq->adns = _mosquitto_calloc(1, sizeof(struct gaicb));
if(!mosq->adns){
return MOSQ_ERR_NOMEM;
}
mosq->adns->ar_name = host;
s = getaddrinfo_a(GAI_NOWAIT, &mosq->adns, 1, sevp);
if(s){
errno = s;
return MOSQ_ERR_EAI;
}
return MOSQ_ERR_SUCCESS;
}
/* Async connect part 2, the connection. */
int _mosquitto_try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *sock)
{
struct addrinfo *ainfo, *rp;
int rc;
ainfo = mosq->adns->ar_result;
for(rp = ainfo; rp != NULL; rp = rp->ai_next){
*sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(*sock == INVALID_SOCKET) continue;
if(rp->ai_family == PF_INET){
((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
}else if(rp->ai_family == PF_INET6){
((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
}else{
COMPAT_CLOSE(*sock);
continue;
}
/* Set non-blocking */
if(_mosquitto_socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
#ifdef WIN32
errno = WSAGetLastError();
#endif
if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
if(rc < 0 && (errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK)){
rc = MOSQ_ERR_CONN_PENDING;
}
/* Set non-blocking */
if(_mosquitto_socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
break;
}
COMPAT_CLOSE(*sock);
*sock = INVALID_SOCKET;
}
freeaddrinfo(ainfo);
if(!rp){
return MOSQ_ERR_ERRNO;
}
return rc;
}
#endif
int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address, bool blocking)
{
struct addrinfo hints;
@ -303,7 +388,7 @@ int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t po
for(rp = ainfo; rp != NULL; rp = rp->ai_next){
*sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(*sock == INVALID_SOCKET) continue;
if(rp->ai_family == PF_INET){
((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
}else if(rp->ai_family == PF_INET6){
@ -396,24 +481,13 @@ int mosquitto__socket_connect_tls(struct mosquitto *mosq)
}
#endif
/* Create a socket and connect it to 'ip' on port 'port'.
* Returns -1 on failure (ip is NULL, socket creation/connection error)
* Returns sock number on success.
*/
int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
int _mosquitto_socket_connect_step3(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
{
mosq_sock_t sock = INVALID_SOCKET;
int rc;
#ifdef WITH_TLS
int ret;
BIO *bio;
#endif
if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
rc = _mosquitto_try_connect(mosq, host, port, &sock, bind_address, blocking);
if(rc > 0) return rc;
#ifdef WITH_TLS
if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
#if OPENSSL_VERSION_NUMBER >= 0x10001000L
@ -425,7 +499,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
}else{
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_INVAL;
}
#else
@ -433,13 +507,13 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
}else{
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_INVAL;
}
#endif
if(!mosq->ssl_ctx){
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
@ -456,7 +530,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
if(ret == 0){
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
}
@ -480,7 +554,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
}
#endif
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
if(mosq->tls_cert_reqs == 0){
@ -502,7 +576,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
#else
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
#endif
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
}
@ -514,13 +588,13 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
#else
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
#endif
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
if(ret != 1){
_mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
}
@ -532,27 +606,43 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t
mosq->ssl = SSL_new(mosq->ssl_ctx);
if(!mosq->ssl){
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
bio = BIO_new_socket(sock, BIO_NOCLOSE);
bio = BIO_new_socket(mosq->sock, BIO_NOCLOSE);
if(!bio){
COMPAT_CLOSE(sock);
COMPAT_CLOSE(mosq->sock);
return MOSQ_ERR_TLS;
}
SSL_set_bio(mosq->ssl, bio, bio);
mosq->sock = sock;
if(mosquitto__socket_connect_tls(mosq)){
return MOSQ_ERR_TLS;
}
}
#endif
return MOSQ_ERR_SUCCESS;
}
/* Create a socket and connect it to 'ip' on port 'port'.
* Returns -1 on failure (ip is NULL, socket creation/connection error)
* Returns sock number on success.
*/
int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
{
mosq_sock_t sock = INVALID_SOCKET;
int rc;
if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
rc = _mosquitto_try_connect(mosq, host, port, &sock, bind_address, blocking);
if(rc > 0) return rc;
mosq->sock = sock;
rc = _mosquitto_socket_connect_step3(mosq, host, port, bind_address, blocking);
return rc;
}

View File

@ -61,6 +61,8 @@ int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq);
int _mosquitto_socket_close(struct mosquitto *mosq);
#endif
int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address, bool blocking);
int _mosquitto_try_connect_step1(struct mosquitto *mosq, const char *host);
int _mosquitto_try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *sock);
int _mosquitto_socket_nonblock(mosq_sock_t sock);
int _mosquitto_socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2);

View File

@ -131,10 +131,15 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
return MOSQ_ERR_NOMEM;
}
#ifdef __linux__
new_context->bridge->restart_t = 1; /* force quick restart of bridge */
return mqtt3_bridge_connect_step1(db, new_context);
#else
return mqtt3_bridge_connect(db, new_context);
#endif
}
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
int i;
@ -207,6 +212,31 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
}
}
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
rc = _mosquitto_try_connect_step1(context, context->bridge->addresses[context->bridge->cur_address].address);
if(rc > 0 ){
if(rc == MOSQ_ERR_TLS){
_mosquitto_socket_close(db, context);
return rc; /* Error already printed */
}else if(rc == MOSQ_ERR_ERRNO){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
}else if(rc == MOSQ_ERR_EAI){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
}
return rc;
}
return MOSQ_ERR_SUCCESS;
}
int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
rc = _mosquitto_socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false);
if(rc > 0 ){
@ -245,6 +275,18 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
}
}
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
rc = mqtt3_bridge_connect_step1(db, context);
if(rc) return rc;
return mqtt3_bridge_connect_step2(db, context);
}
void mqtt3_bridge_packet_cleanup(struct mosquitto *context)
{
struct _mosquitto_packet *packet;

View File

@ -244,45 +244,57 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
context->bridge->primary_retry = now + 5;
}
}else{
if(context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect){
rc = mqtt3_bridge_connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
if(context->bridge->start_type == bst_automatic && now > context->bridge->restart_t){
context->bridge->restart_t = 0;
rc = mqtt3_bridge_connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
/* Retry later. */
context->bridge->restart_t = now+context->bridge->restart_timeout;
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
#ifdef __linux__
if(context->adns){
/* Waiting on DNS lookup */
rc = mqtt3_bridge_connect_step2(db, context);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}else{
rc = mqtt3_bridge_connect_step1(db, context);
if(rc){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#else
{
rc = mqtt3_bridge_connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#endif
}
}
}

View File

@ -466,6 +466,8 @@ int _mosquitto_log_printf(struct mosquitto *mosq, int level, const char *fmt, ..
#ifdef WITH_BRIDGE
int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge);
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *context);
void mqtt3_bridge_packet_cleanup(struct mosquitto *context);
#endif