More memory efficient message store than with hashes.

Also frees messages from the message store after removing...
This commit is contained in:
Roger A. Light 2014-11-18 19:12:08 +00:00
parent d30d711c3b
commit 1e64bb171a
4 changed files with 109 additions and 18 deletions

View File

@ -121,10 +121,66 @@ static void subhier_clean(struct _mosquitto_subhier *subhier)
int mqtt3_db_close(struct mosquitto_db *db)
{
subhier_clean(db->subs.children);
mosquitto__db_msg_store_clean(db);
return MOSQ_ERR_SUCCESS;
}
void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store)
{
store->next = db->msg_store;
store->prev = NULL;
if(db->msg_store){
db->msg_store->prev = store;
}
db->msg_store = store;
}
void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store)
{
int i;
if(store->prev){
store->prev->next = store->next;
if(store->next){
store->next->prev = store->prev;
}
}else{
db->msg_store = store->next;
if(store->next){
store->next->prev = NULL;
}
}
db->msg_store_count--;
if(store->source_id) _mosquitto_free(store->source_id);
if(store->dest_ids){
for(i=0; i<store->dest_id_count; i++){
if(store->dest_ids[i]) _mosquitto_free(store->dest_ids[i]);
}
_mosquitto_free(store->dest_ids);
}
if(store->topic) _mosquitto_free(store->topic);
if(store->payload) _mosquitto_free(store->payload);
_mosquitto_free(store);
}
void mosquitto__db_msg_store_clean(struct mosquitto_db *db)
{
struct mosquitto_msg_store *store, *next;;
store = db->msg_store;
while(store){
next = store->next;
mosquitto__db_msg_store_remove(db, store);
store = next;
}
}
static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
{
if(!context || !msg || !(*msg)){
@ -133,8 +189,7 @@ static void _message_remove(struct mosquitto_db *db, struct mosquitto *context,
(*msg)->store->ref_count--;
if((*msg)->store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, (*msg)->store);
db->msg_store_count--;
mosquitto__db_msg_store_remove(db, (*msg)->store);
}
if(last){
last->next = (*msg)->next;
@ -399,8 +454,7 @@ int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context)
while(tail){
tail->store->ref_count--;
if(tail->store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, tail->store);
db->msg_store_count--;
mosquitto__db_msg_store_remove(db, tail->store);
}
next = tail->next;
_mosquitto_free(tail);
@ -502,7 +556,7 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp->db_id = store_id;
}
HASH_ADD(hh, db->msg_store, db_id, sizeof(dbid_t), temp);
mosquitto__db_msg_store_add(db, temp);
return MOSQ_ERR_SUCCESS;
}

View File

