Last raft of renames for the moment.

This commit is contained in:
Roger A. Light 2015-05-18 09:29:22 +01:00
parent 21946ace6c
commit 4195fde70b
23 changed files with 127 additions and 326 deletions

View File

@ -72,14 +72,14 @@ int mosquitto_lib_init(void)
srand(tv.tv_sec*1000 + tv.tv_usec/1000); srand(tv.tv_sec*1000 + tv.tv_usec/1000);
#endif #endif
mosquitto__net_init(); net__init();
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
int mosquitto_lib_cleanup(void) int mosquitto_lib_cleanup(void)
{ {
mosquitto__net_cleanup(); net__cleanup();
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
@ -297,7 +297,7 @@ void mosquitto__destroy(struct mosquitto *mosq)
} }
#endif #endif
if(mosq->sock != INVALID_SOCKET){ if(mosq->sock != INVALID_SOCKET){
mosquitto__socket_close(mosq); net__socket_close(mosq);
} }
message__cleanup_all(mosq); message__cleanup_all(mosq);
will__clear(mosq); will__clear(mosq);
@ -404,7 +404,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
mosq->keepalive = keepalive; mosq->keepalive = keepalive;
if(mosquitto__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
log__printf(mosq, MOSQ_LOG_WARNING, log__printf(mosq, MOSQ_LOG_WARNING,
"Warning: Unable to open socket pair, outgoing publish commands may be delayed."); "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
} }
@ -510,11 +510,11 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking)
#ifdef WITH_SOCKS #ifdef WITH_SOCKS
if(mosq->socks5_host){ if(mosq->socks5_host){
rc = mosquitto__socket_connect(mosq, mosq->socks5_host, mosq->socks5_port, mosq->bind_address, blocking); rc = net__socket_connect(mosq, mosq->socks5_host, mosq->socks5_port, mosq->bind_address, blocking);
}else }else
#endif #endif
{ {
rc = mosquitto__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking); rc = net__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
} }
if(rc){ if(rc){
return rc; return rc;
@ -921,7 +921,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
if(FD_ISSET(mosq->sock, &readfds)){ if(FD_ISSET(mosq->sock, &readfds)){
#ifdef WITH_TLS #ifdef WITH_TLS
if(mosq->want_connect){ if(mosq->want_connect){
rc = mosquitto__socket_connect_tls(mosq); rc = net__socket_connect_tls(mosq);
if(rc) return rc; if(rc) return rc;
}else }else
#endif #endif
@ -947,7 +947,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
if(FD_ISSET(mosq->sock, &writefds)){ if(FD_ISSET(mosq->sock, &writefds)){
#ifdef WITH_TLS #ifdef WITH_TLS
if(mosq->want_connect){ if(mosq->want_connect){
rc = mosquitto__socket_connect_tls(mosq); rc = net__socket_connect_tls(mosq);
if(rc) return rc; if(rc) return rc;
}else }else
#endif #endif
@ -1068,7 +1068,7 @@ int mosquitto_loop_misc(struct mosquitto *mosq)
/* mosq->ping_t != 0 means we are waiting for a pingresp. /* mosq->ping_t != 0 means we are waiting for a pingresp.
* This hasn't happened in the keepalive time so we should disconnect. * This hasn't happened in the keepalive time so we should disconnect.
*/ */
mosquitto__socket_close(mosq); net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex); pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){ if(mosq->state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS; rc = MOSQ_ERR_SUCCESS;
@ -1091,7 +1091,7 @@ int mosquitto_loop_misc(struct mosquitto *mosq)
static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc) static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
{ {
if(rc){ if(rc){
mosquitto__socket_close(mosq); net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex); pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){ if(mosq->state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS; rc = MOSQ_ERR_SUCCESS;

View File

@ -78,7 +78,7 @@ Contributors:
int tls_ex_index_mosq = -1; int tls_ex_index_mosq = -1;
#endif #endif
void mosquitto__net_init(void) void net__init(void)
{ {
#ifdef WIN32 #ifdef WIN32
WSADATA wsaData; WSADATA wsaData;
@ -99,7 +99,7 @@ void mosquitto__net_init(void)
#endif #endif
} }
void mosquitto__net_cleanup(void) void net__cleanup(void)
{ {
#ifdef WITH_TLS #ifdef WITH_TLS
ERR_remove_state(0); ERR_remove_state(0);
@ -125,9 +125,9 @@ void mosquitto__net_cleanup(void)
* Returns 0 on success. * Returns 0 on success.
*/ */
#ifdef WITH_BROKER #ifdef WITH_BROKER
int mosquitto__socket_close(struct mosquitto_db *db, struct mosquitto *mosq) int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq)
#else #else
int mosquitto__socket_close(struct mosquitto *mosq) int net__socket_close(struct mosquitto *mosq)
#endif #endif
{ {
int rc = 0; int rc = 0;
@ -195,7 +195,7 @@ static unsigned int psk_client_callback(SSL *ssl, const char *hint,
#endif #endif
int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking) int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
{ {
struct addrinfo hints; struct addrinfo hints;
struct addrinfo *ainfo, *rp; struct addrinfo *ainfo, *rp;
@ -261,7 +261,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po
if(!blocking){ if(!blocking){
/* Set non-blocking */ /* Set non-blocking */
if(mosquitto__socket_nonblock(*sock)){ if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock); COMPAT_CLOSE(*sock);
continue; continue;
} }
@ -278,7 +278,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po
if(blocking){ if(blocking){
/* Set non-blocking */ /* Set non-blocking */
if(mosquitto__socket_nonblock(*sock)){ if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock); COMPAT_CLOSE(*sock);
continue; continue;
} }
@ -301,7 +301,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po
#ifdef WITH_TLS #ifdef WITH_TLS
int mosquitto__socket_connect_tls(struct mosquitto *mosq) int net__socket_connect_tls(struct mosquitto *mosq)
{ {
int ret; int ret;
@ -331,7 +331,7 @@ int mosquitto__socket_connect_tls(struct mosquitto *mosq)
* Returns -1 on failure (ip is NULL, socket creation/connection error) * Returns -1 on failure (ip is NULL, socket creation/connection error)
* Returns sock number on success. * Returns sock number on success.
*/ */
int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) int net__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
{ {
int sock = INVALID_SOCKET; int sock = INVALID_SOCKET;
int rc; int rc;
@ -342,7 +342,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t
if(!mosq || !host || !port) return MOSQ_ERR_INVAL; if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
rc = mosquitto__try_connect(mosq, host, port, &sock, bind_address, blocking); rc = net__try_connect(mosq, host, port, &sock, bind_address, blocking);
if(rc > 0) return rc; if(rc > 0) return rc;
#ifdef WITH_TLS #ifdef WITH_TLS
@ -475,7 +475,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t
SSL_set_bio(mosq->ssl, bio, bio); SSL_set_bio(mosq->ssl, bio, bio);
mosq->sock = sock; mosq->sock = sock;
if(mosquitto__socket_connect_tls(mosq)){ if(net__socket_connect_tls(mosq)){
return MOSQ_ERR_TLS; return MOSQ_ERR_TLS;
} }
@ -488,7 +488,7 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t
} }
ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count) ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count)
{ {
#ifdef WITH_TLS #ifdef WITH_TLS
int ret; int ret;
@ -536,7 +536,7 @@ ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count)
#endif #endif
} }
ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count) ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count)
{ {
#ifdef WITH_TLS #ifdef WITH_TLS
int ret; int ret;
@ -585,7 +585,7 @@ ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count)
} }
int mosquitto__socket_nonblock(int sock) int net__socket_nonblock(int sock)
{ {
#ifndef WIN32 #ifndef WIN32
int opt; int opt;
@ -612,7 +612,7 @@ int mosquitto__socket_nonblock(int sock)
#ifndef WITH_BROKER #ifndef WITH_BROKER
int mosquitto__socketpair(int *pairR, int *pairW) int net__socketpair(int *pairR, int *pairW)
{ {
#ifdef WIN32 #ifdef WIN32
int family[2] = {AF_INET, AF_INET6}; int family[2] = {AF_INET, AF_INET6};
@ -665,7 +665,7 @@ int mosquitto__socketpair(int *pairR, int *pairW)
continue; continue;
} }
if(mosquitto__socket_nonblock(listensock)){ if(net__socket_nonblock(listensock)){
continue; continue;
} }
@ -684,7 +684,7 @@ int mosquitto__socketpair(int *pairR, int *pairW)
COMPAT_CLOSE(listensock); COMPAT_CLOSE(listensock);
continue; continue;
} }
if(mosquitto__socket_nonblock(spR)){ if(net__socket_nonblock(spR)){
COMPAT_CLOSE(listensock); COMPAT_CLOSE(listensock);
continue; continue;
} }
@ -710,7 +710,7 @@ int mosquitto__socketpair(int *pairR, int *pairW)
} }
} }
if(mosquitto__socket_nonblock(spW)){ if(net__socket_nonblock(spW)){
COMPAT_CLOSE(spR); COMPAT_CLOSE(spR);
COMPAT_CLOSE(listensock); COMPAT_CLOSE(listensock);
continue; continue;
@ -728,12 +728,12 @@ int mosquitto__socketpair(int *pairR, int *pairW)
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){ if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
return MOSQ_ERR_ERRNO; return MOSQ_ERR_ERRNO;
} }
if(mosquitto__socket_nonblock(sv[0])){ if(net__socket_nonblock(sv[0])){
COMPAT_CLOSE(sv[0]); COMPAT_CLOSE(sv[0]);
COMPAT_CLOSE(sv[1]); COMPAT_CLOSE(sv[1]);
return MOSQ_ERR_ERRNO; return MOSQ_ERR_ERRNO;
} }
if(mosquitto__socket_nonblock(sv[1])){ if(net__socket_nonblock(sv[1])){
COMPAT_CLOSE(sv[0]); COMPAT_CLOSE(sv[0]);
COMPAT_CLOSE(sv[1]); COMPAT_CLOSE(sv[1]);
return MOSQ_ERR_ERRNO; return MOSQ_ERR_ERRNO;

View File

@ -49,25 +49,25 @@ struct mosquitto_db;
#define MOSQ_MSB(A) (uint8_t)((A & 0xFF00) >> 8) #define MOSQ_MSB(A) (uint8_t)((A & 0xFF00) >> 8)
#define MOSQ_LSB(A) (uint8_t)(A & 0x00FF) #define MOSQ_LSB(A) (uint8_t)(A & 0x00FF)
void mosquitto__net_init(void); void net__init(void);
void mosquitto__net_cleanup(void); void net__cleanup(void);
int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking); int net__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking);
#ifdef WITH_BROKER #ifdef WITH_BROKER
int mosquitto__socket_close(struct mosquitto_db *db, struct mosquitto *mosq); int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq);
#else #else
int mosquitto__socket_close(struct mosquitto *mosq); int net__socket_close(struct mosquitto *mosq);
#endif #endif
int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking); int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking);
int mosquitto__socket_nonblock(int sock); int net__socket_nonblock(int sock);
int mosquitto__socketpair(int *sp1, int *sp2); int net__socketpair(int *sp1, int *sp2);
ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count); ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count);
ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count); ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count);
#ifdef WITH_TLS #ifdef WITH_TLS
int mosquitto__socket_apply_tls(struct mosquitto *mosq); int net__socket_apply_tls(struct mosquitto *mosq);
int mosquitto__socket_connect_tls(struct mosquitto *mosq); int net__socket_connect_tls(struct mosquitto *mosq);
#endif #endif
#endif #endif

