2014-05-07 22:27:00 +00:00
/*
Copyright ( c ) 2009 - 2014 Roger Light < roger @ atchoo . org >
All rights reserved . This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1 .0
and Eclipse Distribution License v1 .0 which accompany this distribution .
The Eclipse Public License is available at
http : //www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http : //www.eclipse.org/org/documents/edl-v10.php.
Contributors :
Roger Light - initial implementation and documentation .
*/
# include <assert.h>
# include <errno.h>
# include <fcntl.h>
# include <stdio.h>
# include <string.h>
# ifndef WIN32
# include <netdb.h>
# include <sys/socket.h>
# include <unistd.h>
# else
# include <winsock2.h>
# include <ws2tcpip.h>
# endif
# ifdef ANDROID
# include <linux/in.h>
# include <linux/in6.h>
# include <sys/endian.h>
# endif
# ifdef __FreeBSD__
# include <netinet / in.h>
# endif
# ifdef __SYMBIAN32__
# include <netinet/in.h>
# endif
# ifdef __QNX__
# ifndef AI_ADDRCONFIG
# define AI_ADDRCONFIG 0
# endif
# include <net/netbyte.h>
# include <netinet/in.h>
# endif
# ifdef WITH_TLS
# include <openssl/err.h>
# include <tls_mosq.h>
# endif
# ifdef WITH_BROKER
# include <mosquitto_broker.h>
# ifdef WITH_SYS_TREE
extern uint64_t g_bytes_received ;
extern uint64_t g_bytes_sent ;
extern unsigned long g_msgs_received ;
extern unsigned long g_msgs_sent ;
extern unsigned long g_pub_msgs_received ;
extern unsigned long g_pub_msgs_sent ;
# endif
2014-05-06 09:47:00 +00:00
# ifdef WITH_WEBSOCKETS
# include <libwebsockets.h>
# endif
2014-05-07 22:27:00 +00:00
# else
# include <read_handle.h>
# endif
# include <logging_mosq.h>
# include <memory_mosq.h>
# include <mqtt3_protocol.h>
# include <net_mosq.h>
# include <time_mosq.h>
# include <util_mosq.h>
# ifdef WITH_TLS
int tls_ex_index_mosq = - 1 ;
# endif
void _mosquitto_net_init ( void )
{
# ifdef WIN32
WSADATA wsaData ;
WSAStartup ( MAKEWORD ( 2 , 2 ) , & wsaData ) ;
# endif
# ifdef WITH_SRV
ares_library_init ( ARES_LIB_INIT_ALL ) ;
# endif
# ifdef WITH_TLS
SSL_load_error_strings ( ) ;
SSL_library_init ( ) ;
OpenSSL_add_all_algorithms ( ) ;
if ( tls_ex_index_mosq = = - 1 ) {
tls_ex_index_mosq = SSL_get_ex_new_index ( 0 , " client context " , NULL , NULL , NULL ) ;
}
# endif
}
void _mosquitto_net_cleanup ( void )
{
# ifdef WITH_TLS
ERR_free_strings ( ) ;
EVP_cleanup ( ) ;
CRYPTO_cleanup_all_ex_data ( ) ;
# endif
# ifdef WITH_SRV
ares_library_cleanup ( ) ;
# endif
# ifdef WIN32
WSACleanup ( ) ;
# endif
}
void _mosquitto_packet_cleanup ( struct _mosquitto_packet * packet )
{
if ( ! packet ) return ;
/* Free data and reset values */
packet - > command = 0 ;
packet - > have_remaining = 0 ;
packet - > remaining_count = 0 ;
packet - > remaining_mult = 1 ;
packet - > remaining_length = 0 ;
if ( packet - > payload ) _mosquitto_free ( packet - > payload ) ;
packet - > payload = NULL ;
packet - > to_process = 0 ;
packet - > pos = 0 ;
}
int _mosquitto_packet_queue ( struct mosquitto * mosq , struct _mosquitto_packet * packet )
{
# ifndef WITH_BROKER
char sockpair_data = 0 ;
# endif
assert ( mosq ) ;
assert ( packet ) ;
packet - > pos = 0 ;
packet - > to_process = packet - > packet_length ;
packet - > next = NULL ;
pthread_mutex_lock ( & mosq - > out_packet_mutex ) ;
if ( mosq - > out_packet ) {
mosq - > out_packet_last - > next = packet ;
} else {
mosq - > out_packet = packet ;
}
mosq - > out_packet_last = packet ;
pthread_mutex_unlock ( & mosq - > out_packet_mutex ) ;
# ifdef WITH_BROKER
2014-05-06 09:47:00 +00:00
# ifdef WITH_WEBSOCKETS
if ( mosq - > wsi ) {
libwebsocket_callback_on_writable ( mosq - > ws_context , mosq - > wsi ) ;
return 0 ;
} else {
return _mosquitto_packet_write ( mosq ) ;
}
# else
2014-05-07 22:27:00 +00:00
return _mosquitto_packet_write ( mosq ) ;
2014-05-06 09:47:00 +00:00
# endif
2014-05-07 22:27:00 +00:00
# else
/* Write a single byte to sockpairW (connected to sockpairR) to break out
* of select ( ) if in threaded mode . */
if ( mosq - > sockpairW ! = INVALID_SOCKET ) {
# ifndef WIN32
if ( write ( mosq - > sockpairW , & sockpair_data , 1 ) ) {
}
# else
send ( mosq - > sockpairW , & sockpair_data , 1 , 0 ) ;
# endif
}
if ( mosq - > in_callback = = false & & mosq - > threaded = = false ) {
return _mosquitto_packet_write ( mosq ) ;
} else {
return MOSQ_ERR_SUCCESS ;
}
# endif
}
/* Close a socket associated with a context and set it to -1.
* Returns 1 on failure ( context is NULL )
* Returns 0 on success .
*/
2014-06-23 16:57:35 +00:00
# ifdef WITH_BROKER
int _mosquitto_socket_close ( struct mosquitto_db * db , struct mosquitto * mosq )
# else
2014-05-07 22:27:00 +00:00
int _mosquitto_socket_close ( struct mosquitto * mosq )
2014-06-23 16:57:35 +00:00
# endif
2014-05-07 22:27:00 +00:00
{
int rc = 0 ;
assert ( mosq ) ;
# ifdef WITH_TLS
if ( mosq - > ssl ) {
SSL_shutdown ( mosq - > ssl ) ;
SSL_free ( mosq - > ssl ) ;
mosq - > ssl = NULL ;
}
if ( mosq - > ssl_ctx ) {
SSL_CTX_free ( mosq - > ssl_ctx ) ;
mosq - > ssl_ctx = NULL ;
}
# endif
if ( mosq - > sock ! = INVALID_SOCKET ) {
2014-06-23 16:57:35 +00:00
# ifdef WITH_BROKER
HASH_DELETE ( hh_sock , db - > contexts_by_sock , mosq ) ;
# endif
2014-05-07 22:27:00 +00:00
rc = COMPAT_CLOSE ( mosq - > sock ) ;
mosq - > sock = INVALID_SOCKET ;
}
2014-06-03 15:21:46 +00:00
# ifdef WITH_BROKER
if ( mosq - > listener ) {
mosq - > listener - > client_count - - ;
assert ( mosq - > listener - > client_count > = 0 ) ;
mosq - > listener = NULL ;
}
# endif
2014-05-07 22:27:00 +00:00
return rc ;
}
# ifdef REAL_WITH_TLS_PSK
static unsigned int psk_client_callback ( SSL * ssl , const char * hint ,
char * identity , unsigned int max_identity_len ,
unsigned char * psk , unsigned int max_psk_len )
{
struct mosquitto * mosq ;
int len ;
mosq = SSL_get_ex_data ( ssl , tls_ex_index_mosq ) ;
if ( ! mosq ) return 0 ;
snprintf ( identity , max_identity_len , " %s " , mosq - > tls_psk_identity ) ;
len = _mosquitto_hex2bin ( mosq - > tls_psk , psk , max_psk_len ) ;
if ( len < 0 ) return 0 ;
return len ;
}
# endif
int _mosquitto_try_connect ( const char * host , uint16_t port , int * sock , const char * bind_address , bool blocking )
{
struct addrinfo hints ;
struct addrinfo * ainfo , * rp ;
struct addrinfo * ainfo_bind , * rp_bind ;
int s ;
int rc ;
# ifdef WIN32
uint32_t val = 1 ;
# endif
* sock = INVALID_SOCKET ;
memset ( & hints , 0 , sizeof ( struct addrinfo ) ) ;
hints . ai_family = PF_UNSPEC ;
hints . ai_flags = AI_ADDRCONFIG ;
hints . ai_socktype = SOCK_STREAM ;
s = getaddrinfo ( host , NULL , & hints , & ainfo ) ;
if ( s ) {
errno = s ;
return MOSQ_ERR_EAI ;
}
if ( bind_address ) {
s = getaddrinfo ( bind_address , NULL , & hints , & ainfo_bind ) ;
if ( s ) {
freeaddrinfo ( ainfo ) ;
errno = s ;
return MOSQ_ERR_EAI ;
}
}
for ( rp = ainfo ; rp ! = NULL ; rp = rp - > ai_next ) {
* sock = socket ( rp - > ai_family , rp - > ai_socktype , rp - > ai_protocol ) ;
if ( * sock = = INVALID_SOCKET ) continue ;
if ( rp - > ai_family = = PF_INET ) {
( ( struct sockaddr_in * ) rp - > ai_addr ) - > sin_port = htons ( port ) ;
} else if ( rp - > ai_family = = PF_INET6 ) {
( ( struct sockaddr_in6 * ) rp - > ai_addr ) - > sin6_port = htons ( port ) ;
} else {
2014-05-27 21:03:01 +00:00
COMPAT_CLOSE ( * sock ) ;
2014-05-07 22:27:00 +00:00
continue ;
}
if ( bind_address ) {
for ( rp_bind = ainfo_bind ; rp_bind ! = NULL ; rp_bind = rp_bind - > ai_next ) {
if ( bind ( * sock , rp_bind - > ai_addr , rp_bind - > ai_addrlen ) = = 0 ) {
break ;
}
}
if ( ! rp_bind ) {
COMPAT_CLOSE ( * sock ) ;
continue ;
}
}
if ( ! blocking ) {
/* Set non-blocking */
if ( _mosquitto_socket_nonblock ( * sock ) ) {
COMPAT_CLOSE ( * sock ) ;
continue ;
}
}
rc = connect ( * sock , rp - > ai_addr , rp - > ai_addrlen ) ;
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( rc = = 0 | | errno = = EINPROGRESS | | errno = = COMPAT_EWOULDBLOCK ) {
if ( blocking ) {
/* Set non-blocking */
if ( _mosquitto_socket_nonblock ( * sock ) ) {
COMPAT_CLOSE ( * sock ) ;
continue ;
}
}
break ;
}
COMPAT_CLOSE ( * sock ) ;
* sock = INVALID_SOCKET ;
}
freeaddrinfo ( ainfo ) ;
if ( bind_address ) {
freeaddrinfo ( ainfo_bind ) ;
}
if ( ! rp ) {
return MOSQ_ERR_ERRNO ;
}
return MOSQ_ERR_SUCCESS ;
}
/* Create a socket and connect it to 'ip' on port 'port'.
* Returns - 1 on failure ( ip is NULL , socket creation / connection error )
* Returns sock number on success .
*/
int _mosquitto_socket_connect ( struct mosquitto * mosq , const char * host , uint16_t port , const char * bind_address , bool blocking )
{
int sock = INVALID_SOCKET ;
int rc ;
# ifdef WITH_TLS
int ret ;
BIO * bio ;
# endif
if ( ! mosq | | ! host | | ! port ) return MOSQ_ERR_INVAL ;
# ifdef WITH_TLS
if ( mosq - > tls_cafile | | mosq - > tls_capath | | mosq - > tls_psk ) {
blocking = true ;
}
# endif
rc = _mosquitto_try_connect ( host , port , & sock , bind_address , blocking ) ;
if ( rc ! = MOSQ_ERR_SUCCESS ) return rc ;
# ifdef WITH_TLS
if ( mosq - > tls_cafile | | mosq - > tls_capath | | mosq - > tls_psk ) {
# if OPENSSL_VERSION_NUMBER >= 0x10001000L
if ( ! mosq - > tls_version | | ! strcmp ( mosq - > tls_version , " tlsv1.2 " ) ) {
mosq - > ssl_ctx = SSL_CTX_new ( TLSv1_2_client_method ( ) ) ;
} else if ( ! strcmp ( mosq - > tls_version , " tlsv1.1 " ) ) {
mosq - > ssl_ctx = SSL_CTX_new ( TLSv1_1_client_method ( ) ) ;
} else if ( ! strcmp ( mosq - > tls_version , " tlsv1 " ) ) {
mosq - > ssl_ctx = SSL_CTX_new ( TLSv1_client_method ( ) ) ;
} else {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Protocol %s not supported. " , mosq - > tls_version ) ;
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_INVAL ;
}
# else
if ( ! mosq - > tls_version | | ! strcmp ( mosq - > tls_version , " tlsv1 " ) ) {
mosq - > ssl_ctx = SSL_CTX_new ( TLSv1_client_method ( ) ) ;
} else {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Protocol %s not supported. " , mosq - > tls_version ) ;
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_INVAL ;
}
# endif
if ( ! mosq - > ssl_ctx ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to create TLS context. " ) ;
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
# if OPENSSL_VERSION_NUMBER >= 0x10000000
/* Disable compression */
SSL_CTX_set_options ( mosq - > ssl_ctx , SSL_OP_NO_COMPRESSION ) ;
# endif
# ifdef SSL_MODE_RELEASE_BUFFERS
/* Use even less memory per SSL connection. */
SSL_CTX_set_mode ( mosq - > ssl_ctx , SSL_MODE_RELEASE_BUFFERS ) ;
# endif
if ( mosq - > tls_ciphers ) {
ret = SSL_CTX_set_cipher_list ( mosq - > ssl_ctx , mosq - > tls_ciphers ) ;
if ( ret = = 0 ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to set TLS ciphers. Check cipher list \" %s \" . " , mosq - > tls_ciphers ) ;
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
}
if ( mosq - > tls_cafile | | mosq - > tls_capath ) {
ret = SSL_CTX_load_verify_locations ( mosq - > ssl_ctx , mosq - > tls_cafile , mosq - > tls_capath ) ;
if ( ret = = 0 ) {
# ifdef WITH_BROKER
if ( mosq - > tls_cafile & & mosq - > tls_capath ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check bridge_cafile \" %s \" and bridge_capath \" %s \" . " , mosq - > tls_cafile , mosq - > tls_capath ) ;
} else if ( mosq - > tls_cafile ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check bridge_cafile \" %s \" . " , mosq - > tls_cafile ) ;
} else {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check bridge_capath \" %s \" . " , mosq - > tls_capath ) ;
}
# else
if ( mosq - > tls_cafile & & mosq - > tls_capath ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check cafile \" %s \" and capath \" %s \" . " , mosq - > tls_cafile , mosq - > tls_capath ) ;
} else if ( mosq - > tls_cafile ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check cafile \" %s \" . " , mosq - > tls_cafile ) ;
} else {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load CA certificates, check capath \" %s \" . " , mosq - > tls_capath ) ;
}
# endif
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
if ( mosq - > tls_cert_reqs = = 0 ) {
SSL_CTX_set_verify ( mosq - > ssl_ctx , SSL_VERIFY_NONE , NULL ) ;
} else {
SSL_CTX_set_verify ( mosq - > ssl_ctx , SSL_VERIFY_PEER , _mosquitto_server_certificate_verify ) ;
}
if ( mosq - > tls_pw_callback ) {
SSL_CTX_set_default_passwd_cb ( mosq - > ssl_ctx , mosq - > tls_pw_callback ) ;
SSL_CTX_set_default_passwd_cb_userdata ( mosq - > ssl_ctx , mosq ) ;
}
if ( mosq - > tls_certfile ) {
ret = SSL_CTX_use_certificate_chain_file ( mosq - > ssl_ctx , mosq - > tls_certfile ) ;
if ( ret ! = 1 ) {
# ifdef WITH_BROKER
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load client certificate, check bridge_certfile \" %s \" . " , mosq - > tls_certfile ) ;
# else
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load client certificate \" %s \" . " , mosq - > tls_certfile ) ;
# endif
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
}
if ( mosq - > tls_keyfile ) {
ret = SSL_CTX_use_PrivateKey_file ( mosq - > ssl_ctx , mosq - > tls_keyfile , SSL_FILETYPE_PEM ) ;
if ( ret ! = 1 ) {
# ifdef WITH_BROKER
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load client key file, check bridge_keyfile \" %s \" . " , mosq - > tls_keyfile ) ;
# else
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Unable to load client key file \" %s \" . " , mosq - > tls_keyfile ) ;
# endif
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
ret = SSL_CTX_check_private_key ( mosq - > ssl_ctx ) ;
if ( ret ! = 1 ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " Error: Client certificate/key are inconsistent. " ) ;
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
}
# ifdef REAL_WITH_TLS_PSK
} else if ( mosq - > tls_psk ) {
SSL_CTX_set_psk_client_callback ( mosq - > ssl_ctx , psk_client_callback ) ;
# endif
}
mosq - > ssl = SSL_new ( mosq - > ssl_ctx ) ;
if ( ! mosq - > ssl ) {
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
SSL_set_ex_data ( mosq - > ssl , tls_ex_index_mosq , mosq ) ;
bio = BIO_new_socket ( sock , BIO_NOCLOSE ) ;
if ( ! bio ) {
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
SSL_set_bio ( mosq - > ssl , bio , bio ) ;
ret = SSL_connect ( mosq - > ssl ) ;
if ( ret ! = 1 ) {
ret = SSL_get_error ( mosq - > ssl , ret ) ;
if ( ret = = SSL_ERROR_WANT_READ ) {
/* We always try to read anyway */
} else if ( ret = = SSL_ERROR_WANT_WRITE ) {
mosq - > want_write = true ;
} else {
COMPAT_CLOSE ( sock ) ;
return MOSQ_ERR_TLS ;
}
}
}
# endif
mosq - > sock = sock ;
return MOSQ_ERR_SUCCESS ;
}
int _mosquitto_read_byte ( struct _mosquitto_packet * packet , uint8_t * byte )
{
assert ( packet ) ;
if ( packet - > pos + 1 > packet - > remaining_length ) return MOSQ_ERR_PROTOCOL ;
* byte = packet - > payload [ packet - > pos ] ;
packet - > pos + + ;
return MOSQ_ERR_SUCCESS ;
}
void _mosquitto_write_byte ( struct _mosquitto_packet * packet , uint8_t byte )
{
assert ( packet ) ;
assert ( packet - > pos + 1 < = packet - > packet_length ) ;
packet - > payload [ packet - > pos ] = byte ;
packet - > pos + + ;
}
int _mosquitto_read_bytes ( struct _mosquitto_packet * packet , void * bytes , uint32_t count )
{
assert ( packet ) ;
if ( packet - > pos + count > packet - > remaining_length ) return MOSQ_ERR_PROTOCOL ;
memcpy ( bytes , & ( packet - > payload [ packet - > pos ] ) , count ) ;
packet - > pos + = count ;
return MOSQ_ERR_SUCCESS ;
}
void _mosquitto_write_bytes ( struct _mosquitto_packet * packet , const void * bytes , uint32_t count )
{
assert ( packet ) ;
assert ( packet - > pos + count < = packet - > packet_length ) ;
memcpy ( & ( packet - > payload [ packet - > pos ] ) , bytes , count ) ;
packet - > pos + = count ;
}
int _mosquitto_read_string ( struct _mosquitto_packet * packet , char * * str )
{
uint16_t len ;
int rc ;
assert ( packet ) ;
rc = _mosquitto_read_uint16 ( packet , & len ) ;
if ( rc ) return rc ;
if ( packet - > pos + len > packet - > remaining_length ) return MOSQ_ERR_PROTOCOL ;
* str = _mosquitto_calloc ( len + 1 , sizeof ( char ) ) ;
if ( * str ) {
memcpy ( * str , & ( packet - > payload [ packet - > pos ] ) , len ) ;
packet - > pos + = len ;
} else {
return MOSQ_ERR_NOMEM ;
}
return MOSQ_ERR_SUCCESS ;
}
void _mosquitto_write_string ( struct _mosquitto_packet * packet , const char * str , uint16_t length )
{
assert ( packet ) ;
_mosquitto_write_uint16 ( packet , length ) ;
_mosquitto_write_bytes ( packet , str , length ) ;
}
int _mosquitto_read_uint16 ( struct _mosquitto_packet * packet , uint16_t * word )
{
uint8_t msb , lsb ;
assert ( packet ) ;
if ( packet - > pos + 2 > packet - > remaining_length ) return MOSQ_ERR_PROTOCOL ;
msb = packet - > payload [ packet - > pos ] ;
packet - > pos + + ;
lsb = packet - > payload [ packet - > pos ] ;
packet - > pos + + ;
* word = ( msb < < 8 ) + lsb ;
return MOSQ_ERR_SUCCESS ;
}
void _mosquitto_write_uint16 ( struct _mosquitto_packet * packet , uint16_t word )
{
_mosquitto_write_byte ( packet , MOSQ_MSB ( word ) ) ;
_mosquitto_write_byte ( packet , MOSQ_LSB ( word ) ) ;
}
ssize_t _mosquitto_net_read ( struct mosquitto * mosq , void * buf , size_t count )
{
# ifdef WITH_TLS
int ret ;
int err ;
char ebuf [ 256 ] ;
unsigned long e ;
# endif
assert ( mosq ) ;
errno = 0 ;
# ifdef WITH_TLS
if ( mosq - > ssl ) {
ret = SSL_read ( mosq - > ssl , buf , count ) ;
if ( ret < = 0 ) {
err = SSL_get_error ( mosq - > ssl , ret ) ;
if ( err = = SSL_ERROR_WANT_READ ) {
ret = - 1 ;
errno = EAGAIN ;
} else if ( err = = SSL_ERROR_WANT_WRITE ) {
ret = - 1 ;
mosq - > want_write = true ;
errno = EAGAIN ;
} else {
e = ERR_get_error ( ) ;
while ( e ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " OpenSSL Error: %s " , ERR_error_string ( e , ebuf ) ) ;
e = ERR_get_error ( ) ;
}
errno = EPROTO ;
}
}
return ( ssize_t ) ret ;
} else {
/* Call normal read/recv */
# endif
# ifndef WIN32
return read ( mosq - > sock , buf , count ) ;
# else
return recv ( mosq - > sock , buf , count , 0 ) ;
# endif
# ifdef WITH_TLS
}
# endif
}
ssize_t _mosquitto_net_write ( struct mosquitto * mosq , void * buf , size_t count )
{
# ifdef WITH_TLS
int ret ;
int err ;
char ebuf [ 256 ] ;
unsigned long e ;
# endif
assert ( mosq ) ;
errno = 0 ;
# ifdef WITH_TLS
if ( mosq - > ssl ) {
ret = SSL_write ( mosq - > ssl , buf , count ) ;
if ( ret < 0 ) {
err = SSL_get_error ( mosq - > ssl , ret ) ;
if ( err = = SSL_ERROR_WANT_READ ) {
ret = - 1 ;
errno = EAGAIN ;
} else if ( err = = SSL_ERROR_WANT_WRITE ) {
ret = - 1 ;
mosq - > want_write = true ;
errno = EAGAIN ;
} else {
e = ERR_get_error ( ) ;
while ( e ) {
_mosquitto_log_printf ( mosq , MOSQ_LOG_ERR , " OpenSSL Error: %s " , ERR_error_string ( e , ebuf ) ) ;
e = ERR_get_error ( ) ;
}
errno = EPROTO ;
}
}
return ( ssize_t ) ret ;
} else {
/* Call normal write/send */
# endif
# ifndef WIN32
return write ( mosq - > sock , buf , count ) ;
# else
return send ( mosq - > sock , buf , count , 0 ) ;
# endif
# ifdef WITH_TLS
}
# endif
}
int _mosquitto_packet_write ( struct mosquitto * mosq )
{
ssize_t write_length ;
struct _mosquitto_packet * packet ;
if ( ! mosq ) return MOSQ_ERR_INVAL ;
if ( mosq - > sock = = INVALID_SOCKET ) return MOSQ_ERR_NO_CONN ;
pthread_mutex_lock ( & mosq - > current_out_packet_mutex ) ;
pthread_mutex_lock ( & mosq - > out_packet_mutex ) ;
if ( mosq - > out_packet & & ! mosq - > current_out_packet ) {
mosq - > current_out_packet = mosq - > out_packet ;
mosq - > out_packet = mosq - > out_packet - > next ;
if ( ! mosq - > out_packet ) {
mosq - > out_packet_last = NULL ;
}
}
pthread_mutex_unlock ( & mosq - > out_packet_mutex ) ;
while ( mosq - > current_out_packet ) {
packet = mosq - > current_out_packet ;
while ( packet - > to_process > 0 ) {
write_length = _mosquitto_net_write ( mosq , & ( packet - > payload [ packet - > pos ] ) , packet - > to_process ) ;
if ( write_length > 0 ) {
# if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_sent + = write_length ;
# endif
packet - > to_process - = write_length ;
packet - > pos + = write_length ;
} else {
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno = = EAGAIN | | errno = = COMPAT_EWOULDBLOCK ) {
pthread_mutex_unlock ( & mosq - > current_out_packet_mutex ) ;
return MOSQ_ERR_SUCCESS ;
} else {
pthread_mutex_unlock ( & mosq - > current_out_packet_mutex ) ;
switch ( errno ) {
case COMPAT_ECONNRESET :
return MOSQ_ERR_CONN_LOST ;
default :
return MOSQ_ERR_ERRNO ;
}
}
}
}
# ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_msgs_sent + + ;
if ( ( ( packet - > command ) & 0xF6 ) = = PUBLISH ) {
g_pub_msgs_sent + + ;
}
# endif
# else
if ( ( ( packet - > command ) & 0xF6 ) = = PUBLISH ) {
pthread_mutex_lock ( & mosq - > callback_mutex ) ;
if ( mosq - > on_publish ) {
/* This is a QoS=0 message */
mosq - > in_callback = true ;
mosq - > on_publish ( mosq , mosq - > userdata , packet - > mid ) ;
mosq - > in_callback = false ;
}
pthread_mutex_unlock ( & mosq - > callback_mutex ) ;
} else if ( ( ( packet - > command ) & 0xF0 ) = = DISCONNECT ) {
/* FIXME what cleanup needs doing here?
* incoming / outgoing messages ? */
_mosquitto_socket_close ( mosq ) ;
/* Start of duplicate, possibly unnecessary code.
* This does leave things in a consistent state at least . */
/* Free data and reset values */
pthread_mutex_lock ( & mosq - > out_packet_mutex ) ;
mosq - > current_out_packet = mosq - > out_packet ;
if ( mosq - > out_packet ) {
mosq - > out_packet = mosq - > out_packet - > next ;
if ( ! mosq - > out_packet ) {
mosq - > out_packet_last = NULL ;
}
}
pthread_mutex_unlock ( & mosq - > out_packet_mutex ) ;
_mosquitto_packet_cleanup ( packet ) ;
_mosquitto_free ( packet ) ;
pthread_mutex_lock ( & mosq - > msgtime_mutex ) ;
mosq - > last_msg_out = mosquitto_time ( ) ;
pthread_mutex_unlock ( & mosq - > msgtime_mutex ) ;
/* End of duplicate, possibly unnecessary code */
pthread_mutex_lock ( & mosq - > callback_mutex ) ;
if ( mosq - > on_disconnect ) {
mosq - > in_callback = true ;
mosq - > on_disconnect ( mosq , mosq - > userdata , 0 ) ;
mosq - > in_callback = false ;
}
2014-05-16 09:20:45 +00:00
pthread_mutex_unlock ( & mosq - > callback_mutex ) ;
2014-05-07 22:27:00 +00:00
pthread_mutex_unlock ( & mosq - > current_out_packet_mutex ) ;
return MOSQ_ERR_SUCCESS ;
}
# endif
/* Free data and reset values */
pthread_mutex_lock ( & mosq - > out_packet_mutex ) ;
mosq - > current_out_packet = mosq - > out_packet ;
if ( mosq - > out_packet ) {
mosq - > out_packet = mosq - > out_packet - > next ;
if ( ! mosq - > out_packet ) {
mosq - > out_packet_last = NULL ;
}
}
pthread_mutex_unlock ( & mosq - > out_packet_mutex ) ;
_mosquitto_packet_cleanup ( packet ) ;
_mosquitto_free ( packet ) ;
pthread_mutex_lock ( & mosq - > msgtime_mutex ) ;
mosq - > last_msg_out = mosquitto_time ( ) ;
pthread_mutex_unlock ( & mosq - > msgtime_mutex ) ;
}
pthread_mutex_unlock ( & mosq - > current_out_packet_mutex ) ;
return MOSQ_ERR_SUCCESS ;
}
# ifdef WITH_BROKER
int _mosquitto_packet_read ( struct mosquitto_db * db , struct mosquitto * mosq )
# else
int _mosquitto_packet_read ( struct mosquitto * mosq )
# endif
{
uint8_t byte ;
ssize_t read_length ;
int rc = 0 ;
if ( ! mosq ) return MOSQ_ERR_INVAL ;
if ( mosq - > sock = = INVALID_SOCKET ) return MOSQ_ERR_NO_CONN ;
/* This gets called if pselect() indicates that there is network data
* available - ie . at least one byte . What we do depends on what data we
* already have .
* If we ' ve not got a command , attempt to read one and save it . This should
* always work because it ' s only a single byte .
* Then try to read the remaining length . This may fail because it is may
* be more than one byte - will need to save data pending next read if it
* does fail .
* Then try to read the remaining payload , where ' payload ' here means the
* combined variable header and actual payload . This is the most likely to
* fail due to longer length , so save current data and current position .
* After all data is read , send to _mosquitto_handle_packet ( ) to deal with .
* Finally , free the memory and reset everything to starting conditions .
*/
if ( ! mosq - > in_packet . command ) {
read_length = _mosquitto_net_read ( mosq , & byte , 1 ) ;
if ( read_length = = 1 ) {
mosq - > in_packet . command = byte ;
# ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_bytes_received + + ;
# endif
/* Clients must send CONNECT as their first command. */
if ( ! ( mosq - > bridge ) & & mosq - > state = = mosq_cs_new & & ( byte & 0xF0 ) ! = CONNECT ) return MOSQ_ERR_PROTOCOL ;
# endif
} else {
if ( read_length = = 0 ) return MOSQ_ERR_CONN_LOST ; /* EOF */
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno = = EAGAIN | | errno = = COMPAT_EWOULDBLOCK ) {
return MOSQ_ERR_SUCCESS ;
} else {
switch ( errno ) {
case COMPAT_ECONNRESET :
return MOSQ_ERR_CONN_LOST ;
default :
return MOSQ_ERR_ERRNO ;
}
}
}
}
if ( ! mosq - > in_packet . have_remaining ) {
do {
read_length = _mosquitto_net_read ( mosq , & byte , 1 ) ;
if ( read_length = = 1 ) {
mosq - > in_packet . remaining_count + + ;
/* Max 4 bytes length for remaining length as defined by protocol.
* Anything more likely means a broken / malicious client .
*/
if ( mosq - > in_packet . remaining_count > 4 ) return MOSQ_ERR_PROTOCOL ;
# if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received + + ;
# endif
mosq - > in_packet . remaining_length + = ( byte & 127 ) * mosq - > in_packet . remaining_mult ;
mosq - > in_packet . remaining_mult * = 128 ;
} else {
if ( read_length = = 0 ) return MOSQ_ERR_CONN_LOST ; /* EOF */
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno = = EAGAIN | | errno = = COMPAT_EWOULDBLOCK ) {
return MOSQ_ERR_SUCCESS ;
} else {
switch ( errno ) {
case COMPAT_ECONNRESET :
return MOSQ_ERR_CONN_LOST ;
default :
return MOSQ_ERR_ERRNO ;
}
}
}
} while ( ( byte & 128 ) ! = 0 ) ;
if ( mosq - > in_packet . remaining_length > 0 ) {
mosq - > in_packet . payload = _mosquitto_malloc ( mosq - > in_packet . remaining_length * sizeof ( uint8_t ) ) ;
if ( ! mosq - > in_packet . payload ) return MOSQ_ERR_NOMEM ;
mosq - > in_packet . to_process = mosq - > in_packet . remaining_length ;
}
mosq - > in_packet . have_remaining = 1 ;
}
while ( mosq - > in_packet . to_process > 0 ) {
read_length = _mosquitto_net_read ( mosq , & ( mosq - > in_packet . payload [ mosq - > in_packet . pos ] ) , mosq - > in_packet . to_process ) ;
if ( read_length > 0 ) {
# if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received + = read_length ;
# endif
mosq - > in_packet . to_process - = read_length ;
mosq - > in_packet . pos + = read_length ;
} else {
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno = = EAGAIN | | errno = = COMPAT_EWOULDBLOCK ) {
if ( mosq - > in_packet . to_process > 1000 ) {
/* Update last_msg_in time if more than 1000 bytes left to
* receive . Helps when receiving large messages .
* This is an arbitrary limit , but with some consideration .
* If a client can ' t send 1000 bytes in a second it
* probably shouldn ' t be using a 1 second keep alive . */
pthread_mutex_lock ( & mosq - > msgtime_mutex ) ;
mosq - > last_msg_in = mosquitto_time ( ) ;
pthread_mutex_unlock ( & mosq - > msgtime_mutex ) ;
}
return MOSQ_ERR_SUCCESS ;
} else {
switch ( errno ) {
case COMPAT_ECONNRESET :
return MOSQ_ERR_CONN_LOST ;
default :
return MOSQ_ERR_ERRNO ;
}
}
}
}
/* All data for this packet is read. */
mosq - > in_packet . pos = 0 ;
# ifdef WITH_BROKER
# ifdef WITH_SYS_TREE
g_msgs_received + + ;
if ( ( ( mosq - > in_packet . command ) & 0xF5 ) = = PUBLISH ) {
g_pub_msgs_received + + ;
}
# endif
rc = mqtt3_packet_handle ( db , mosq ) ;
# else
rc = _mosquitto_packet_handle ( mosq ) ;
# endif
/* Free data and reset values */
_mosquitto_packet_cleanup ( & mosq - > in_packet ) ;
pthread_mutex_lock ( & mosq - > msgtime_mutex ) ;
mosq - > last_msg_in = mosquitto_time ( ) ;
pthread_mutex_unlock ( & mosq - > msgtime_mutex ) ;
return rc ;
}
int _mosquitto_socket_nonblock ( int sock )
{
# ifndef WIN32
int opt ;
/* Set non-blocking */
opt = fcntl ( sock , F_GETFL , 0 ) ;
if ( opt = = - 1 ) {
COMPAT_CLOSE ( sock ) ;
return 1 ;
}
if ( fcntl ( sock , F_SETFL , opt | O_NONBLOCK ) = = - 1 ) {
/* If either fcntl fails, don't want to allow this client to connect. */
COMPAT_CLOSE ( sock ) ;
return 1 ;
}
# else
opt = 1 ;
if ( ioctlsocket ( sock , FIONBIO , & opt ) ) {
COMPAT_CLOSE ( sock ) ;
return 1 ;
}
# endif
return 0 ;
}
# ifndef WITH_BROKER
int _mosquitto_socketpair ( int * pairR , int * pairW )
{
# ifdef WIN32
int family [ 2 ] = { AF_INET , AF_INET6 } ;
int i ;
struct sockaddr_storage ss ;
struct sockaddr_in * sa = ( struct sockaddr_in * ) & ss ;
struct sockaddr_in6 * sa6 = ( struct sockaddr_in6 * ) & ss ;
socklen_t ss_len ;
int spR , spW ;
int listensock ;
* pairR = - 1 ;
* pairW = - 1 ;
for ( i = 0 ; i < 2 ; i + + ) {
memset ( & ss , 0 , sizeof ( ss ) ) ;
if ( family [ i ] = = AF_INET ) {
sa - > sin_family = family [ i ] ;
sa - > sin_addr . s_addr = htonl ( INADDR_LOOPBACK ) ;
sa - > sin_port = 0 ;
ss_len = sizeof ( struct sockaddr_in ) ;
} else if ( family [ i ] = = AF_INET6 ) {
sa6 - > sin6_family = family [ i ] ;
sa6 - > sin6_addr = in6addr_loopback ;
sa6 - > sin6_port = 0 ;
ss_len = sizeof ( struct sockaddr_in6 ) ;
} else {
return MOSQ_ERR_INVAL ;
}
listensock = socket ( family [ i ] , SOCK_STREAM , IPPROTO_TCP ) ;
if ( listensock = = - 1 ) {
continue ;
}
if ( bind ( listensock , ( struct sockaddr * ) & ss , ss_len ) = = - 1 ) {
COMPAT_CLOSE ( listensock ) ;
continue ;
}
if ( listen ( listensock , 1 ) = = - 1 ) {
COMPAT_CLOSE ( listensock ) ;
continue ;
}
memset ( & ss , 0 , sizeof ( ss ) ) ;
ss_len = sizeof ( ss ) ;
if ( getsockname ( listensock , ( struct sockaddr * ) & ss , & ss_len ) < 0 ) {
COMPAT_CLOSE ( listensock ) ;
continue ;
}
if ( _mosquitto_socket_nonblock ( listensock ) ) {
continue ;
}
if ( family [ i ] = = AF_INET ) {
sa - > sin_family = family [ i ] ;
sa - > sin_addr . s_addr = htonl ( INADDR_LOOPBACK ) ;
ss_len = sizeof ( struct sockaddr_in ) ;
} else if ( family [ i ] = = AF_INET6 ) {
sa6 - > sin6_family = family [ i ] ;
sa6 - > sin6_addr = in6addr_loopback ;
ss_len = sizeof ( struct sockaddr_in6 ) ;
}
spR = socket ( family [ i ] , SOCK_STREAM , IPPROTO_TCP ) ;
if ( spR = = - 1 ) {
COMPAT_CLOSE ( listensock ) ;
continue ;
}
if ( _mosquitto_socket_nonblock ( spR ) ) {
COMPAT_CLOSE ( listensock ) ;
continue ;
}
if ( connect ( spR , ( struct sockaddr * ) & ss , ss_len ) < 0 ) {
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno ! = EINPROGRESS & & errno ! = COMPAT_EWOULDBLOCK ) {
COMPAT_CLOSE ( spR ) ;
COMPAT_CLOSE ( listensock ) ;
continue ;
}
}
spW = accept ( listensock , NULL , 0 ) ;
if ( spW = = - 1 ) {
# ifdef WIN32
errno = WSAGetLastError ( ) ;
# endif
if ( errno ! = EINPROGRESS & & errno ! = COMPAT_EWOULDBLOCK ) {
COMPAT_CLOSE ( spR ) ;
COMPAT_CLOSE ( listensock ) ;
continue ;
}
}
if ( _mosquitto_socket_nonblock ( spW ) ) {
COMPAT_CLOSE ( spR ) ;
COMPAT_CLOSE ( listensock ) ;
continue ;
}
COMPAT_CLOSE ( listensock ) ;
* pairR = spR ;
* pairW = spW ;
return MOSQ_ERR_SUCCESS ;
}
return MOSQ_ERR_UNKNOWN ;
# else
int sv [ 2 ] ;
if ( socketpair ( AF_UNIX , SOCK_STREAM , 0 , sv ) = = - 1 ) {
return MOSQ_ERR_ERRNO ;
}
if ( _mosquitto_socket_nonblock ( sv [ 0 ] ) ) {
COMPAT_CLOSE ( sv [ 0 ] ) ;
COMPAT_CLOSE ( sv [ 1 ] ) ;
return MOSQ_ERR_ERRNO ;
}
if ( _mosquitto_socket_nonblock ( sv [ 1 ] ) ) {
COMPAT_CLOSE ( sv [ 0 ] ) ;
COMPAT_CLOSE ( sv [ 1 ] ) ;
return MOSQ_ERR_ERRNO ;
}
* pairR = sv [ 0 ] ;
* pairW = sv [ 1 ] ;
return MOSQ_ERR_SUCCESS ;
# endif
}
# endif