Separate out persist reading code from restoring code.

This commit is contained in:
Roger A. Light 2019-03-16 09:42:15 +00:00
parent 110f4aada6
commit a7d0660749
6 changed files with 368 additions and 186 deletions

View File

@ -30,6 +30,7 @@ set (MOSQ_SRCS
../lib/net_mosq.c ../lib/net_mosq.h
../lib/packet_datatypes.c
../lib/packet_mosq.c ../lib/packet_mosq.h
persist_read_v234.c
persist_read.c persist_write.c persist.h
plugin.c
property_broker.c

View File

@ -37,6 +37,7 @@ OBJS= mosquitto.o \
property_broker.o \
property_mosq.o \
persist_read.o \
persist_read_v234.o \
persist_write.o \
plugin.o \
read_handle.o \
@ -143,6 +144,9 @@ net_mosq.o : ../lib/net_mosq.c ../lib/net_mosq.h
persist_read.o : persist_read.c persist.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
persist_read_v234.o : persist_read_v234.c persist.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
persist_write.o : persist_write.c persist.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@

View File

@ -32,4 +32,91 @@ extern const unsigned char magic[15];
#define read_e(f, b, c) if(fread(b, 1, c, f) != c){ goto error; }
#define write_e(f, b, c) if(fwrite(b, 1, c, f) != c){ goto error; }
/* COMPATIBILITY NOTES
*
* The P_* structs (persist structs) contain all of the data for a particular
* data chunk. They are loaded in multiple parts, so can be rearranged without
* updating the db format version.
*
* The PF_* structs (persist fixed structs) contain the fixed size data for a
* particular data chunk. They are written to disk as is, so they must not be
* rearranged without updating the db format version. When adding new members,
* always use explicit sized datatypes ("uint32_t", not "long"), and check
* whether what is being added can go in an existing hole in the struct.
*/
struct PF_client{
uint64_t disconnect_t;
uint16_t last_mid;
uint16_t id_len;
};
struct P_client{
struct PF_client F;
char *client_id;
};
struct PF_client_msg{
dbid_t store_id;
uint16_t mid;
uint16_t id_len;
uint8_t qos;
uint8_t state;
uint8_t retain;
uint8_t direction;
uint8_t dup;
};
struct P_client_msg{
struct PF_client_msg F;
char *client_id;
};
struct PF_msg_store{
dbid_t store_id;
uint32_t payloadlen;
uint16_t mid;
uint16_t source_mid;
uint16_t source_id_len;
uint16_t topic_len;
uint16_t source_port;
uint8_t qos;
uint8_t retain;
};
struct P_msg_store{
struct PF_msg_store F;
mosquitto__payload_uhpa payload;
struct mosquitto source;
char *topic;
};
struct PF_sub{
uint16_t id_len;
uint16_t topic_len;
uint8_t qos;
};
struct P_sub{
struct PF_sub F;
char *client_id;
char *topic;
};
struct PF_retain{
dbid_t store_id;
};
struct P_retain{
struct PF_retain F;
};
int persist__read_string(FILE *db_fptr, char **str);
int persist__client_chunk_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version);
int persist__client_msg_chunk_read_v234(FILE *db_fptr, struct P_client_msg *chunk);
int persist__msg_store_chunk_read_v234(FILE *db_fptr, struct P_msg_store *chunk, int db_version);
int persist__retain_chunk_read_v234(FILE *db_fptr, struct P_retain *chunk);
int persist__sub_chunk_read_v234(FILE *db_fptr, struct P_sub *chunk);
#endif

View File

