From a7d066074927ec48234616aa78bbf2379760a3cc Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Sat, 16 Mar 2019 09:42:15 +0000 Subject: [PATCH] Separate out persist reading code from restoring code. --- src/CMakeLists.txt | 1 + src/Makefile | 4 + src/persist.h | 87 ++++++++++++++ src/persist_read.c | 255 +++++++++++----------------------------- src/persist_read_v234.c | 203 ++++++++++++++++++++++++++++++++ test/unit/Makefile | 4 + 6 files changed, 368 insertions(+), 186 deletions(-) create mode 100644 src/persist_read_v234.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 498c96e9..e0722f6c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -30,6 +30,7 @@ set (MOSQ_SRCS ../lib/net_mosq.c ../lib/net_mosq.h ../lib/packet_datatypes.c ../lib/packet_mosq.c ../lib/packet_mosq.h + persist_read_v234.c persist_read.c persist_write.c persist.h plugin.c property_broker.c diff --git a/src/Makefile b/src/Makefile index 050e7c0d..82b799fb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -37,6 +37,7 @@ OBJS= mosquitto.o \ property_broker.o \ property_mosq.o \ persist_read.o \ + persist_read_v234.o \ persist_write.o \ plugin.o \ read_handle.o \ @@ -143,6 +144,9 @@ net_mosq.o : ../lib/net_mosq.c ../lib/net_mosq.h persist_read.o : persist_read.c persist.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ +persist_read_v234.o : persist_read_v234.c persist.h mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ + persist_write.o : persist_write.c persist.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/persist.h b/src/persist.h index d26d3bf7..b15100fa 100644 --- a/src/persist.h +++ b/src/persist.h @@ -32,4 +32,91 @@ extern const unsigned char magic[15]; #define read_e(f, b, c) if(fread(b, 1, c, f) != c){ goto error; } #define write_e(f, b, c) if(fwrite(b, 1, c, f) != c){ goto error; } +/* COMPATIBILITY NOTES + * + * The P_* structs (persist structs) contain all of the data for a particular + * data chunk. They are loaded in multiple parts, so can be rearranged without + * updating the db format version. + * + * The PF_* structs (persist fixed structs) contain the fixed size data for a + * particular data chunk. They are written to disk as is, so they must not be + * rearranged without updating the db format version. When adding new members, + * always use explicit sized datatypes ("uint32_t", not "long"), and check + * whether what is being added can go in an existing hole in the struct. + */ + +struct PF_client{ + uint64_t disconnect_t; + uint16_t last_mid; + uint16_t id_len; +}; +struct P_client{ + struct PF_client F; + char *client_id; +}; + + +struct PF_client_msg{ + dbid_t store_id; + uint16_t mid; + uint16_t id_len; + uint8_t qos; + uint8_t state; + uint8_t retain; + uint8_t direction; + uint8_t dup; +}; +struct P_client_msg{ + struct PF_client_msg F; + char *client_id; +}; + + +struct PF_msg_store{ + dbid_t store_id; + uint32_t payloadlen; + uint16_t mid; + uint16_t source_mid; + uint16_t source_id_len; + uint16_t topic_len; + uint16_t source_port; + uint8_t qos; + uint8_t retain; +}; +struct P_msg_store{ + struct PF_msg_store F; + mosquitto__payload_uhpa payload; + struct mosquitto source; + char *topic; +}; + + +struct PF_sub{ + uint16_t id_len; + uint16_t topic_len; + uint8_t qos; +}; +struct P_sub{ + struct PF_sub F; + char *client_id; + char *topic; +}; + + +struct PF_retain{ + dbid_t store_id; +}; +struct P_retain{ + struct PF_retain F; +}; + + +int persist__read_string(FILE *db_fptr, char **str); + +int persist__client_chunk_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version); +int persist__client_msg_chunk_read_v234(FILE *db_fptr, struct P_client_msg *chunk); +int persist__msg_store_chunk_read_v234(FILE *db_fptr, struct P_msg_store *chunk, int db_version); +int persist__retain_chunk_read_v234(FILE *db_fptr, struct P_retain *chunk); +int persist__sub_chunk_read_v234(FILE *db_fptr, struct P_sub *chunk); + #endif diff --git a/src/persist_read.c b/src/persist_read.c index 6efb2fb8..48e2d275 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -40,7 +40,7 @@ static uint32_t db_version; const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'}; static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos); -static int persist__read_string(FILE *db_fptr, char **str); +int persist__read_string(FILE *db_fptr, char **str); static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid) { @@ -67,7 +67,8 @@ static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, c return context; } -static int persist__read_string(FILE *db_fptr, char **str) + +int persist__read_string(FILE *db_fptr, char **str) { uint16_t i16temp; uint16_t slen; @@ -153,195 +154,98 @@ static int persist__client_msg_restore(struct mosquitto_db *db, const char *clie return MOSQ_ERR_SUCCESS; } + static int persist__client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - uint16_t i16temp, slen, last_mid; - char *client_id = NULL; int rc = 0; struct mosquitto *context; - time_t disconnect_t; + struct P_client chunk; - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - if(!slen){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt persistent database."); + memset(&chunk, 0, sizeof(struct P_client)); + + rc = persist__client_chunk_read_v234(db_fptr, &chunk, db_version); + if(rc){ fclose(db_fptr); - return 1; - } - client_id = mosquitto__malloc(slen+1); - if(!client_id){ - fclose(db_fptr); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - read_e(db_fptr, client_id, slen); - client_id[slen] = '\0'; - - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - last_mid = ntohs(i16temp); - - if(db_version == 2){ - disconnect_t = time(NULL); - }else{ - read_e(db_fptr, &disconnect_t, sizeof(time_t)); + return rc; } - context = persist__find_or_add_context(db, client_id, last_mid); + context = persist__find_or_add_context(db, chunk.client_id, chunk.F.last_mid); if(context){ - context->disconnect_t = disconnect_t; + context->disconnect_t = chunk.F.disconnect_t; }else{ rc = 1; } - mosquitto__free(client_id); + mosquitto__free(chunk.client_id); return rc; -error: - log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno)); - fclose(db_fptr); - mosquitto__free(client_id); - return 1; } + static int persist__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - dbid_t i64temp, store_id; - uint16_t i16temp, slen, mid; - uint8_t qos, retain, direction, state, dup; - char *client_id = NULL; + struct P_client_msg chunk; int rc; - char *err; - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - slen = ntohs(i16temp); - if(!slen){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt persistent database."); + memset(&chunk, 0, sizeof(struct P_client_msg)); + + rc = persist__client_msg_chunk_read_v234(db_fptr, &chunk); + if(rc){ fclose(db_fptr); - return 1; + return rc; } - client_id = mosquitto__malloc(slen+1); - if(!client_id){ - fclose(db_fptr); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - read_e(db_fptr, client_id, slen); - client_id[slen] = '\0'; - read_e(db_fptr, &i64temp, sizeof(dbid_t)); - store_id = i64temp; - - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - mid = ntohs(i16temp); - - read_e(db_fptr, &qos, sizeof(uint8_t)); - read_e(db_fptr, &retain, sizeof(uint8_t)); - read_e(db_fptr, &direction, sizeof(uint8_t)); - read_e(db_fptr, &state, sizeof(uint8_t)); - read_e(db_fptr, &dup, sizeof(uint8_t)); - - rc = persist__client_msg_restore(db, client_id, mid, qos, retain, direction, state, dup, store_id); - mosquitto__free(client_id); + rc = persist__client_msg_restore(db, chunk.client_id, chunk.F.mid, chunk.F.qos, + chunk.F.retain, chunk.F.direction, chunk.F.state, chunk.F.dup, chunk.F.store_id); + mosquitto__free(chunk.client_id); return rc; -error: - err = strerror(errno); - log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); - fclose(db_fptr); - mosquitto__free(client_id); - return 1; } + static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - dbid_t i64temp, store_id; - uint32_t i32temp, payloadlen = 0; - uint16_t i16temp, source_mid, source_port = 0; - uint8_t qos, retain; - mosquitto__payload_uhpa payload; - struct mosquitto source; - char *topic = NULL; - int rc = 0; + struct P_msg_store chunk; struct mosquitto_msg_store *stored = NULL; struct mosquitto_msg_store_load *load; - char *err; + int rc = 0; int i; - payload.ptr = NULL; + memset(&chunk, 0, sizeof(struct P_msg_store)); + rc = persist__msg_store_chunk_read_v234(db_fptr, &chunk, db_version); + if(rc){ + fclose(db_fptr); + return rc; + } + + if(chunk.F.source_port){ + for(i=0; iconfig->listener_count; i++){ + if(db->config->listeners[i].port == chunk.F.source_port){ + chunk.source.listener = &db->config->listeners[i]; + break; + } + } + } load = mosquitto__malloc(sizeof(struct mosquitto_msg_store_load)); if(!load){ fclose(db_fptr); + mosquitto__free(chunk.source.id); + mosquitto__free(chunk.source.username); + mosquitto__free(chunk.topic); + UHPA_FREE(chunk.payload, chunk.F.payloadlen); log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; } - read_e(db_fptr, &i64temp, sizeof(dbid_t)); - store_id = i64temp; + rc = db__message_store(db, &chunk.source, chunk.F.source_mid, + chunk.topic, chunk.F.qos, chunk.F.payloadlen, + &chunk.payload, chunk.F.retain, &stored, 0, NULL, chunk.F.store_id); - memset(&source, 0, sizeof(struct mosquitto)); - - rc = persist__read_string(db_fptr, &source.id); - if(rc){ - mosquitto__free(load); - return rc; - } - if(db_version == 4){ - rc = persist__read_string(db_fptr, &source.username); - if(rc){ - mosquitto__free(load); - return rc; - } - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - source_port = ntohs(i16temp); - if(source_port){ - for(i=0; iconfig->listener_count; i++){ - if(db->config->listeners[i].port == source_port){ - source.listener = &db->config->listeners[i]; - break; - } - } - } - } - - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - source_mid = ntohs(i16temp); - - /* This is the mid - don't need it */ - read_e(db_fptr, &i16temp, sizeof(uint16_t)); - - rc = persist__read_string(db_fptr, &topic); - if(rc){ - mosquitto__free(load); - fclose(db_fptr); - mosquitto__free(source.id); - return rc; - } - - read_e(db_fptr, &qos, sizeof(uint8_t)); - read_e(db_fptr, &retain, sizeof(uint8_t)); - - read_e(db_fptr, &i32temp, sizeof(uint32_t)); - payloadlen = ntohl(i32temp); - - if(payloadlen){ - if(UHPA_ALLOC(payload, payloadlen) == 0){ - mosquitto__free(load); - fclose(db_fptr); - mosquitto__free(source.id); - mosquitto__free(source.username); - mosquitto__free(topic); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - read_e(db_fptr, UHPA_ACCESS(payload, payloadlen), payloadlen); - } - - rc = db__message_store(db, &source, source_mid, topic, qos, payloadlen, &payload, retain, &stored, 0, NULL, store_id); - mosquitto__free(source.id); - source.id = NULL; - mosquitto__free(source.username); - source.username = NULL; + mosquitto__free(chunk.source.id); + mosquitto__free(chunk.source.username); + chunk.source.id = NULL; + chunk.source.username = NULL; if(rc == MOSQ_ERR_SUCCESS){ load->db_id = stored->db_id; @@ -354,29 +258,23 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp fclose(db_fptr); return rc; } -error: - err = strerror(errno); - log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); - fclose(db_fptr); - mosquitto__free(source.id); - mosquitto__free(source.username); - return 1; } static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - dbid_t i64temp, store_id; struct mosquitto_msg_store_load *load; - char *err; + struct P_retain chunk; + int rc; - if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){ - err = strerror(errno); - log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); + memset(&chunk, 0, sizeof(struct P_retain)); + + rc = persist__retain_chunk_read_v234(db_fptr, &chunk); + if(rc){ fclose(db_fptr); - return 1; + return rc; } - store_id = i64temp; - HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); + + HASH_FIND(hh, db->msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load); if(load){ sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); }else{ @@ -388,38 +286,23 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) { - uint8_t qos; - char *client_id; - char *topic; - int rc = 0; - char *err; + struct P_sub chunk; + int rc; - rc = persist__read_string(db_fptr, &client_id); + memset(&chunk, 0, sizeof(struct P_sub)); + + rc = persist__sub_chunk_read_v234(db_fptr, &chunk); if(rc){ fclose(db_fptr); return rc; } - rc = persist__read_string(db_fptr, &topic); - if(rc){ - mosquitto__free(client_id); - fclose(db_fptr); - return rc; - } + rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos); - read_e(db_fptr, &qos, sizeof(uint8_t)); - if(persist__restore_sub(db, client_id, topic, qos) > 0){ - rc = 1; - } - mosquitto__free(client_id); - mosquitto__free(topic); + mosquitto__free(chunk.client_id); + mosquitto__free(chunk.topic); return rc; -error: - err = strerror(errno); - log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); - fclose(db_fptr); - return 1; } int persist__restore(struct mosquitto_db *db) diff --git a/src/persist_read_v234.c b/src/persist_read_v234.c new file mode 100644 index 00000000..8292b3ea --- /dev/null +++ b/src/persist_read_v234.c @@ -0,0 +1,203 @@ +/* +Copyright (c) 2010-2018 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#ifdef WITH_PERSISTENCE + +#ifndef WIN32 +#include +#endif +#include +#include +#include +#include +#include +#include +#include + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "persist.h" +#include "time_mosq.h" +#include "util_mosq.h" + + +int persist__client_chunk_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version) +{ + uint16_t i16temp; + int rc; + + rc = persist__read_string(db_fptr, &chunk->client_id); + if(rc){ + return rc; + } + + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + chunk->F.last_mid = ntohs(i16temp); + + if(db_version == 2){ + chunk->F.disconnect_t = time(NULL); + }else{ + read_e(db_fptr, &chunk->F.disconnect_t, sizeof(time_t)); + } + + return MOSQ_ERR_SUCCESS; +error: + log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno)); + mosquitto__free(chunk->client_id); + return 1; +} + + +int persist__client_msg_chunk_read_v234(FILE *db_fptr, struct P_client_msg *chunk) +{ + uint16_t i16temp; + int rc; + char *err; + + rc = persist__read_string(db_fptr, &chunk->client_id); + if(rc){ + return rc; + } + + read_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t)); + + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + chunk->F.mid = ntohs(i16temp); + + read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t)); + read_e(db_fptr, &chunk->F.retain, sizeof(uint8_t)); + read_e(db_fptr, &chunk->F.direction, sizeof(uint8_t)); + read_e(db_fptr, &chunk->F.state, sizeof(uint8_t)); + read_e(db_fptr, &chunk->F.dup, sizeof(uint8_t)); + + return MOSQ_ERR_SUCCESS; +error: + err = strerror(errno); + log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); + mosquitto__free(chunk->client_id); + return 1; +} + + +int persist__msg_store_chunk_read_v234(FILE *db_fptr, struct P_msg_store *chunk, int db_version) +{ + uint32_t i32temp; + uint16_t i16temp; + int rc = 0; + char *err; + + read_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t)); + + rc = persist__read_string(db_fptr, &chunk->source.id); + if(rc){ + return rc; + } + if(db_version == 4){ + rc = persist__read_string(db_fptr, &chunk->source.username); + if(rc){ + mosquitto__free(chunk->source.id); + return rc; + } + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + chunk->F.source_port = ntohs(i16temp); + } + + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + chunk->F.source_mid = ntohs(i16temp); + + /* This is the mid - don't need it */ + read_e(db_fptr, &i16temp, sizeof(uint16_t)); + + rc = persist__read_string(db_fptr, &chunk->topic); + if(rc){ + mosquitto__free(chunk->source.id); + mosquitto__free(chunk->source.username); + return rc; + } + + read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t)); + read_e(db_fptr, &chunk->F.retain, sizeof(uint8_t)); + + read_e(db_fptr, &i32temp, sizeof(uint32_t)); + chunk->F.payloadlen = ntohl(i32temp); + + if(chunk->F.payloadlen){ + if(UHPA_ALLOC(chunk->payload, chunk->F.payloadlen) == 0){ + mosquitto__free(chunk->source.id); + mosquitto__free(chunk->source.username); + mosquitto__free(chunk->topic); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + read_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), chunk->F.payloadlen); + } + + return MOSQ_ERR_SUCCESS; +error: + err = strerror(errno); + log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); + mosquitto__free(chunk->source.id); + mosquitto__free(chunk->source.username); + return 1; +} + + +int persist__retain_chunk_read_v234(FILE *db_fptr, struct P_retain *chunk) +{ + dbid_t i64temp; + char *err; + + if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){ + err = strerror(errno); + log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); + return 1; + } + chunk->F.store_id = i64temp; + + return MOSQ_ERR_SUCCESS; +} + + +int persist__sub_chunk_read_v234(FILE *db_fptr, struct P_sub *chunk) +{ + int rc; + char *err; + + rc = persist__read_string(db_fptr, &chunk->client_id); + if(rc){ + return rc; + } + + rc = persist__read_string(db_fptr, &chunk->topic); + if(rc){ + mosquitto__free(chunk->client_id); + return rc; + } + + read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t)); + + return MOSQ_ERR_SUCCESS; +error: + err = strerror(errno); + log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err); + mosquitto__free(chunk->client_id); + mosquitto__free(chunk->topic); + return 1; +} + +#endif diff --git a/test/unit/Makefile b/test/unit/Makefile index b6640b39..8b5e7495 100644 --- a/test/unit/Makefile +++ b/test/unit/Makefile @@ -29,6 +29,7 @@ PERSIST_READ_TEST_OBJS = \ PERSIST_READ_OBJS = \ memory_mosq.o \ persist_read.o \ + persist_read_v234.o \ util_mosq.o all : test @@ -48,6 +49,9 @@ packet_datatypes.o : ../../lib/packet_datatypes.c persist_read.o : ../../src/persist_read.c $(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^ +persist_read_v234.o : ../../src/persist_read_v234.c + $(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^ + property_mosq.o : ../../lib/property_mosq.c $(CROSS_COMPILE)$(CC) $(CFLAGS) -c -o $@ $^