View File

@ -276,7 +276,7 @@ int packet__write(struct mosquitto *mosq)
packet = mosq->current_out_packet; packet = mosq->current_out_packet;
while(packet->to_process > 0){ while(packet->to_process > 0){
write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){ if(write_length > 0){
G_BYTES_SENT_INC(write_length); G_BYTES_SENT_INC(write_length);
packet->to_process -= write_length; packet->to_process -= write_length;
@ -315,7 +315,7 @@ int packet__write(struct mosquitto *mosq)
}else if(((packet->command)&0xF0) == DISCONNECT){ }else if(((packet->command)&0xF0) == DISCONNECT){
/* FIXME what cleanup needs doing here? /* FIXME what cleanup needs doing here?
* incoming/outgoing messages? */ * incoming/outgoing messages? */
mosquitto__socket_close(mosq); net__socket_close(mosq);
/* Start of duplicate, possibly unnecessary code. /* Start of duplicate, possibly unnecessary code.
* This does leave things in a consistent state at least. */ * This does leave things in a consistent state at least. */
@ -404,7 +404,7 @@ int packet__read(struct mosquitto *mosq)
* Finally, free the memory and reset everything to starting conditions. * Finally, free the memory and reset everything to starting conditions.
*/ */
if(!mosq->in_packet.command){ if(!mosq->in_packet.command){
read_length = mosquitto__net_read(mosq, &byte, 1); read_length = net__read(mosq, &byte, 1);
if(read_length == 1){ if(read_length == 1){
mosq->in_packet.command = byte; mosq->in_packet.command = byte;
#ifdef WITH_BROKER #ifdef WITH_BROKER
@ -440,7 +440,7 @@ int packet__read(struct mosquitto *mosq)
*/ */
if(mosq->in_packet.remaining_count <= 0){ if(mosq->in_packet.remaining_count <= 0){
do{ do{
read_length = mosquitto__net_read(mosq, &byte, 1); read_length = net__read(mosq, &byte, 1);
if(read_length == 1){ if(read_length == 1){
mosq->in_packet.remaining_count--; mosq->in_packet.remaining_count--;
/* Max 4 bytes length for remaining length as defined by protocol. /* Max 4 bytes length for remaining length as defined by protocol.
@ -479,7 +479,7 @@ int packet__read(struct mosquitto *mosq)
} }
} }
while(mosq->in_packet.to_process>0){ while(mosq->in_packet.to_process>0){
read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); read_length = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(read_length > 0){ if(read_length > 0){
G_BYTES_RECEIVED_INC(read_length); G_BYTES_RECEIVED_INC(read_length);
mosq->in_packet.to_process -= read_length; mosq->in_packet.to_process -= read_length;
@ -518,9 +518,9 @@ int packet__read(struct mosquitto *mosq)
if(((mosq->in_packet.command)&0xF5) == PUBLISH){ if(((mosq->in_packet.command)&0xF5) == PUBLISH){
G_PUB_MSGS_RECEIVED_INC(1); G_PUB_MSGS_RECEIVED_INC(1);
} }
rc = mqtt3_packet_handle(db, mosq); rc = handle__packet(db, mosq);
#else #else
rc = mosquitto__packet_handle(mosq); rc = handle__packet(mosq);
#endif #endif
/* Free data and reset values */ /* Free data and reset values */

View File

@ -30,7 +30,7 @@ Contributors:
#include "time_mosq.h" #include "time_mosq.h"
#include "util_mosq.h" #include "util_mosq.h"
int mosquitto__packet_handle(struct mosquitto *mosq) int handle__packet(struct mosquitto *mosq)
{ {
assert(mosq); assert(mosq);

View File

@ -19,12 +19,12 @@ Contributors:
#include "mosquitto.h" #include "mosquitto.h"
struct mosquitto_db; struct mosquitto_db;
int mosquitto__packet_handle(struct mosquitto *mosq);
int handle__pingreq(struct mosquitto *mosq); int handle__pingreq(struct mosquitto *mosq);
int handle__pingresp(struct mosquitto *mosq); int handle__pingresp(struct mosquitto *mosq);
#ifdef WITH_BROKER #ifdef WITH_BROKER
int handle__pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type); int handle__pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type);
#else #else
int handle__packet(struct mosquitto *mosq);
int handle__connack(struct mosquitto *mosq); int handle__connack(struct mosquitto *mosq);
int handle__pubackcomp(struct mosquitto *mosq, const char *type); int handle__pubackcomp(struct mosquitto *mosq, const char *type);
int handle__publish(struct mosquitto *mosq); int handle__publish(struct mosquitto *mosq);

View File

@ -207,7 +207,7 @@ int socks5__read(struct mosquitto *mosq)
if(mosq->state == mosq_cs_socks5_start){ if(mosq->state == mosq_cs_socks5_start){
while(mosq->in_packet.to_process > 0){ while(mosq->in_packet.to_process > 0){
len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){ if(len > 0){
mosq->in_packet.pos += len; mosq->in_packet.pos += len;
mosq->in_packet.to_process -= len; mosq->in_packet.to_process -= len;
@ -249,7 +249,7 @@ int socks5__read(struct mosquitto *mosq)
} }
}else if(mosq->state == mosq_cs_socks5_userpass_reply){ }else if(mosq->state == mosq_cs_socks5_userpass_reply){
while(mosq->in_packet.to_process > 0){ while(mosq->in_packet.to_process > 0){
len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){ if(len > 0){
mosq->in_packet.pos += len; mosq->in_packet.pos += len;
mosq->in_packet.to_process -= len; mosq->in_packet.to_process -= len;
@ -305,7 +305,7 @@ int socks5__read(struct mosquitto *mosq)
} }
}else if(mosq->state == mosq_cs_socks5_request){ }else if(mosq->state == mosq_cs_socks5_request){
while(mosq->in_packet.to_process > 0){ while(mosq->in_packet.to_process > 0){
len = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){ if(len > 0){
mosq->in_packet.pos += len; mosq->in_packet.pos += len;
mosq->in_packet.to_process -= len; mosq->in_packet.to_process -= len;

View File

@ -59,7 +59,7 @@ void mosquitto__check_keepalive(struct mosquitto *mosq)
&& now - mosq->last_msg_out >= mosq->bridge->idle_timeout){ && now - mosq->last_msg_out >= mosq->bridge->idle_timeout){
log__printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id); log__printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id);
mosquitto__socket_close(db, mosq); net__socket_close(db, mosq);
return; return;
} }
#endif #endif
@ -84,9 +84,9 @@ void mosquitto__check_keepalive(struct mosquitto *mosq)
assert(mosq->listener->client_count >= 0); assert(mosq->listener->client_count >= 0);
} }
mosq->listener = NULL; mosq->listener = NULL;
mosquitto__socket_close(db, mosq); net__socket_close(db, mosq);
#else #else
mosquitto__socket_close(mosq); net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex); pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){ if(mosq->state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS; rc = MOSQ_ERR_SUCCESS;

View File

@ -166,12 +166,12 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
* remove any messages and the next loop carries out the resubscription * remove any messages and the next loop carries out the resubscription
* anyway. This means any unwanted subs will be removed. * anyway. This means any unwanted subs will be removed.
*/ */
mqtt3_subs_clean_session(db, context); sub__clean_session(db, context);
for(i=0; i<context->bridge->topic_count; i++){ for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(mqtt3_sub_add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1; if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1;
} }
} }
@ -210,7 +210,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
} }
log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port); log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
rc = mosquitto__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); rc = net__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false);
if(rc > 0 ){ if(rc > 0 ){
if(rc == MOSQ_ERR_TLS){ if(rc == MOSQ_ERR_TLS){
return rc; /* Error already printed */ return rc; /* Error already printed */
@ -241,7 +241,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
}else if(rc == MOSQ_ERR_EAI){ }else if(rc == MOSQ_ERR_EAI){
log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
} }
mosquitto__socket_close(db, context); net__socket_close(db, context);
return rc; return rc;
} }
} }

View File

@ -134,9 +134,9 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
} }
} }
#endif #endif
mosquitto__socket_close(db, context); net__socket_close(db, context);
if((do_free || context->clean_session) && db){ if((do_free || context->clean_session) && db){
mqtt3_subs_clean_session(db, context); sub__clean_session(db, context);
db__messages_delete(db, context); db__messages_delete(db, context);
} }
if(context->address){ if(context->address){
@ -199,7 +199,7 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *ctxt)
ctxt->will = NULL; ctxt->will = NULL;
} }
ctxt->disconnect_t = time(NULL); ctxt->disconnect_t = time(NULL);
mosquitto__socket_close(db, ctxt); net__socket_close(db, ctxt);
} }
void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context) void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)