@ -141,9 +141,16 @@ struct _mosquitto_subhier {
struct mosquitto_msg_store *retained;
};
struct mosquitto_msg_store{
struct mosquitto_msg_store_load{
UT_hash_handle hh;
dbid_t db_id;
struct mosquitto_msg_store *store;
};
struct mosquitto_msg_store{
struct mosquitto_msg_store *next;
struct mosquitto_msg_store *prev;
dbid_t db_id;
char *source_id;
char **dest_ids;
int dest_id_count;
@ -220,6 +227,7 @@ struct mosquitto_db{
struct mosquitto *contexts_bridge;
struct _clientid_index_hash *clientid_index_hash;
struct mosquitto_msg_store *msg_store;
struct mosquitto_msg_store_load *msg_store_load;
int msg_store_count;
struct mqtt3_config *config;
int persistence_changes;
@ -380,6 +388,9 @@ int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *cont
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void mosquitto__db_msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void mosquitto__db_msg_store_clean(struct mosquitto_db *db);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout);
int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);

View File

@ -131,13 +131,14 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
uint32_t i32temp;
uint16_t i16temp, slen;
uint8_t i8temp;
struct mosquitto_msg_store *stored, *stored_tmp;
struct mosquitto_msg_store *stored;
bool force_no_retain;
assert(db);
assert(db_fptr);
HASH_ITER(hh, db->msg_store, stored, stored_tmp){
stored = db->msg_store;
while(stored){
if(!strncmp(stored->topic, "$SYS", 4)){
/* Don't save $SYS messages as retained otherwise they can give
* misleading information when reloaded. They should still be saved
@ -192,6 +193,7 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
if(stored->payloadlen){
write_e(db_fptr, stored->payload, (unsigned int)stored->payloadlen);
}
stored = stored->next;
}
return MOSQ_ERR_SUCCESS;
@ -403,7 +405,7 @@ error:
static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id, uint16_t mid, uint8_t qos, uint8_t retain, uint8_t direction, uint8_t state, uint8_t dup, uint64_t store_id)
{
struct mosquitto_client_msg *cmsg;
struct mosquitto_msg_store *store;
struct mosquitto_msg_store_load *load;
struct mosquitto *context;
cmsg = _mosquitto_malloc(sizeof(struct mosquitto_client_msg));
@ -422,13 +424,13 @@ static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id
cmsg->state = state;
cmsg->dup = dup;
HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
if(!store){
HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load);
if(!load){
_mosquitto_free(cmsg);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error restoring persistent database, message store corrupt.");
return 1;
}
cmsg->store = store;
cmsg->store = load->store;
context = _db_find_or_add_context(db, client_id, 0);
if(!context){
@ -555,8 +557,16 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
char *topic = NULL;
int rc = 0;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store_load *load;
char err[256];
load = _mosquitto_malloc(sizeof(struct mosquitto_msg_store_load));
if(!load){
fclose(db_fptr);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, &i64temp, sizeof(dbid_t));
store_id = i64temp;
@ -565,6 +575,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
if(slen){
source_id = _mosquitto_malloc(slen+1);
if(!source_id){
_mosquitto_free(load);
fclose(db_fptr);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
@ -583,6 +594,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
if(slen){
topic = _mosquitto_malloc(slen+1);
if(!topic){
_mosquitto_free(load);
fclose(db_fptr);
if(source_id) _mosquitto_free(source_id);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
@ -591,6 +603,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
read_e(db_fptr, topic, slen);
topic[slen] = '\0';
}else{
_mosquitto_free(load);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Invalid msg_store chunk when restoring persistent database.");
fclose(db_fptr);
if(source_id) _mosquitto_free(source_id);
@ -605,6 +618,7 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
if(payloadlen){
payload = _mosquitto_malloc(payloadlen);
if(!payload){
_mosquitto_free(load);
fclose(db_fptr);
if(source_id) _mosquitto_free(source_id);
_mosquitto_free(topic);
@ -615,6 +629,12 @@ static int _db_msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
}
rc = mqtt3_db_message_store(db, source_id, source_mid, topic, qos, payloadlen, payload, retain, &stored, store_id);
load->db_id = stored->db_id;
load->store = stored;
HASH_ADD(hh, db->msg_store_load, db_id, sizeof(dbid_t), load);
if(source_id) _mosquitto_free(source_id);
_mosquitto_free(topic);
_mosquitto_free(payload);
@ -633,7 +653,7 @@ error:
static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
dbid_t i64temp, store_id;
struct mosquitto_msg_store *store;
struct mosquitto_msg_store_load *load;
char err[256];
if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){
@ -643,9 +663,9 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
return 1;
}
store_id = i64temp;
HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
if(store){
mqtt3_db_messages_queue(db, NULL, store->topic, store->qos, store->retain, store);
HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load);
if(load){
mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, load->store);
}else{
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
return MOSQ_ERR_INVAL;
@ -712,11 +732,14 @@ int mqtt3_db_restore(struct mosquitto_db *db)
uint8_t i8temp;
ssize_t rlen;
char err[256];
struct mosquitto_msg_store_load *load, *load_tmp;
assert(db);
assert(db->config);
assert(db->config->persistence_filepath);
db->msg_store_load = NULL;
fptr = _mosquitto_fopen(db->config->persistence_filepath, "rb");
if(fptr == NULL) return MOSQ_ERR_SUCCESS;
read_e(fptr, &header, 15);
@ -790,6 +813,10 @@ int mqtt3_db_restore(struct mosquitto_db *db)
fclose(fptr);
HASH_ITER(hh, db->msg_store_load, load, load_tmp){
HASH_DELETE(hh, db->msg_store_load, load);
_mosquitto_free(load);
}
return rc;
error:
strerror_r(errno, err, 256);

View File

@ -82,8 +82,7 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
if(hier->retained){
hier->retained->ref_count--;
if(hier->retained->ref_count == 0){
HASH_DELETE(hh, db->msg_store, hier->retained);
db->msg_store_count--;
mosquitto__db_msg_store_remove(db, hier->retained);
}
#ifdef WITH_SYS_TREE
db->retained_count--;