@ -40,7 +40,7 @@ static uint32_t db_version;
const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos);
static int persist__read_string(FILE *db_fptr, char **str);
int persist__read_string(FILE *db_fptr, char **str);
static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid)
{
@ -67,7 +67,8 @@ static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, c
return context;
}
static int persist__read_string(FILE *db_fptr, char **str)
int persist__read_string(FILE *db_fptr, char **str)
{
uint16_t i16temp;
uint16_t slen;
@ -153,195 +154,98 @@ static int persist__client_msg_restore(struct mosquitto_db *db, const char *clie
return MOSQ_ERR_SUCCESS;
}
static int persist__client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
uint16_t i16temp, slen, last_mid;
char *client_id = NULL;
int rc = 0;
struct mosquitto *context;
time_t disconnect_t;
struct P_client chunk;
read_e(db_fptr, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
if(!slen){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt persistent database.");
memset(&chunk, 0, sizeof(struct P_client));
rc = persist__client_chunk_read_v234(db_fptr, &chunk, db_version);
if(rc){
fclose(db_fptr);
return 1;
}
client_id = mosquitto__malloc(slen+1);
if(!client_id){
fclose(db_fptr);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, client_id, slen);
client_id[slen] = '\0';
read_e(db_fptr, &i16temp, sizeof(uint16_t));
last_mid = ntohs(i16temp);
if(db_version == 2){
disconnect_t = time(NULL);
}else{
read_e(db_fptr, &disconnect_t, sizeof(time_t));
return rc;
}
context = persist__find_or_add_context(db, client_id, last_mid);
context = persist__find_or_add_context(db, chunk.client_id, chunk.F.last_mid);
if(context){
context->disconnect_t = disconnect_t;
context->disconnect_t = chunk.F.disconnect_t;
}else{
rc = 1;
}
mosquitto__free(client_id);
mosquitto__free(chunk.client_id);
return rc;
error:
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
fclose(db_fptr);
mosquitto__free(client_id);
return 1;
}
static int persist__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
dbid_t i64temp, store_id;
uint16_t i16temp, slen, mid;
uint8_t qos, retain, direction, state, dup;
char *client_id = NULL;
struct P_client_msg chunk;
int rc;
char *err;
read_e(db_fptr, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
if(!slen){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt persistent database.");
memset(&chunk, 0, sizeof(struct P_client_msg));
rc = persist__client_msg_chunk_read_v234(db_fptr, &chunk);
if(rc){
fclose(db_fptr);
return 1;
return rc;
}
client_id = mosquitto__malloc(slen+1);
if(!client_id){
fclose(db_fptr);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, client_id, slen);
client_id[slen] = '\0';
read_e(db_fptr, &i64temp, sizeof(dbid_t));
store_id = i64temp;
read_e(db_fptr, &i16temp, sizeof(uint16_t));
mid = ntohs(i16temp);
read_e(db_fptr, &qos, sizeof(uint8_t));
read_e(db_fptr, &retain, sizeof(uint8_t));
read_e(db_fptr, &direction, sizeof(uint8_t));
read_e(db_fptr, &state, sizeof(uint8_t));
read_e(db_fptr, &dup, sizeof(uint8_t));
rc = persist__client_msg_restore(db, client_id, mid, qos, retain, direction, state, dup, store_id);
mosquitto__free(client_id);
rc = persist__client_msg_restore(db, chunk.client_id, chunk.F.mid, chunk.F.qos,
chunk.F.retain, chunk.F.direction, chunk.F.state, chunk.F.dup, chunk.F.store_id);
mosquitto__free(chunk.client_id);
return rc;
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
fclose(db_fptr);
mosquitto__free(client_id);
return 1;
}
static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
dbid_t i64temp, store_id;
uint32_t i32temp, payloadlen = 0;
uint16_t i16temp, source_mid, source_port = 0;
uint8_t qos, retain;
mosquitto__payload_uhpa payload;
struct mosquitto source;
char *topic = NULL;
int rc = 0;
struct P_msg_store chunk;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store_load *load;
char *err;
int rc = 0;
int i;
payload.ptr = NULL;
memset(&chunk, 0, sizeof(struct P_msg_store));
rc = persist__msg_store_chunk_read_v234(db_fptr, &chunk, db_version);
if(rc){
fclose(db_fptr);
return rc;
}
if(chunk.F.source_port){
for(i=0; i<db->config->listener_count; i++){
if(db->config->listeners[i].port == chunk.F.source_port){
chunk.source.listener = &db->config->listeners[i];
break;
}
}
}
load = mosquitto__malloc(sizeof(struct mosquitto_msg_store_load));
if(!load){
fclose(db_fptr);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, &i64temp, sizeof(dbid_t));
store_id = i64temp;
rc = db__message_store(db, &chunk.source, chunk.F.source_mid,
chunk.topic, chunk.F.qos, chunk.F.payloadlen,
&chunk.payload, chunk.F.retain, &stored, 0, NULL, chunk.F.store_id);
memset(&source, 0, sizeof(struct mosquitto));
rc = persist__read_string(db_fptr, &source.id);
if(rc){
mosquitto__free(load);
return rc;
}
if(db_version == 4){
rc = persist__read_string(db_fptr, &source.username);
if(rc){
mosquitto__free(load);
return rc;
}
read_e(db_fptr, &i16temp, sizeof(uint16_t));
source_port = ntohs(i16temp);
if(source_port){
for(i=0; i<db->config->listener_count; i++){
if(db->config->listeners[i].port == source_port){
source.listener = &db->config->listeners[i];
break;
}
}
}
}
read_e(db_fptr, &i16temp, sizeof(uint16_t));
source_mid = ntohs(i16temp);
/* This is the mid - don't need it */
read_e(db_fptr, &i16temp, sizeof(uint16_t));
rc = persist__read_string(db_fptr, &topic);
if(rc){
mosquitto__free(load);
fclose(db_fptr);
mosquitto__free(source.id);
return rc;
}
read_e(db_fptr, &qos, sizeof(uint8_t));
read_e(db_fptr, &retain, sizeof(uint8_t));
read_e(db_fptr, &i32temp, sizeof(uint32_t));
payloadlen = ntohl(i32temp);
if(payloadlen){
if(UHPA_ALLOC(payload, payloadlen) == 0){
mosquitto__free(load);
fclose(db_fptr);
mosquitto__free(source.id);
mosquitto__free(source.username);
mosquitto__free(topic);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, UHPA_ACCESS(payload, payloadlen), payloadlen);
}
rc = db__message_store(db, &source, source_mid, topic, qos, payloadlen, &payload, retain, &stored, 0, NULL, store_id);
mosquitto__free(source.id);
source.id = NULL;
mosquitto__free(source.username);
source.username = NULL;
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
chunk.source.id = NULL;
chunk.source.username = NULL;
if(rc == MOSQ_ERR_SUCCESS){
load->db_id = stored->db_id;
@ -354,29 +258,23 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
fclose(db_fptr);
return rc;
}
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
fclose(db_fptr);
mosquitto__free(source.id);
mosquitto__free(source.username);
return 1;
}
static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
dbid_t i64temp, store_id;
struct mosquitto_msg_store_load *load;
char *err;
struct P_retain chunk;
int rc;
if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
memset(&chunk, 0, sizeof(struct P_retain));
rc = persist__retain_chunk_read_v234(db_fptr, &chunk);
if(rc){
fclose(db_fptr);
return 1;
return rc;
}
store_id = i64temp;
HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load);
HASH_FIND(hh, db->msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load);
if(load){
sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store);
}else{
@ -388,38 +286,23 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
uint8_t qos;
char *client_id;
char *topic;
int rc = 0;
char *err;
struct P_sub chunk;
int rc;
rc = persist__read_string(db_fptr, &client_id);
memset(&chunk, 0, sizeof(struct P_sub));
rc = persist__sub_chunk_read_v234(db_fptr, &chunk);
if(rc){
fclose(db_fptr);
return rc;
}
rc = persist__read_string(db_fptr, &topic);
if(rc){
mosquitto__free(client_id);
fclose(db_fptr);
return rc;
}
rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos);
read_e(db_fptr, &qos, sizeof(uint8_t));
if(persist__restore_sub(db, client_id, topic, qos) > 0){
rc = 1;
}
mosquitto__free(client_id);
mosquitto__free(topic);
mosquitto__free(chunk.client_id);
mosquitto__free(chunk.topic);
return rc;
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
fclose(db_fptr);
return 1;
}
int persist__restore(struct mosquitto_db *db)