View File

@ -204,7 +204,7 @@ void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **s
} }
static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
{ {
if(!context || !msg || !(*msg)){ if(!context || !msg || !(*msg)){
return; return;
@ -269,7 +269,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
} }
if(tail->mid == mid && tail->direction == dir){ if(tail->mid == mid && tail->direction == dir){
msg_index--; msg_index--;
_message_remove(db, context, &tail, last); db__message_remove(db, context, &tail, last);
deleted = true; deleted = true;
}else{ }else{
last = tail; last = tail;
@ -499,7 +499,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
} }
if(db__message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; if(db__message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &stored); return sub__messages_queue(db, source_id, topic, qos, retain, &stored);
} }
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
@ -635,7 +635,7 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
if(msg->qos != 2){ if(msg->qos != 2){
/* Anything <QoS 2 can be completely retried by the client at /* Anything <QoS 2 can be completely retried by the client at
* no harm. */ * no harm. */
_message_remove(db, context, &msg, prev); db__message_remove(db, context, &msg, prev);
}else{ }else{
/* Message state can be preserved here because it should match /* Message state can be preserved here because it should match
* whatever the client has got. */ * whatever the client has got. */
@ -764,8 +764,8 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
* denied/dropped and is being processed so the client doesn't * denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other * keep resending it. That means we don't send it to other
* clients. */ * clients. */
if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &tail->store)){ if(!topic || !sub__messages_queue(db, source_id, topic, qos, retain, &tail->store)){
_message_remove(db, context, &tail, last); db__message_remove(db, context, &tail, last);
deleted = true; deleted = true;
}else{ }else{
return 1; return 1;
@ -825,7 +825,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
case mosq_ms_publish_qos0: case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries); rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
if(!rc){ if(!rc){
_message_remove(db, context, &tail, last); db__message_remove(db, context, &tail, last);
}else{ }else{
return rc; return rc;
} }

View File

@ -31,7 +31,7 @@ Contributors:
static uint32_t db_version; static uint32_t db_version;
static int stats = 0; static int stats = 0;
static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fd) static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
{ {
uint16_t i16temp, slen, last_mid; uint16_t i16temp, slen, last_mid;
char *client_id = NULL; char *client_id = NULL;
@ -75,7 +75,7 @@ error:
return 1; return 1;
} }
static int _db_client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd) static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
{ {
dbid_t i64temp, store_id; dbid_t i64temp, store_id;
uint16_t i16temp, slen, mid; uint16_t i16temp, slen, mid;
@ -127,7 +127,7 @@ error:
return 1; return 1;
} }
static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd) static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
{ {
dbid_t i64temp, store_id; dbid_t i64temp, store_id;
uint32_t i32temp, payloadlen; uint32_t i32temp, payloadlen;
@ -240,7 +240,7 @@ error:
return 1; return 1;
} }
static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd) static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
{ {
dbid_t i64temp, store_id; dbid_t i64temp, store_id;
@ -254,7 +254,7 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
return 0; return 0;
} }
static int _db_sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd) static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
{ {
uint16_t i16temp, slen; uint16_t i16temp, slen;
uint8_t qos; uint8_t qos;
@ -364,35 +364,35 @@ int main(int argc, char *argv[])
msg_store_count++; msg_store_count++;
if(!stats) printf("DB_CHUNK_MSG_STORE:\n"); if(!stats) printf("DB_CHUNK_MSG_STORE:\n");
if(!stats) printf("\tLength: %d\n", length); if(!stats) printf("\tLength: %d\n", length);
if(_db_msg_store_chunk_restore(&db, fd)) return 1; if(db__msg_store_chunk_restore(&db, fd)) return 1;
break; break;
case DB_CHUNK_CLIENT_MSG: case DB_CHUNK_CLIENT_MSG:
client_msg_count++; client_msg_count++;
if(!stats) printf("DB_CHUNK_CLIENT_MSG:\n"); if(!stats) printf("DB_CHUNK_CLIENT_MSG:\n");
if(!stats) printf("\tLength: %d\n", length); if(!stats) printf("\tLength: %d\n", length);
if(_db_client_msg_chunk_restore(&db, fd)) return 1; if(db__client_msg_chunk_restore(&db, fd)) return 1;
break; break;
case DB_CHUNK_RETAIN: case DB_CHUNK_RETAIN:
retain_count++; retain_count++;
if(!stats) printf("DB_CHUNK_RETAIN:\n"); if(!stats) printf("DB_CHUNK_RETAIN:\n");
if(!stats) printf("\tLength: %d\n", length); if(!stats) printf("\tLength: %d\n", length);
if(_db_retain_chunk_restore(&db, fd)) return 1; if(db__retain_chunk_restore(&db, fd)) return 1;
break; break;
case DB_CHUNK_SUB: case DB_CHUNK_SUB:
sub_count++; sub_count++;
if(!stats) printf("DB_CHUNK_SUB:\n"); if(!stats) printf("DB_CHUNK_SUB:\n");
if(!stats) printf("\tLength: %d\n", length); if(!stats) printf("\tLength: %d\n", length);
if(_db_sub_chunk_restore(&db, fd)) return 1; if(db__sub_chunk_restore(&db, fd)) return 1;
break; break;
case DB_CHUNK_CLIENT: case DB_CHUNK_CLIENT:
client_count++; client_count++;
if(!stats) printf("DB_CHUNK_CLIENT:\n"); if(!stats) printf("DB_CHUNK_CLIENT:\n");
if(!stats) printf("\tLength: %d\n", length); if(!stats) printf("\tLength: %d\n", length);
if(_db_client_chunk_restore(&db, fd)) return 1; if(db__client_chunk_restore(&db, fd)) return 1;
break; break;
default: default:

View File

@ -177,9 +177,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
&& context->bridge->cur_address != 0 && context->bridge->cur_address != 0
&& now > context->bridge->primary_retry){ && now > context->bridge->primary_retry){
if(mosquitto__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) == MOSQ_ERR_SUCCESS){ if(net__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) == MOSQ_ERR_SUCCESS){
COMPAT_CLOSE(bridge_sock); COMPAT_CLOSE(bridge_sock);
mosquitto__socket_close(db, context); net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1; context->bridge->cur_address = context->bridge->address_count-1;
} }
} }
@ -358,7 +358,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
flag_reload = false; flag_reload = false;
} }
if(flag_tree_print){ if(flag_tree_print){
mqtt3_sub_tree_print(&db->subs, 0); sub__tree_print(&db->subs, 0);
flag_tree_print = false; flag_tree_print = false;
} }
#ifdef WITH_WEBSOCKETS #ifdef WITH_WEBSOCKETS

