From 4195fde70bdb9e760dcca26512288588733063f0 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 18 May 2015 09:29:22 +0100 Subject: [PATCH] Last raft of renames for the moment. --- lib/mosquitto.c | 20 ++-- lib/net_mosq.c | 40 ++++---- lib/net_mosq.h | 24 ++--- lib/packet_mosq.c | 14 +-- lib/read_handle.c | 2 +- lib/read_handle.h | 2 +- lib/socks_mosq.c | 6 +- lib/util_mosq.c | 6 +- src/bridge.c | 8 +- src/context.c | 6 +- src/database.c | 14 +-- src/db_dump/db_dump.c | 20 ++-- src/loop.c | 6 +- src/mosquitto.c | 4 +- src/mosquitto_broker.h | 15 ++- src/net.c | 4 +- src/persist.c | 4 +- src/read_handle.c | 6 +- src/read_handle_server.c | 6 +- src/subs.c | 32 +++---- src/websockets.c | 2 +- test/packet-gen.c | 14 +-- test/random_client.c | 198 --------------------------------------- 23 files changed, 127 insertions(+), 326 deletions(-) delete mode 100644 test/random_client.c diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 926c3cc8..655c3111 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -72,14 +72,14 @@ int mosquitto_lib_init(void) srand(tv.tv_sec*1000 + tv.tv_usec/1000); #endif - mosquitto__net_init(); + net__init(); return MOSQ_ERR_SUCCESS; } int mosquitto_lib_cleanup(void) { - mosquitto__net_cleanup(); + net__cleanup(); return MOSQ_ERR_SUCCESS; } @@ -297,7 +297,7 @@ void mosquitto__destroy(struct mosquitto *mosq) } #endif if(mosq->sock != INVALID_SOCKET){ - mosquitto__socket_close(mosq); + net__socket_close(mosq); } message__cleanup_all(mosq); will__clear(mosq); @@ -404,7 +404,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int mosq->keepalive = keepalive; - if(mosquitto__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ + if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Unable to open socket pair, outgoing publish commands may be delayed."); } @@ -510,11 +510,11 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking) #ifdef WITH_SOCKS if(mosq->socks5_host){ - rc = mosquitto__socket_connect(mosq, mosq->socks5_host, mosq->socks5_port, mosq->bind_address, blocking); + rc = net__socket_connect(mosq, mosq->socks5_host, mosq->socks5_port, mosq->bind_address, blocking); }else #endif { - rc = mosquitto__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking); + rc = net__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking); } if(rc){ return rc; @@ -921,7 +921,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) if(FD_ISSET(mosq->sock, &readfds)){ #ifdef WITH_TLS if(mosq->want_connect){ - rc = mosquitto__socket_connect_tls(mosq); + rc = net__socket_connect_tls(mosq); if(rc) return rc; }else #endif @@ -947,7 +947,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) if(FD_ISSET(mosq->sock, &writefds)){ #ifdef WITH_TLS if(mosq->want_connect){ - rc = mosquitto__socket_connect_tls(mosq); + rc = net__socket_connect_tls(mosq); if(rc) return rc; }else #endif @@ -1068,7 +1068,7 @@ int mosquitto_loop_misc(struct mosquitto *mosq) /* mosq->ping_t != 0 means we are waiting for a pingresp. * This hasn't happened in the keepalive time so we should disconnect. */ - mosquitto__socket_close(mosq); + net__socket_close(mosq); pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_disconnecting){ rc = MOSQ_ERR_SUCCESS; @@ -1091,7 +1091,7 @@ int mosquitto_loop_misc(struct mosquitto *mosq) static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc) { if(rc){ - mosquitto__socket_close(mosq); + net__socket_close(mosq); pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_disconnecting){ rc = MOSQ_ERR_SUCCESS; diff --git a/lib/net_mosq.c b/lib/net_mosq.c index ac23ab42..097ff61c 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -78,7 +78,7 @@ Contributors: int tls_ex_index_mosq = -1; #endif -void mosquitto__net_init(void) +void net__init(void) { #ifdef WIN32 WSADATA wsaData; @@ -99,7 +99,7 @@ void mosquitto__net_init(void) #endif } -void mosquitto__net_cleanup(void) +void net__cleanup(void) { #ifdef WITH_TLS ERR_remove_state(0); @@ -125,9 +125,9 @@ void mosquitto__net_cleanup(void) * Returns 0 on success. */ #ifdef WITH_BROKER -int mosquitto__socket_close(struct mosquitto_db *db, struct mosquitto *mosq) +int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq) #else -int mosquitto__socket_close(struct mosquitto *mosq) +int net__socket_close(struct mosquitto *mosq) #endif { int rc = 0; @@ -195,7 +195,7 @@ static unsigned int psk_client_callback(SSL *ssl, const char *hint, #endif -int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking) +int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking) { struct addrinfo hints; struct addrinfo *ainfo, *rp; @@ -261,7 +261,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po if(!blocking){ /* Set non-blocking */ - if(mosquitto__socket_nonblock(*sock)){ + if(net__socket_nonblock(*sock)){ COMPAT_CLOSE(*sock); continue; } @@ -278,7 +278,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po if(blocking){ /* Set non-blocking */ - if(mosquitto__socket_nonblock(*sock)){ + if(net__socket_nonblock(*sock)){ COMPAT_CLOSE(*sock); continue; } @@ -301,7 +301,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po #ifdef WITH_TLS -int mosquitto__socket_connect_tls(struct mosquitto *mosq) +int net__socket_connect_tls(struct mosquitto *mosq) { int ret; @@ -331,7 +331,7 @@ int mosquitto__socket_connect_tls(struct mosquitto *mosq) * Returns -1 on failure (ip is NULL, socket creation/connection error) * Returns sock number on success. */ -int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) +int net__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) { int sock = INVALID_SOCKET; int rc; @@ -342,7 +342,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t if(!mosq || !host || !port) return MOSQ_ERR_INVAL; - rc = mosquitto__try_connect(mosq, host, port, &sock, bind_address, blocking); + rc = net__try_connect(mosq, host, port, &sock, bind_address, blocking); if(rc > 0) return rc; #ifdef WITH_TLS @@ -475,7 +475,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t SSL_set_bio(mosq->ssl, bio, bio); mosq->sock = sock; - if(mosquitto__socket_connect_tls(mosq)){ + if(net__socket_connect_tls(mosq)){ return MOSQ_ERR_TLS; } @@ -488,7 +488,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t } -ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count) +ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count) { #ifdef WITH_TLS int ret; @@ -536,7 +536,7 @@ ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count) #endif } -ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count) +ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count) { #ifdef WITH_TLS int ret; @@ -585,7 +585,7 @@ ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count) } -int mosquitto__socket_nonblock(int sock) +int net__socket_nonblock(int sock) { #ifndef WIN32 int opt; @@ -612,7 +612,7 @@ int mosquitto__socket_nonblock(int sock) #ifndef WITH_BROKER -int mosquitto__socketpair(int *pairR, int *pairW) +int net__socketpair(int *pairR, int *pairW) { #ifdef WIN32 int family[2] = {AF_INET, AF_INET6}; @@ -665,7 +665,7 @@ int mosquitto__socketpair(int *pairR, int *pairW) continue; } - if(mosquitto__socket_nonblock(listensock)){ + if(net__socket_nonblock(listensock)){ continue; } @@ -684,7 +684,7 @@ int mosquitto__socketpair(int *pairR, int *pairW) COMPAT_CLOSE(listensock); continue; } - if(mosquitto__socket_nonblock(spR)){ + if(net__socket_nonblock(spR)){ COMPAT_CLOSE(listensock); continue; } @@ -710,7 +710,7 @@ int mosquitto__socketpair(int *pairR, int *pairW) } } - if(mosquitto__socket_nonblock(spW)){ + if(net__socket_nonblock(spW)){ COMPAT_CLOSE(spR); COMPAT_CLOSE(listensock); continue; @@ -728,12 +728,12 @@ int mosquitto__socketpair(int *pairR, int *pairW) if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){ return MOSQ_ERR_ERRNO; } - if(mosquitto__socket_nonblock(sv[0])){ + if(net__socket_nonblock(sv[0])){ COMPAT_CLOSE(sv[0]); COMPAT_CLOSE(sv[1]); return MOSQ_ERR_ERRNO; } - if(mosquitto__socket_nonblock(sv[1])){ + if(net__socket_nonblock(sv[1])){ COMPAT_CLOSE(sv[0]); COMPAT_CLOSE(sv[1]); return MOSQ_ERR_ERRNO; diff --git a/lib/net_mosq.h b/lib/net_mosq.h index df5afe7f..22d70f01 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -49,25 +49,25 @@ struct mosquitto_db; #define MOSQ_MSB(A) (uint8_t)((A & 0xFF00) >> 8) #define MOSQ_LSB(A) (uint8_t)(A & 0x00FF) -void mosquitto__net_init(void); -void mosquitto__net_cleanup(void); +void net__init(void); +void net__cleanup(void); -int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking); +int net__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking); #ifdef WITH_BROKER -int mosquitto__socket_close(struct mosquitto_db *db, struct mosquitto *mosq); +int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq); #else -int mosquitto__socket_close(struct mosquitto *mosq); +int net__socket_close(struct mosquitto *mosq); #endif -int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking); -int mosquitto__socket_nonblock(int sock); -int mosquitto__socketpair(int *sp1, int *sp2); +int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking); +int net__socket_nonblock(int sock); +int net__socketpair(int *sp1, int *sp2); -ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count); -ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count); +ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count); +ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count); #ifdef WITH_TLS -int mosquitto__socket_apply_tls(struct mosquitto *mosq); -int mosquitto__socket_connect_tls(struct mosquitto *mosq); +int net__socket_apply_tls(struct mosquitto *mosq); +int net__socket_connect_tls(struct mosquitto *mosq); #endif #endif diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index da611c4a..c490daa3 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -276,7 +276,7 @@ int packet__write(struct mosquitto *mosq) packet = mosq->current_out_packet; while(packet->to_process > 0){ - write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); + write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process); if(write_length > 0){ G_BYTES_SENT_INC(write_length); packet->to_process -= write_length; @@ -315,7 +315,7 @@ int packet__write(struct mosquitto *mosq) }else if(((packet->command)&0xF0) == DISCONNECT){ /* FIXME what cleanup needs doing here? * incoming/outgoing messages? */ - mosquitto__socket_close(mosq); + net__socket_close(mosq); /* Start of duplicate, possibly unnecessary code. * This does leave things in a consistent state at least. */ @@ -404,7 +404,7 @@ int packet__read(struct mosquitto *mosq) * Finally, free the memory and reset everything to starting conditions. */ if(!mosq->in_packet.command){ - read_length = mosquitto__net_read(mosq, &byte, 1); + read_length = net__read(mosq, &byte, 1); if(read_length == 1){ mosq->in_packet.command = byte; #ifdef WITH_BROKER @@ -440,7 +440,7 @@ int packet__read(struct mosquitto *mosq) */ if(mosq->in_packet.remaining_count <= 0){ do{ - read_length = mosquitto__net_read(mosq, &byte, 1); + read_length = net__read(mosq, &byte, 1); if(read_length == 1){ mosq->in_packet.remaining_count--; /* Max 4 bytes length for remaining length as defined by protocol. @@ -479,7 +479,7 @@ int packet__read(struct mosquitto *mosq) } } while(mosq->in_packet.to_process>0){ - read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); + read_length = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(read_length > 0){ G_BYTES_RECEIVED_INC(read_length); mosq->in_packet.to_process -= read_length; @@ -518,9 +518,9 @@ int packet__read(struct mosquitto *mosq) if(((mosq->in_packet.command)&0xF5) == PUBLISH){ G_PUB_MSGS_RECEIVED_INC(1); } - rc = mqtt3_packet_handle(db, mosq); + rc = handle__packet(db, mosq); #else - rc = mosquitto__packet_handle(mosq); + rc = handle__packet(mosq); #endif /* Free data and reset values */ diff --git a/lib/read_handle.c b/lib/read_handle.c index 7ad8b83f..30d5d455 100644 --- a/lib/read_handle.c +++ b/lib/read_handle.c @@ -30,7 +30,7 @@ Contributors: #include "time_mosq.h" #include "util_mosq.h" -int mosquitto__packet_handle(struct mosquitto *mosq) +int handle__packet(struct mosquitto *mosq) { assert(mosq); diff --git a/lib/read_handle.h b/lib/read_handle.h index 5b22e3b1..dfcbb38b 100644 --- a/lib/read_handle.h +++ b/lib/read_handle.h @@ -19,12 +19,12 @@ Contributors: #include "mosquitto.h" struct mosquitto_db; -int mosquitto__packet_handle(struct mosquitto *mosq); int handle__pingreq(struct mosquitto *mosq); int handle__pingresp(struct mosquitto *mosq); #ifdef WITH_BROKER int handle__pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type); #else +int handle__packet(struct mosquitto *mosq); int handle__connack(struct mosquitto *mosq); int handle__pubackcomp(struct mosquitto *mosq, const char *type); int handle__publish(struct mosquitto *mosq); diff --git a/lib/socks_mosq.c b/lib/socks_mosq.c index 0b87579b..b9549a1a 100644 --- a/lib/socks_mosq.c +++ b/lib/socks_mosq.c @@ -207,7 +207,7 @@ int socks5__read(struct mosquitto *mosq) if(mosq->state == mosq_cs_socks5_start){ while(mosq->in_packet.to_process > 0){ - len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); + len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ mosq->in_packet.pos += len; mosq->in_packet.to_process -= len; @@ -249,7 +249,7 @@ int socks5__read(struct mosquitto *mosq) } }else if(mosq->state == mosq_cs_socks5_userpass_reply){ while(mosq->in_packet.to_process > 0){ - len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); + len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ mosq->in_packet.pos += len; mosq->in_packet.to_process -= len; @@ -305,7 +305,7 @@ int socks5__read(struct mosquitto *mosq) } }else if(mosq->state == mosq_cs_socks5_request){ while(mosq->in_packet.to_process > 0){ - len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); + len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(len > 0){ mosq->in_packet.pos += len; mosq->in_packet.to_process -= len; diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 938a12ae..589ecf85 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -59,7 +59,7 @@ void mosquitto__check_keepalive(struct mosquitto *mosq) && now - mosq->last_msg_out >= mosq->bridge->idle_timeout){ log__printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id); - mosquitto__socket_close(db, mosq); + net__socket_close(db, mosq); return; } #endif @@ -84,9 +84,9 @@ void mosquitto__check_keepalive(struct mosquitto *mosq) assert(mosq->listener->client_count >= 0); } mosq->listener = NULL; - mosquitto__socket_close(db, mosq); + net__socket_close(db, mosq); #else - mosquitto__socket_close(mosq); + net__socket_close(mosq); pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_disconnecting){ rc = MOSQ_ERR_SUCCESS; diff --git a/src/bridge.c b/src/bridge.c index d6f73e4e..74219d5e 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -166,12 +166,12 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) * remove any messages and the next loop carries out the resubscription * anyway. This means any unwanted subs will be removed. */ - mqtt3_subs_clean_session(db, context); + sub__clean_session(db, context); for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(mqtt3_sub_add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; + if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; } } @@ -210,7 +210,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) } log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port); - rc = mosquitto__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); + rc = net__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); if(rc > 0 ){ if(rc == MOSQ_ERR_TLS){ return rc; /* Error already printed */ @@ -241,7 +241,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) }else if(rc == MOSQ_ERR_EAI){ log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); } - mosquitto__socket_close(db, context); + net__socket_close(db, context); return rc; } } diff --git a/src/context.c b/src/context.c index 9505152b..cabc8e53 100644 --- a/src/context.c +++ b/src/context.c @@ -134,9 +134,9 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d } } #endif - mosquitto__socket_close(db, context); + net__socket_close(db, context); if((do_free || context->clean_session) && db){ - mqtt3_subs_clean_session(db, context); + sub__clean_session(db, context); db__messages_delete(db, context); } if(context->address){ @@ -199,7 +199,7 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *ctxt) ctxt->will = NULL; } ctxt->disconnect_t = time(NULL); - mosquitto__socket_close(db, ctxt); + net__socket_close(db, ctxt); } void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context) diff --git a/src/database.c b/src/database.c index 2e73b32f..133c2fce 100644 --- a/src/database.c +++ b/src/database.c @@ -204,7 +204,7 @@ void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **s } -static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) +static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) { if(!context || !msg || !(*msg)){ return; @@ -269,7 +269,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 } if(tail->mid == mid && tail->direction == dir){ msg_index--; - _message_remove(db, context, &tail, last); + db__message_remove(db, context, &tail, last); deleted = true; }else{ last = tail; @@ -499,7 +499,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, } if(db__message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; - return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &stored); + return sub__messages_queue(db, source_id, topic, qos, retain, &stored); } int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) @@ -635,7 +635,7 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte if(msg->qos != 2){ /* Anything store)){ - _message_remove(db, context, &tail, last); + if(!topic || !sub__messages_queue(db, source_id, topic, qos, retain, &tail->store)){ + db__message_remove(db, context, &tail, last); deleted = true; }else{ return 1; @@ -825,7 +825,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) case mosq_ms_publish_qos0: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries); if(!rc){ - _message_remove(db, context, &tail, last); + db__message_remove(db, context, &tail, last); }else{ return rc; } diff --git a/src/db_dump/db_dump.c b/src/db_dump/db_dump.c index 660cc4b2..b5df53cf 100644 --- a/src/db_dump/db_dump.c +++ b/src/db_dump/db_dump.c @@ -31,7 +31,7 @@ Contributors: static uint32_t db_version; static int stats = 0; -static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fd) +static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd) { uint16_t i16temp, slen, last_mid; char *client_id = NULL; @@ -75,7 +75,7 @@ error: return 1; } -static int _db_client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd) +static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd) { dbid_t i64temp, store_id; uint16_t i16temp, slen, mid; @@ -127,7 +127,7 @@ error: return 1; } -static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd) +static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd) { dbid_t i64temp, store_id; uint32_t i32temp, payloadlen; @@ -240,7 +240,7 @@ error: return 1; } -static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd) +static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd) { dbid_t i64temp, store_id; @@ -254,7 +254,7 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd) return 0; } -static int _db_sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd) +static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd) { uint16_t i16temp, slen; uint8_t qos; @@ -364,35 +364,35 @@ int main(int argc, char *argv[]) msg_store_count++; if(!stats) printf("DB_CHUNK_MSG_STORE:\n"); if(!stats) printf("\tLength: %d\n", length); - if(_db_msg_store_chunk_restore(&db, fd)) return 1; + if(db__msg_store_chunk_restore(&db, fd)) return 1; break; case DB_CHUNK_CLIENT_MSG: client_msg_count++; if(!stats) printf("DB_CHUNK_CLIENT_MSG:\n"); if(!stats) printf("\tLength: %d\n", length); - if(_db_client_msg_chunk_restore(&db, fd)) return 1; + if(db__client_msg_chunk_restore(&db, fd)) return 1; break; case DB_CHUNK_RETAIN: retain_count++; if(!stats) printf("DB_CHUNK_RETAIN:\n"); if(!stats) printf("\tLength: %d\n", length); - if(_db_retain_chunk_restore(&db, fd)) return 1; + if(db__retain_chunk_restore(&db, fd)) return 1; break; case DB_CHUNK_SUB: sub_count++; if(!stats) printf("DB_CHUNK_SUB:\n"); if(!stats) printf("\tLength: %d\n", length); - if(_db_sub_chunk_restore(&db, fd)) return 1; + if(db__sub_chunk_restore(&db, fd)) return 1; break; case DB_CHUNK_CLIENT: client_count++; if(!stats) printf("DB_CHUNK_CLIENT:\n"); if(!stats) printf("\tLength: %d\n", length); - if(_db_client_chunk_restore(&db, fd)) return 1; + if(db__client_chunk_restore(&db, fd)) return 1; break; default: diff --git a/src/loop.c b/src/loop.c index 45682ddd..35e30df0 100644 --- a/src/loop.c +++ b/src/loop.c @@ -177,9 +177,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock && context->bridge->cur_address != 0 && now > context->bridge->primary_retry){ - if(mosquitto__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) == MOSQ_ERR_SUCCESS){ + if(net__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) == MOSQ_ERR_SUCCESS){ COMPAT_CLOSE(bridge_sock); - mosquitto__socket_close(db, context); + net__socket_close(db, context); context->bridge->cur_address = context->bridge->address_count-1; } } @@ -358,7 +358,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock flag_reload = false; } if(flag_tree_print){ - mqtt3_sub_tree_print(&db->subs, 0); + sub__tree_print(&db->subs, 0); flag_tree_print = false; } #ifdef WITH_WEBSOCKETS diff --git a/src/mosquitto.c b/src/mosquitto.c index f52e51c5..c49fc7e9 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -228,7 +228,7 @@ int main(int argc, char *argv[]) memset(&int_db, 0, sizeof(struct mosquitto_db)); - mosquitto__net_init(); + net__init(); config__init(&config); rc = config__parse_args(&config, argc, argv); @@ -431,7 +431,7 @@ int main(int argc, char *argv[]) } config__cleanup(int_db.config); - mosquitto__net_cleanup(); + net__cleanup(); return rc; } diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 7c0c9152..02a69689 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -444,7 +444,7 @@ int net__socket_get_address(int sock, char *buf, int len); /* ============================================================ * Read handling functions * ============================================================ */ -int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context); +int handle__packet(struct mosquitto_db *db, struct mosquitto *context); int handle__connack(struct mosquitto_db *db, struct mosquitto *context); int handle__connect(struct mosquitto_db *db, struct mosquitto *context); int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context); @@ -471,7 +471,6 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m int db__message_write(struct mosquitto_db *db, struct mosquitto *context); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); -int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); @@ -483,16 +482,16 @@ int db__message_timeout_check(struct mosquitto_db *db, unsigned int timeout); int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); void db__vacuum(void); void sys__update(struct mosquitto_db *db, int interval, time_t start_time); -int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos); /* ============================================================ * Subscription functions * ============================================================ */ -int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root); -int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); -int mqtt3_sub_search(struct mosquitto_db *db, struct mosquitto__subhier *root, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); -void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level); -int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context); +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root); +int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); +void sub__tree_print(struct mosquitto__subhier *root, int level); +int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context); +int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos); +int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); /* ============================================================ * Context functions diff --git a/src/net.c b/src/net.c index 567115e6..14a08126 100644 --- a/src/net.c +++ b/src/net.c @@ -83,7 +83,7 @@ int net__socket_accept(struct mosquitto_db *db, int listensock) G_SOCKET_CONNECTIONS_INC(); - if(mosquitto__socket_nonblock(new_sock)){ + if(net__socket_nonblock(new_sock)){ return INVALID_SOCKET; } @@ -376,7 +376,7 @@ int net__socket_listen(struct mosquitto__listener *listener) ss_opt = 1; setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &ss_opt, sizeof(ss_opt)); - if(mosquitto__socket_nonblock(sock)){ + if(net__socket_nonblock(sock)){ return 1; } diff --git a/src/persist.c b/src/persist.c index 187b3b94..a71d821d 100644 --- a/src/persist.c +++ b/src/persist.c @@ -667,7 +667,7 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) store_id = i64temp; HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); if(load){ - mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); + sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); return MOSQ_ERR_INVAL; @@ -837,7 +837,7 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, context = persist__find_or_add_context(db, client_id, 0); if(!context) return 1; - return mqtt3_sub_add(db, context, sub, qos, &db->subs); + return sub__add(db, context, sub, qos, &db->subs); } #endif diff --git a/src/read_handle.c b/src/read_handle.c index 50e55466..18737cc2 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -29,7 +29,7 @@ Contributors: #include "sys_tree.h" #include "util_mosq.h" -int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context) +int handle__packet(struct mosquitto_db *db, struct mosquitto *context) { if(!context) return MOSQ_ERR_INVAL; @@ -226,10 +226,10 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) } switch(qos){ case 0: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; + if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; break; case 1: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; + if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; if(send__puback(context, mid)) rc = 1; break; case 2: diff --git a/src/read_handle_server.c b/src/read_handle_server.c index af9edebe..0e61341f 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -691,9 +691,9 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) #endif if(qos != 0x80){ - rc2 = mqtt3_sub_add(db, context, sub, qos, &db->subs); + rc2 = sub__add(db, context, sub, qos, &db->subs); if(rc2 == MOSQ_ERR_SUCCESS){ - if(mqtt3_retain_queue(db, context, sub, qos)) rc = 1; + if(sub__retain_queue(db, context, sub, qos)) rc = 1; }else if(rc2 != -1){ rc = rc2; } @@ -766,7 +766,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context) } log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); - mqtt3_sub_remove(db, context, sub, &db->subs); + sub__remove(db, context, sub, &db->subs); log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub); mosquitto__free(sub); } diff --git a/src/subs.c b/src/subs.c index ea27b51a..38dacbb3 100644 --- a/src/subs.c +++ b/src/subs.c @@ -245,7 +245,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens) } } -static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens) +static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens) { struct mosquitto__subhier *branch, *last = NULL; struct mosquitto__subleaf *leaf, *last_leaf; @@ -321,7 +321,7 @@ static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos, branch = subhier->children; while(branch){ if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){ - return sub__add(db, context, qos, branch, tokens->next); + return sub__add_recurse(db, context, qos, branch, tokens->next); } last = branch; branch = branch->next; @@ -340,10 +340,10 @@ static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos, }else{ last->next = branch; } - return sub__add(db, context, qos, branch, tokens->next); + return sub__add_recurse(db, context, qos, branch, tokens->next); } -static int sub__remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens) +static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens) { struct mosquitto__subhier *branch, *last = NULL; struct mosquitto__subleaf *leaf; @@ -386,7 +386,7 @@ static int sub__remove(struct mosquitto_db *db, struct mosquitto *context, struc branch = subhier->children; while(branch){ if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){ - sub__remove(db, context, branch, tokens->next); + sub__remove_recurse(db, context, branch, tokens->next); if(!branch->children && !branch->subs && !branch->retained){ if(last){ last->next = branch->next; @@ -438,7 +438,7 @@ static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subh } } -int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root) +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root) { int rc = 0; struct mosquitto__subhier *subhier, *child; @@ -452,7 +452,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char subhier = root->children; while(subhier){ if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){ - rc = sub__add(db, context, qos, subhier, tokens); + rc = sub__add_recurse(db, context, qos, subhier, tokens); break; } subhier = subhier->next; @@ -482,7 +482,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char } db->subs.children = child; - rc = sub__add(db, context, qos, child, tokens); + rc = sub__add_recurse(db, context, qos, child, tokens); } sub__topic_tokens_free(tokens); @@ -492,7 +492,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char return rc; } -int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root) +int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root) { int rc = 0; struct mosquitto__subhier *subhier; @@ -506,7 +506,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c subhier = root->children; while(subhier){ if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){ - rc = sub__remove(db, context, subhier, tokens); + rc = sub__remove_recurse(db, context, subhier, tokens); break; } subhier = subhier->next; @@ -517,7 +517,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c return rc; } -int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) +int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) { int rc = 0; struct mosquitto__subhier *subhier; @@ -541,7 +541,7 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons /* We have a message that needs to be retained, so ensure that the subscription * tree for its topic exists. */ - sub__add(db, NULL, 0, subhier, tokens); + sub__add_recurse(db, NULL, 0, subhier, tokens); } sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); } @@ -557,7 +557,7 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons /* Remove all subscriptions for a client. */ -int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context) +int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context) { int i; struct mosquitto__subleaf *leaf; @@ -593,7 +593,7 @@ int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_SUCCESS; } -void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level) +void sub__tree_print(struct mosquitto__subhier *root, int level) { int i; struct mosquitto__subhier *branch; @@ -619,7 +619,7 @@ void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level) branch = root->children; while(branch){ - mqtt3_sub_tree_print(branch, level+1); + sub__tree_print(branch, level+1); branch = branch->next; } } @@ -693,7 +693,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su return flag; } -int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos) +int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos) { struct mosquitto__subhier *subhier; struct sub__token *tokens = NULL, *tail; diff --git a/src/websockets.c b/src/websockets.c index 49d4e6a0..be6b1df5 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -314,7 +314,7 @@ static int callback_mqtt(struct libwebsocket_context *context, G_PUB_MSGS_RECEIVED_INC(1); } #endif - rc = mqtt3_packet_handle(db, mosq); + rc = handle__packet(db, mosq); /* Free data and reset values */ packet__cleanup(&mosq->in_packet); diff --git a/test/packet-gen.c b/test/packet-gen.c index 30af3570..318eb725 100644 --- a/test/packet-gen.c +++ b/test/packet-gen.c @@ -18,7 +18,7 @@ int main(int argc, char *argv[]) bool clean_session = true; int keepalive = 60; - mosq = mosquitto_new("packetgen", NULL); + mosq = mosquitto_new("packetgen", clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory.\n"); return 1; @@ -30,9 +30,9 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error: Unable to open mqtt.connect for writing.\n"); return 1; } - mosq->core.sock = fd; - printf("_mosquitto_send_connect(): %d\n", _mosquitto_send_connect(mosq, keepalive, clean_session)); - printf("loop: %d\n", mosquitto_loop_write(mosq)); + mosq->sock = fd; + printf("send__connect(): %d\n", send__connect(mosq, keepalive, clean_session)); + printf("loop: %d\n", mosquitto_loop_write(mosq, 1)); close(fd); /* SUBSCRIBE */ @@ -41,9 +41,9 @@ int main(int argc, char *argv[]) fprintf(stderr, "Error: Unable to open mqtt.subscribe for writing.\n"); return 1; } - mosq->core.sock = fd; - printf("_mosquitto_send_subscribe(): %d\n", _mosquitto_send_subscribe(mosq, NULL, false, "subscribe/topic", 2)); - printf("loop: %d\n", mosquitto_loop_write(mosq)); + mosq->sock = fd; + printf("send__subscribe(): %d\n", send__subscribe(mosq, NULL, "subscribe/topic", 2)); + printf("loop: %d\n", mosquitto_loop_write(mosq, 1)); close(fd); mosquitto_destroy(mosq); diff --git a/test/random_client.c b/test/random_client.c deleted file mode 100644 index f56060c8..00000000 --- a/test/random_client.c +++ /dev/null @@ -1,198 +0,0 @@ -/* -Copyright (c) 2009, Roger Light -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. -3. Neither the name of mosquitto nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -typedef enum { - stStart, - stSocketOpened, - stConnSent, - stConnAckd, - stSubSent, - stSubAckd, - stPause -} stateType; - -static stateType state = stStart; - -int handle_read(mqtt3_context *context) -{ - uint8_t buf; - int rc; - - rc = read(context->sock, &buf, 1); - printf("rc: %d\n", rc); - if(rc == -1){ - printf("Error: %s\n", strerror(errno)); - return 1; - }else if(rc == 0){ - return 2; - } - - switch(buf&0xF0){ - case CONNACK: - if(mqtt3_handle_connack(context)) return 3; - state = stConnAckd; - break; - case SUBACK: - if(mqtt3_handle_suback(context)) return 3; - state = stSubAckd; - break; - case PINGREQ: - if(mqtt3_handle_pingreq(context)) return 3; - break; - case PINGRESP: - if(mqtt3_handle_pingresp(context)) return 3; - break; - case PUBACK: - if(mqtt3_handle_puback(context)) return 3; - break; - case PUBCOMP: - if(mqtt3_handle_pubcomp(context)) return 3; - break; - case PUBLISH: - if(mqtt3_handle_publish(context)) return 0; - break; - case PUBREC: - if(mqtt3_handle_pubrec(context)) return 3; - break; - case UNSUBACK: - if(mqtt3_handle_unsuback(context)) return 3; - break; - default: - printf("Unknown command: %s (%d)\n", mqtt3_command_to_string(buf&0xF0), buf&0xF0); - break; - } - return 0; -} - -void send_random(mqtt3_context *context, int length) -{ - int fd = open("/dev/urandom", O_RDONLY); - uint8_t buf[length]; - - if(fd >= 0){ - if(read(fd, buf, length) == length){ - mqtt3_write_bytes(context, buf, length); - } - close(fd); - } -} - -/* pselect loop test */ -int main(int argc, char *argv[]) -{ - struct timespec timeout; - fd_set readfds, writefds; - int fdcount; - int run = 1; - mqtt3_context context; - char id[30]; - - if(argc == 2){ - sprintf(id, "test%s", argv[1]); - }else{ - sprintf(id, "test"); - } - context.sock = mqtt3_socket_connect("127.0.0.1", 1883); - if(context.sock == -1){ - return 1; - } - state = stSocketOpened; - - while(run){ - FD_ZERO(&readfds); - FD_SET(context.sock, &readfds); - FD_ZERO(&writefds); - //FD_SET(0, &writefds); - timeout.tv_sec = 1; - timeout.tv_nsec = 0; - - fdcount = pselect(context.sock+1, &readfds, &writefds, NULL, &timeout, NULL); - if(fdcount == -1){ - fprintf(stderr, "Error in pselect: %s\n", strerror(errno)); - run = 0; - }else if(fdcount == 0){ - switch(state){ - case stSocketOpened: - mqtt3_raw_connect(&context, id, true, 2, true, "will", "aargh", 60, true); - state = stConnSent; - break; - case stConnSent: - printf("Waiting for CONNACK\n"); - break; - case stConnAckd: - // printf("CONNACK received\n"); - // mqtt3_raw_subscribe(&context, false, "a/b/c", 0); - // state = stSubSent; - send_random(&context, 100); - break; - case stSubSent: - printf("Waiting for SUBACK\n"); - break; - case stSubAckd: - printf("SUBACK received\n"); - mqtt3_raw_publish(&context, 0, 0, 0, 1, "a/b/c", 5, (uint8_t *)"Roger"); - state = stPause; - break; - case stPause: - //mqtt3_raw_disconnect(&context); - printf("Pause\n"); - break; - default: - fprintf(stderr, "Error: Unknown state\n"); - break; - } - }else{ - printf("fdcount=%d\n", fdcount); - - if(FD_ISSET(context.sock, &readfds)){ - if(handle_read(&context)){ - fprintf(stderr, "Socket closed on remote side\n"); - mqtt3_socket_close(&context); - run = 0; - } - } - } - } - return 0; -} -