203
src/persist_read_v234.c Normal file
View File

@ -0,0 +1,203 @@
/*
Copyright (c) 2010-2018 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#ifdef WITH_PERSISTENCE
#ifndef WIN32
#include <arpa/inet.h>
#endif
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <time.h>
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "persist.h"
#include "time_mosq.h"
#include "util_mosq.h"
int persist__client_chunk_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version)
{
uint16_t i16temp;
int rc;
rc = persist__read_string(db_fptr, &chunk->client_id);
if(rc){
return rc;
}
read_e(db_fptr, &i16temp, sizeof(uint16_t));
chunk->F.last_mid = ntohs(i16temp);
if(db_version == 2){
chunk->F.disconnect_t = time(NULL);
}else{
read_e(db_fptr, &chunk->F.disconnect_t, sizeof(time_t));
}
return MOSQ_ERR_SUCCESS;
error:
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
mosquitto__free(chunk->client_id);
return 1;
}
int persist__client_msg_chunk_read_v234(FILE *db_fptr, struct P_client_msg *chunk)
{
uint16_t i16temp;
int rc;
char *err;
rc = persist__read_string(db_fptr, &chunk->client_id);
if(rc){
return rc;
}
read_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t));
read_e(db_fptr, &i16temp, sizeof(uint16_t));
chunk->F.mid = ntohs(i16temp);
read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
read_e(db_fptr, &chunk->F.retain, sizeof(uint8_t));
read_e(db_fptr, &chunk->F.direction, sizeof(uint8_t));
read_e(db_fptr, &chunk->F.state, sizeof(uint8_t));
read_e(db_fptr, &chunk->F.dup, sizeof(uint8_t));
return MOSQ_ERR_SUCCESS;
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
mosquitto__free(chunk->client_id);
return 1;
}
int persist__msg_store_chunk_read_v234(FILE *db_fptr, struct P_msg_store *chunk, int db_version)
{
uint32_t i32temp;
uint16_t i16temp;
int rc = 0;
char *err;
read_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t));
rc = persist__read_string(db_fptr, &chunk->source.id);
if(rc){
return rc;
}
if(db_version == 4){
rc = persist__read_string(db_fptr, &chunk->source.username);
if(rc){
mosquitto__free(chunk->source.id);
return rc;
}
read_e(db_fptr, &i16temp, sizeof(uint16_t));
chunk->F.source_port = ntohs(i16temp);
}
read_e(db_fptr, &i16temp, sizeof(uint16_t));
chunk->F.source_mid = ntohs(i16temp);
/* This is the mid - don't need it */
read_e(db_fptr, &i16temp, sizeof(uint16_t));
rc = persist__read_string(db_fptr, &chunk->topic);
if(rc){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
return rc;
}
read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
read_e(db_fptr, &chunk->F.retain, sizeof(uint8_t));
read_e(db_fptr, &i32temp, sizeof(uint32_t));
chunk->F.payloadlen = ntohl(i32temp);
if(chunk->F.payloadlen){
if(UHPA_ALLOC(chunk->payload, chunk->F.payloadlen) == 0){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), chunk->F.payloadlen);
}
return MOSQ_ERR_SUCCESS;
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
return 1;
}
int persist__retain_chunk_read_v234(FILE *db_fptr, struct P_retain *chunk)
{
dbid_t i64temp;
char *err;
if(fread(&i64temp, sizeof(dbid_t), 1, db_fptr) != 1){
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
return 1;
}
chunk->F.store_id = i64temp;
return MOSQ_ERR_SUCCESS;
}
int persist__sub_chunk_read_v234(FILE *db_fptr, struct P_sub *chunk)
{
int rc;
char *err;
rc = persist__read_string(db_fptr, &chunk->client_id);
if(rc){
return rc;
}
rc = persist__read_string(db_fptr, &chunk->topic);
if(rc){
mosquitto__free(chunk->client_id);
return rc;
}
read_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
return MOSQ_ERR_SUCCESS;
error:
err = strerror(errno);
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", err);
mosquitto__free(chunk->client_id);
mosquitto__free(chunk->topic);
return 1;
}
#endif

View File

@ -29,6 +29,7 @@ PERSIST_READ_TEST_OBJS = \
PERSIST_READ_OBJS = \
memory_mosq.o \
persist_read.o \
persist_read_v234.o \
util_mosq.o
all : test
@ -48,6 +49,9 @@ packet_datatypes.o : ../../lib/packet_datatypes.c
persist_read.o : ../../src/persist_read.c
$(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^
persist_read_v234.o : ../../src/persist_read_v234.c
$(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^
property_mosq.o : ../../lib/property_mosq.c
$(CROSS_COMPILE)$(CC) $(CFLAGS) -c -o $@ $^