View File

@ -228,7 +228,7 @@ int main(int argc, char *argv[])
memset(&int_db, 0, sizeof(struct mosquitto_db)); memset(&int_db, 0, sizeof(struct mosquitto_db));
mosquitto__net_init(); net__init();
config__init(&config); config__init(&config);
rc = config__parse_args(&config, argc, argv); rc = config__parse_args(&config, argc, argv);
@ -431,7 +431,7 @@ int main(int argc, char *argv[])
} }
config__cleanup(int_db.config); config__cleanup(int_db.config);
mosquitto__net_cleanup(); net__cleanup();
return rc; return rc;
} }

View File

@ -444,7 +444,7 @@ int net__socket_get_address(int sock, char *buf, int len);
/* ============================================================ /* ============================================================
* Read handling functions * Read handling functions
* ============================================================ */ * ============================================================ */
int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context); int handle__packet(struct mosquitto_db *db, struct mosquitto *context);
int handle__connack(struct mosquitto_db *db, struct mosquitto *context); int handle__connack(struct mosquitto_db *db, struct mosquitto *context);
int handle__connect(struct mosquitto_db *db, struct mosquitto *context); int handle__connect(struct mosquitto_db *db, struct mosquitto *context);
int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context); int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context);
@ -471,7 +471,6 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m
int db__message_write(struct mosquitto_db *db, struct mosquitto *context); int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored);
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
@ -483,16 +482,16 @@ int db__message_timeout_check(struct mosquitto_db *db, unsigned int timeout);
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
void db__vacuum(void); void db__vacuum(void);
void sys__update(struct mosquitto_db *db, int interval, time_t start_time); void sys__update(struct mosquitto_db *db, int interval, time_t start_time);
int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos);
/* ============================================================ /* ============================================================
* Subscription functions * Subscription functions
* ============================================================ */ * ============================================================ */
int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root); int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root);
int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root);
int mqtt3_sub_search(struct mosquitto_db *db, struct mosquitto__subhier *root, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); void sub__tree_print(struct mosquitto__subhier *root, int level);
void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level); int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context); int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos);
int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored);
/* ============================================================ /* ============================================================
* Context functions * Context functions

View File

@ -83,7 +83,7 @@ int net__socket_accept(struct mosquitto_db *db, int listensock)
G_SOCKET_CONNECTIONS_INC(); G_SOCKET_CONNECTIONS_INC();
if(mosquitto__socket_nonblock(new_sock)){ if(net__socket_nonblock(new_sock)){
return INVALID_SOCKET; return INVALID_SOCKET;
} }
@ -376,7 +376,7 @@ int net__socket_listen(struct mosquitto__listener *listener)
ss_opt = 1; ss_opt = 1;
setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &ss_opt, sizeof(ss_opt)); setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &ss_opt, sizeof(ss_opt));
if(mosquitto__socket_nonblock(sock)){ if(net__socket_nonblock(sock)){
return 1; return 1;
} }

View File

@ -667,7 +667,7 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
store_id = i64temp; store_id = i64temp;
HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load);
if(load){ if(load){
mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store);
}else{ }else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
@ -837,7 +837,7 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id,
context = persist__find_or_add_context(db, client_id, 0); context = persist__find_or_add_context(db, client_id, 0);
if(!context) return 1; if(!context) return 1;
return mqtt3_sub_add(db, context, sub, qos, &db->subs); return sub__add(db, context, sub, qos, &db->subs);
} }
#endif #endif

View File

@ -29,7 +29,7 @@ Contributors:
#include "sys_tree.h" #include "sys_tree.h"
#include "util_mosq.h" #include "util_mosq.h"
int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context) int handle__packet(struct mosquitto_db *db, struct mosquitto *context)
{ {
if(!context) return MOSQ_ERR_INVAL; if(!context) return MOSQ_ERR_INVAL;
@ -226,10 +226,10 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
} }
switch(qos){ switch(qos){
case 0: case 0:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
break; break;
case 1: case 1:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
if(send__puback(context, mid)) rc = 1; if(send__puback(context, mid)) rc = 1;
break; break;
case 2: case 2:

View File

@ -691,9 +691,9 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
#endif #endif
if(qos != 0x80){ if(qos != 0x80){
rc2 = mqtt3_sub_add(db, context, sub, qos, &db->subs); rc2 = sub__add(db, context, sub, qos, &db->subs);
if(rc2 == MOSQ_ERR_SUCCESS){ if(rc2 == MOSQ_ERR_SUCCESS){
if(mqtt3_retain_queue(db, context, sub, qos)) rc = 1; if(sub__retain_queue(db, context, sub, qos)) rc = 1;
}else if(rc2 != -1){ }else if(rc2 != -1){
rc = rc2; rc = rc2;
} }
@ -766,7 +766,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
} }
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub);
mqtt3_sub_remove(db, context, sub, &db->subs); sub__remove(db, context, sub, &db->subs);
log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub); log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub);
mosquitto__free(sub); mosquitto__free(sub);
} }

View File

@ -245,7 +245,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens)
} }
} }
static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens) static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens)
{ {
struct mosquitto__subhier *branch, *last = NULL; struct mosquitto__subhier *branch, *last = NULL;
struct mosquitto__subleaf *leaf, *last_leaf; struct mosquitto__subleaf *leaf, *last_leaf;
@ -321,7 +321,7 @@ static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos,
branch = subhier->children; branch = subhier->children;
while(branch){ while(branch){
if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){ if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){
return sub__add(db, context, qos, branch, tokens->next); return sub__add_recurse(db, context, qos, branch, tokens->next);
} }
last = branch; last = branch;
branch = branch->next; branch = branch->next;
@ -340,10 +340,10 @@ static int sub__add(struct mosquitto_db *db, struct mosquitto *context, int qos,
}else{ }else{
last->next = branch; last->next = branch;
} }
return sub__add(db, context, qos, branch, tokens->next); return sub__add_recurse(db, context, qos, branch, tokens->next);
} }
static int sub__remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens) static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens)
{ {
struct mosquitto__subhier *branch, *last = NULL; struct mosquitto__subhier *branch, *last = NULL;
struct mosquitto__subleaf *leaf; struct mosquitto__subleaf *leaf;
@ -386,7 +386,7 @@ static int sub__remove(struct mosquitto_db *db, struct mosquitto *context, struc
branch = subhier->children; branch = subhier->children;
while(branch){ while(branch){
if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){ if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){
sub__remove(db, context, branch, tokens->next); sub__remove_recurse(db, context, branch, tokens->next);
if(!branch->children && !branch->subs && !branch->retained){ if(!branch->children && !branch->subs && !branch->retained){
if(last){ if(last){
last->next = branch->next; last->next = branch->next;
@ -438,7 +438,7 @@ static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subh
} }
} }
int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root) int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root)
{ {
int rc = 0; int rc = 0;
struct mosquitto__subhier *subhier, *child; struct mosquitto__subhier *subhier, *child;
@ -452,7 +452,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char
subhier = root->children; subhier = root->children;
while(subhier){ while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){ if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
rc = sub__add(db, context, qos, subhier, tokens); rc = sub__add_recurse(db, context, qos, subhier, tokens);
break; break;
} }
subhier = subhier->next; subhier = subhier->next;
@ -482,7 +482,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char
} }
db->subs.children = child; db->subs.children = child;
rc = sub__add(db, context, qos, child, tokens); rc = sub__add_recurse(db, context, qos, child, tokens);
} }
sub__topic_tokens_free(tokens); sub__topic_tokens_free(tokens);
@ -492,7 +492,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char
return rc; return rc;
} }
int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root) int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root)
{ {
int rc = 0; int rc = 0;
struct mosquitto__subhier *subhier; struct mosquitto__subhier *subhier;
@ -506,7 +506,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c
subhier = root->children; subhier = root->children;
while(subhier){ while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){ if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
rc = sub__remove(db, context, subhier, tokens); rc = sub__remove_recurse(db, context, subhier, tokens);
break; break;
} }
subhier = subhier->next; subhier = subhier->next;
@ -517,7 +517,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c
return rc; return rc;
} }
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored)
{ {
int rc = 0; int rc = 0;
struct mosquitto__subhier *subhier; struct mosquitto__subhier *subhier;
@ -541,7 +541,7 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
/* We have a message that needs to be retained, so ensure that the subscription /* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists. * tree for its topic exists.
*/ */
sub__add(db, NULL, 0, subhier, tokens); sub__add_recurse(db, NULL, 0, subhier, tokens);
} }
sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
} }
@ -557,7 +557,7 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
/* Remove all subscriptions for a client. /* Remove all subscriptions for a client.
*/ */
int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context) int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
{ {
int i; int i;
struct mosquitto__subleaf *leaf; struct mosquitto__subleaf *leaf;
@ -593,7 +593,7 @@ int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level) void sub__tree_print(struct mosquitto__subhier *root, int level)
{ {
int i; int i;
struct mosquitto__subhier *branch; struct mosquitto__subhier *branch;
@ -619,7 +619,7 @@ void mqtt3_sub_tree_print(struct mosquitto__subhier *root, int level)
branch = root->children; branch = root->children;
while(branch){ while(branch){
mqtt3_sub_tree_print(branch, level+1); sub__tree_print(branch, level+1);
branch = branch->next; branch = branch->next;
} }
} }
@ -693,7 +693,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
return flag; return flag;
} }
int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos) int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos)
{ {
struct mosquitto__subhier *subhier; struct mosquitto__subhier *subhier;
struct sub__token *tokens = NULL, *tail; struct sub__token *tokens = NULL, *tail;

View File

@ -314,7 +314,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
G_PUB_MSGS_RECEIVED_INC(1); G_PUB_MSGS_RECEIVED_INC(1);
} }
#endif #endif
rc = mqtt3_packet_handle(db, mosq); rc = handle__packet(db, mosq);
/* Free data and reset values */ /* Free data and reset values */
packet__cleanup(&mosq->in_packet); packet__cleanup(&mosq->in_packet);

