diff --git a/src/database.c b/src/database.c index de8224d4..75f083c5 100644 --- a/src/database.c +++ b/src/database.c @@ -121,10 +121,66 @@ static void subhier_clean(struct _mosquitto_subhier *subhier) int mqtt3_db_close(struct mosquitto_db *db) { subhier_clean(db->subs.children); + mosquitto__db_msg_store_clean(db); return MOSQ_ERR_SUCCESS; } + +void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store) +{ + store->next = db->msg_store; + store->prev = NULL; + if(db->msg_store){ + db->msg_store->prev = store; + } + db->msg_store = store; +} + + +void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store) +{ + int i; + + if(store->prev){ + store->prev->next = store->next; + if(store->next){ + store->next->prev = store->prev; + } + }else{ + db->msg_store = store->next; + if(store->next){ + store->next->prev = NULL; + } + } + db->msg_store_count--; + + if(store->source_id) _mosquitto_free(store->source_id); + if(store->dest_ids){ + for(i=0; idest_id_count; i++){ + if(store->dest_ids[i]) _mosquitto_free(store->dest_ids[i]); + } + _mosquitto_free(store->dest_ids); + } + if(store->topic) _mosquitto_free(store->topic); + if(store->payload) _mosquitto_free(store->payload); + _mosquitto_free(store); +} + + +void mosquitto__db_msg_store_clean(struct mosquitto_db *db) +{ + struct mosquitto_msg_store *store, *next;; + + store = db->msg_store; + while(store){ + next = store->next; + mosquitto__db_msg_store_remove(db, store); + store = next; + } +} + + static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) { if(!context || !msg || !(*msg)){ @@ -133,8 +189,7 @@ static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, (*msg)->store->ref_count--; if((*msg)->store->ref_count == 0){ - HASH_DELETE(hh, db->msg_store, (*msg)->store); - db->msg_store_count--; + mosquitto__db_msg_store_remove(db, (*msg)->store); } if(last){ last->next = (*msg)->next; @@ -399,8 +454,7 @@ int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context) while(tail){ tail->store->ref_count--; if(tail->store->ref_count == 0){ - HASH_DELETE(hh, db->msg_store, tail->store); - db->msg_store_count--; + mosquitto__db_msg_store_remove(db, tail->store); } next = tail->next; _mosquitto_free(tail); @@ -502,7 +556,7 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t temp->db_id = store_id; } - HASH_ADD(hh, db->msg_store, db_id, sizeof(dbid_t), temp); + mosquitto__db_msg_store_add(db, temp); return MOSQ_ERR_SUCCESS; } diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 7b96b252..4686f1b7 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -141,9 +141,16 @@ struct _mosquitto_subhier { struct mosquitto_msg_store *retained; }; -struct mosquitto_msg_store{ +struct mosquitto_msg_store_load{ UT_hash_handle hh; dbid_t db_id; + struct mosquitto_msg_store *store; +}; + +struct mosquitto_msg_store{ + struct mosquitto_msg_store *next; + struct mosquitto_msg_store *prev; + dbid_t db_id; char *source_id; char **dest_ids; int dest_id_count; @@ -220,6 +227,7 @@ struct mosquitto_db{ struct mosquitto *contexts_bridge; struct _clientid_index_hash *clientid_index_hash; struct mosquitto_msg_store *msg_store; + struct mosquitto_msg_store_load *msg_store_load; int msg_store_count; struct mqtt3_config *config; int persistence_changes; @@ -380,6 +388,9 @@ int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *cont int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); +void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); +void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store); +void mosquitto__db_msg_store_clean(struct mosquitto_db *db); /* Check all messages waiting on a client reply and resend if timeout has been exceeded. */ int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout); int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); diff --git a/src/persist.c b/src/persist.c index e41af589..e8f7fe19 100644 --- a/src/persist.c +++ b/src/persist.c @@ -131,13 +131,14 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr) uint32_t i32temp; uint16_t i16temp, slen; uint8_t i8temp; - struct mosquitto_msg_store *stored, *stored_tmp; + struct mosquitto_msg_store *stored; bool force_no_retain; assert(db); assert(db_fptr); - HASH_ITER(hh, db->msg_store, stored, stored_tmp){ + stored = db->msg_store; + while(stored){ if(!strncmp(stored->topic, "$SYS", 4)){ /* Don't save $SYS messages as retained otherwise they can give * misleading information when reloaded. They should still be saved @@ -192,6 +193,7 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr) if(stored->payloadlen){ write_e(db_fptr, stored->payload, (unsigned int)stored->payloadlen); } + stored = stored->next; } return MOSQ_ERR_SUCCESS; @@ -403,7 +405,7 @@ error: static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id, uint16_t mid, uint8_t qos, uint8_t retain, uint8_t direction, uint8_t state, uint8_t dup, uint64_t store_id) { struct mosquitto_client_msg *cmsg; - struct mosquitto_msg_store *store; + struct mosquitto_msg_store_load *load; struct mosquitto *context; cmsg = _mosquitto_malloc(sizeof(struct mosquitto_client_msg)); @@ -422,13 +424,13 @@ static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id cmsg->state = state; cmsg->dup = dup; - HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store); - if(!store){ + HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); + if(!load){ _mosquitto_free(cmsg); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error restoring persistent database, message store corrupt."); return 1; } - cmsg->store = store; + cmsg->store = load->store; context = _db_find_or_add_context(db, client_id, 0); if(!context){ @@ -555,8 +557,16 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) char *topic = NULL; int rc = 0; struct mosquitto_msg_store *stored = NULL; + struct mosquitto_msg_store_load *load; char err[256]; + load = _mosquitto_malloc(sizeof(struct mosquitto_msg_store_load)); + if(!load){ + fclose(db_fptr); + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + read_e(db_fptr, &i64temp, sizeof(dbid_t)); store_id = i64temp; @@ -565,6 +575,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) if(slen){ source_id = _mosquitto_malloc(slen+1); if(!source_id){ + _mosquitto_free(load); fclose(db_fptr); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; @@ -583,6 +594,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) if(slen){ topic = _mosquitto_malloc(slen+1); if(!topic){ + _mosquitto_free(load); fclose(db_fptr); if(source_id) _mosquitto_free(source_id); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); @@ -591,6 +603,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) read_e(db_fptr, topic, slen); topic[slen] = '\0'; }else{ + _mosquitto_free(load); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Invalid msg_store chunk when restoring persistent database."); fclose(db_fptr); if(source_id) _mosquitto_free(source_id); @@ -605,6 +618,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) if(payloadlen){ payload = _mosquitto_malloc(payloadlen); if(!payload){ + _mosquitto_free(load); fclose(db_fptr); if(source_id) _mosquitto_free(source_id); _mosquitto_free(topic); @@ -615,6 +629,12 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) } rc = mqtt3_db_message_store(db, source_id, source_mid, topic, qos, payloadlen, payload, retain, &stored, store_id); + + load->db_id = stored->db_id; + load->store = stored; + + HASH_ADD(hh, db->msg_store_load, db_id, sizeof(dbid_t), load); + if(source_id) _mosquitto_free(source_id); _mosquitto_free(topic); _mosquitto_free(payload); @@ -633,7 +653,7 @@ error: static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { dbid_t i64temp, store_id; - struct mosquitto_msg_store *store; + struct mosquitto_msg_store_load *load; char err[256]; if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){ @@ -643,9 +663,9 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) return 1; } store_id = i64temp; - HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store); - if(store){ - mqtt3_db_messages_queue(db, NULL, store->topic, store->qos, store->retain, store); + HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); + if(load){ + mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, load->store); }else{ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); return MOSQ_ERR_INVAL; @@ -712,11 +732,14 @@ int mqtt3_db_restore(struct mosquitto_db *db) uint8_t i8temp; ssize_t rlen; char err[256]; + struct mosquitto_msg_store_load *load, *load_tmp; assert(db); assert(db->config); assert(db->config->persistence_filepath); + db->msg_store_load = NULL; + fptr = _mosquitto_fopen(db->config->persistence_filepath, "rb"); if(fptr == NULL) return MOSQ_ERR_SUCCESS; read_e(fptr, &header, 15); @@ -790,6 +813,10 @@ int mqtt3_db_restore(struct mosquitto_db *db) fclose(fptr); + HASH_ITER(hh, db->msg_store_load, load, load_tmp){ + HASH_DELETE(hh, db->msg_store_load, load); + _mosquitto_free(load); + } return rc; error: strerror_r(errno, err, 256); diff --git a/src/subs.c b/src/subs.c index 985072e6..1faf7942 100644 --- a/src/subs.c +++ b/src/subs.c @@ -82,8 +82,7 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie if(hier->retained){ hier->retained->ref_count--; if(hier->retained->ref_count == 0){ - HASH_DELETE(hh, db->msg_store, hier->retained); - db->msg_store_count--; + mosquitto__db_msg_store_remove(db, hier->retained); } #ifdef WITH_SYS_TREE db->retained_count--;