View File

@ -18,7 +18,7 @@ int main(int argc, char *argv[])
bool clean_session = true; bool clean_session = true;
int keepalive = 60; int keepalive = 60;
mosq = mosquitto_new("packetgen", NULL); mosq = mosquitto_new("packetgen", clean_session, NULL);
if(!mosq){ if(!mosq){
fprintf(stderr, "Error: Out of memory.\n"); fprintf(stderr, "Error: Out of memory.\n");
return 1; return 1;
@ -30,9 +30,9 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error: Unable to open mqtt.connect for writing.\n"); fprintf(stderr, "Error: Unable to open mqtt.connect for writing.\n");
return 1; return 1;
} }
mosq->core.sock = fd; mosq->sock = fd;
printf("_mosquitto_send_connect(): %d\n", _mosquitto_send_connect(mosq, keepalive, clean_session)); printf("send__connect(): %d\n", send__connect(mosq, keepalive, clean_session));
printf("loop: %d\n", mosquitto_loop_write(mosq)); printf("loop: %d\n", mosquitto_loop_write(mosq, 1));
close(fd); close(fd);
/* SUBSCRIBE */ /* SUBSCRIBE */
@ -41,9 +41,9 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error: Unable to open mqtt.subscribe for writing.\n"); fprintf(stderr, "Error: Unable to open mqtt.subscribe for writing.\n");
return 1; return 1;
} }
mosq->core.sock = fd; mosq->sock = fd;
printf("_mosquitto_send_subscribe(): %d\n", _mosquitto_send_subscribe(mosq, NULL, false, "subscribe/topic", 2)); printf("send__subscribe(): %d\n", send__subscribe(mosq, NULL, "subscribe/topic", 2));
printf("loop: %d\n", mosquitto_loop_write(mosq)); printf("loop: %d\n", mosquitto_loop_write(mosq, 1));
close(fd); close(fd);
mosquitto_destroy(mosq); mosquitto_destroy(mosq);

View File

@ -1,198 +0,0 @@
/*
Copyright (c) 2009, Roger Light <roger@atchoo.org>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of mosquitto nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/select.h>
#include <unistd.h>
#include <mqtt3.h>
typedef enum {
stStart,
stSocketOpened,
stConnSent,
stConnAckd,
stSubSent,
stSubAckd,
stPause
} stateType;
static stateType state = stStart;
int handle_read(mqtt3_context *context)
{
uint8_t buf;
int rc;
rc = read(context->sock, &buf, 1);
printf("rc: %d\n", rc);
if(rc == -1){
printf("Error: %s\n", strerror(errno));
return 1;
}else if(rc == 0){
return 2;
}
switch(buf&0xF0){
case CONNACK:
if(mqtt3_handle_connack(context)) return 3;
state = stConnAckd;
break;
case SUBACK:
if(mqtt3_handle_suback(context)) return 3;
state = stSubAckd;
break;
case PINGREQ:
if(mqtt3_handle_pingreq(context)) return 3;
break;
case PINGRESP:
if(mqtt3_handle_pingresp(context)) return 3;
break;
case PUBACK:
if(mqtt3_handle_puback(context)) return 3;
break;
case PUBCOMP:
if(mqtt3_handle_pubcomp(context)) return 3;
break;
case PUBLISH:
if(mqtt3_handle_publish(context)) return 0;
break;
case PUBREC:
if(mqtt3_handle_pubrec(context)) return 3;
break;
case UNSUBACK:
if(mqtt3_handle_unsuback(context)) return 3;
break;
default:
printf("Unknown command: %s (%d)\n", mqtt3_command_to_string(buf&0xF0), buf&0xF0);
break;
}
return 0;
}
void send_random(mqtt3_context *context, int length)
{
int fd = open("/dev/urandom", O_RDONLY);
uint8_t buf[length];
if(fd >= 0){
if(read(fd, buf, length) == length){
mqtt3_write_bytes(context, buf, length);
}
close(fd);
}
}
/* pselect loop test */
int main(int argc, char *argv[])
{
struct timespec timeout;
fd_set readfds, writefds;
int fdcount;
int run = 1;
mqtt3_context context;
char id[30];
if(argc == 2){
sprintf(id, "test%s", argv[1]);
}else{
sprintf(id, "test");
}
context.sock = mqtt3_socket_connect("127.0.0.1", 1883);
if(context.sock == -1){
return 1;
}
state = stSocketOpened;
while(run){
FD_ZERO(&readfds);
FD_SET(context.sock, &readfds);
FD_ZERO(&writefds);
//FD_SET(0, &writefds);
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
fdcount = pselect(context.sock+1, &readfds, &writefds, NULL, &timeout, NULL);
if(fdcount == -1){
fprintf(stderr, "Error in pselect: %s\n", strerror(errno));
run = 0;
}else if(fdcount == 0){
switch(state){
case stSocketOpened:
mqtt3_raw_connect(&context, id, true, 2, true, "will", "aargh", 60, true);
state = stConnSent;
break;
case stConnSent:
printf("Waiting for CONNACK\n");
break;
case stConnAckd:
// printf("CONNACK received\n");
// mqtt3_raw_subscribe(&context, false, "a/b/c", 0);
// state = stSubSent;
send_random(&context, 100);
break;
case stSubSent:
printf("Waiting for SUBACK\n");
break;
case stSubAckd:
printf("SUBACK received\n");
mqtt3_raw_publish(&context, 0, 0, 0, 1, "a/b/c", 5, (uint8_t *)"Roger");
state = stPause;
break;
case stPause:
//mqtt3_raw_disconnect(&context);
printf("Pause\n");
break;
default:
fprintf(stderr, "Error: Unknown state\n");
break;
}
}else{
printf("fdcount=%d\n", fdcount);
if(FD_ISSET(context.sock, &readfds)){
if(handle_read(&context)){
fprintf(stderr, "Socket closed on remote side\n");
mqtt3_socket_close(&context);
run = 0;
}
}
}
}
return 0;
}