Separate the persistence actual writing functions.
This commit is contained in:
parent
7a53b28080
commit
e249ca7e91
@ -30,8 +30,9 @@ set (MOSQ_SRCS
|
||||
../lib/net_mosq.c ../lib/net_mosq.h
|
||||
../lib/packet_datatypes.c
|
||||
../lib/packet_mosq.c ../lib/packet_mosq.h
|
||||
persist_read_v234.c
|
||||
persist_read.c persist_write.c persist.h
|
||||
persist_read_v234.c persist_read.c
|
||||
persist_write_v4.c persist_write.c
|
||||
persist.h
|
||||
plugin.c
|
||||
property_broker.c
|
||||
../lib/property_mosq.c ../lib/property_mosq.h
|
||||
|
@ -39,6 +39,7 @@ OBJS= mosquitto.o \
|
||||
persist_read.o \
|
||||
persist_read_v234.o \
|
||||
persist_write.o \
|
||||
persist_write_v4.o \
|
||||
plugin.o \
|
||||
read_handle.o \
|
||||
security.o \
|
||||
@ -150,6 +151,9 @@ persist_read_v234.o : persist_read_v234.c persist.h mosquitto_broker_internal.h
|
||||
persist_write.o : persist_write.c persist.h mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
persist_write_v4.o : persist_write_v4.c persist.h mosquitto_broker_internal.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
packet_datatypes.o : ../lib/packet_datatypes.c ../lib/packet_mosq.h
|
||||
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
|
||||
|
||||
|
@ -78,6 +78,7 @@ struct PF_msg_store{
|
||||
uint16_t mid;
|
||||
uint16_t source_mid;
|
||||
uint16_t source_id_len;
|
||||
uint16_t source_username_len;
|
||||
uint16_t topic_len;
|
||||
uint16_t source_port;
|
||||
uint8_t qos;
|
||||
@ -119,4 +120,10 @@ int persist__msg_store_chunk_read_v234(FILE *db_fptr, struct P_msg_store *chunk,
|
||||
int persist__retain_chunk_read_v234(FILE *db_fptr, struct P_retain *chunk);
|
||||
int persist__sub_chunk_read_v234(FILE *db_fptr, struct P_sub *chunk);
|
||||
|
||||
int persist__client_chunk_write_v4(FILE *db_fptr, const struct P_client *chunk);
|
||||
int persist__client_msg_chunk_write_v4(FILE *db_fptr, const struct P_client_msg *chunk);
|
||||
int persist__message_store_chunk_write_v4(FILE *db_fptr, const struct P_msg_store *chunk);
|
||||
int persist__retain_chunk_write_v4(FILE *db_fptr, const struct P_retain *chunk);
|
||||
int persist__sub_chunk_write_v4(FILE *db_fptr, const struct P_sub *chunk);
|
||||
|
||||
#endif
|
||||
|
@ -35,38 +35,11 @@ Contributors:
|
||||
#include "time_mosq.h"
|
||||
#include "util_mosq.h"
|
||||
|
||||
static int persist__write_string(FILE *db_fptr, const char *str, bool nullok);
|
||||
|
||||
static int persist__write_string(FILE *db_fptr, const char *str, bool nullok)
|
||||
static int persist__client_messages_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto *context, struct mosquitto_client_msg *queue)
|
||||
{
|
||||
uint16_t i16temp, slen;
|
||||
|
||||
if(str){
|
||||
slen = strlen(str);
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, str, slen);
|
||||
}else if(nullok){
|
||||
i16temp = htons(0);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
}else{
|
||||
return 1;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static int persist__client_messages_write(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto *context, struct mosquitto_client_msg *queue)
|
||||
{
|
||||
uint32_t length;
|
||||
dbid_t i64temp;
|
||||
uint16_t i16temp, slen;
|
||||
uint8_t i8temp;
|
||||
struct P_client_msg chunk;
|
||||
struct mosquitto_client_msg *cmsg;
|
||||
int rc;
|
||||
|
||||
assert(db);
|
||||
assert(db_fptr);
|
||||
@ -84,60 +57,33 @@ static int persist__client_messages_write(struct mosquitto_db *db, FILE *db_fptr
|
||||
continue;
|
||||
}
|
||||
|
||||
slen = strlen(context->id);
|
||||
chunk.F.store_id = cmsg->store->db_id;
|
||||
chunk.F.mid = cmsg->mid;
|
||||
chunk.F.id_len = strlen(context->id);
|
||||
chunk.F.qos = cmsg->qos;
|
||||
chunk.F.retain = cmsg->retain;
|
||||
chunk.F.direction = cmsg->direction;
|
||||
chunk.F.state = cmsg->state;
|
||||
chunk.F.dup = cmsg->dup;
|
||||
chunk.client_id = context->id;
|
||||
|
||||
length = htonl(sizeof(dbid_t) + sizeof(uint16_t) + sizeof(uint8_t) +
|
||||
sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t) +
|
||||
sizeof(uint8_t) + 2+slen);
|
||||
|
||||
i16temp = htons(DB_CHUNK_CLIENT_MSG);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, context->id, slen);
|
||||
|
||||
i64temp = cmsg->store->db_id;
|
||||
write_e(db_fptr, &i64temp, sizeof(dbid_t));
|
||||
|
||||
i16temp = htons(cmsg->mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
i8temp = (uint8_t )cmsg->qos;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
i8temp = (uint8_t )cmsg->retain;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
i8temp = (uint8_t )cmsg->direction;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
i8temp = (uint8_t )cmsg->state;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
i8temp = (uint8_t )cmsg->dup;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
rc = persist__client_msg_chunk_write_v4(db_fptr, &chunk);
|
||||
if(rc){
|
||||
return rc;
|
||||
}
|
||||
|
||||
cmsg = cmsg->next;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
|
||||
static int persist__message_store_save(struct mosquitto_db *db, FILE *db_fptr)
|
||||
{
|
||||
uint32_t length;
|
||||
dbid_t i64temp;
|
||||
uint32_t i32temp;
|
||||
uint16_t i16temp, tlen;
|
||||
uint8_t i8temp;
|
||||
struct P_msg_store chunk;
|
||||
struct mosquitto_msg_store *stored;
|
||||
bool force_no_retain;
|
||||
int rc;
|
||||
|
||||
assert(db);
|
||||
assert(db_fptr);
|
||||
@ -159,136 +105,92 @@ static int persist__message_store_write(struct mosquitto_db *db, FILE *db_fptr)
|
||||
* misleading information when reloaded. They should still be saved
|
||||
* because a disconnected durable client may have them in their
|
||||
* queue. */
|
||||
force_no_retain = true;
|
||||
chunk.F.retain = (uint8_t)stored->retain;
|
||||
}else{
|
||||
force_no_retain = false;
|
||||
chunk.F.retain = 0;
|
||||
}
|
||||
if(stored->topic){
|
||||
tlen = strlen(stored->topic);
|
||||
}else{
|
||||
tlen = 0;
|
||||
}
|
||||
length = sizeof(dbid_t) + sizeof(uint16_t) +
|
||||
sizeof(uint16_t) + sizeof(uint16_t) +
|
||||
2+tlen + sizeof(uint32_t) +
|
||||
stored->payloadlen + sizeof(uint8_t) + sizeof(uint8_t)
|
||||
+ 2*sizeof(uint16_t);
|
||||
|
||||
chunk.F.store_id = stored->db_id;
|
||||
chunk.F.payloadlen = stored->payloadlen;
|
||||
chunk.F.mid = stored->mid;
|
||||
chunk.F.source_mid = stored->source_mid;
|
||||
if(stored->source_id){
|
||||
length += strlen(stored->source_id);
|
||||
chunk.F.source_id_len = strlen(stored->source_id);
|
||||
chunk.source.id = stored->source_id;
|
||||
}else{
|
||||
chunk.F.source_id_len = 0;
|
||||
chunk.source.id = NULL;
|
||||
}
|
||||
if(stored->source_username){
|
||||
length += strlen(stored->source_username);
|
||||
chunk.F.source_username_len = strlen(stored->source_username);
|
||||
chunk.source.username = stored->source_username;
|
||||
}else{
|
||||
chunk.F.source_username_len = 0;
|
||||
chunk.source.username = NULL;
|
||||
}
|
||||
if(stored->topic){
|
||||
chunk.F.topic_len = strlen(stored->topic);
|
||||
chunk.topic = stored->topic;
|
||||
}else{
|
||||
chunk.F.topic_len = 0;
|
||||
chunk.topic = NULL;
|
||||
}
|
||||
length = htonl(length);
|
||||
|
||||
i16temp = htons(DB_CHUNK_MSG_STORE);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
i64temp = stored->db_id;
|
||||
write_e(db_fptr, &i64temp, sizeof(dbid_t));
|
||||
|
||||
if(persist__write_string(db_fptr, stored->source_id, false)) return 1;
|
||||
if(persist__write_string(db_fptr, stored->source_username, true)) return 1;
|
||||
if(stored->source_listener){
|
||||
i16temp = htons(stored->source_listener->port);
|
||||
chunk.F.source_port = stored->source_listener->port;
|
||||
}else{
|
||||
i16temp = 0;
|
||||
chunk.F.source_port = 0;
|
||||
}
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
chunk.F.qos = stored->qos;
|
||||
|
||||
|
||||
i16temp = htons(stored->source_mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
i16temp = htons(stored->mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
i16temp = htons(tlen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
if(tlen){
|
||||
write_e(db_fptr, stored->topic, tlen);
|
||||
rc = persist__message_store_chunk_write_v4(db_fptr, &chunk);
|
||||
if(rc){
|
||||
return rc;
|
||||
}
|
||||
|
||||
i8temp = (uint8_t )stored->qos;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
if(force_no_retain == false){
|
||||
i8temp = (uint8_t )stored->retain;
|
||||
}else{
|
||||
i8temp = 0;
|
||||
}
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
|
||||
i32temp = htonl(stored->payloadlen);
|
||||
write_e(db_fptr, &i32temp, sizeof(uint32_t));
|
||||
if(stored->payloadlen){
|
||||
write_e(db_fptr, UHPA_ACCESS_PAYLOAD(stored), (unsigned int)stored->payloadlen);
|
||||
}
|
||||
|
||||
stored = stored->next;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int persist__client_write(struct mosquitto_db *db, FILE *db_fptr)
|
||||
static int persist__client_save(struct mosquitto_db *db, FILE *db_fptr)
|
||||
{
|
||||
struct mosquitto *context, *ctxt_tmp;
|
||||
uint16_t i16temp, slen;
|
||||
uint32_t length;
|
||||
time_t disconnect_t;
|
||||
struct P_client chunk;
|
||||
int rc;
|
||||
|
||||
assert(db);
|
||||
assert(db_fptr);
|
||||
|
||||
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
|
||||
if(context && context->clean_start == false){
|
||||
length = htonl(2+strlen(context->id) + sizeof(uint16_t) + sizeof(time_t));
|
||||
chunk.F.id_len = strlen(context->id);
|
||||
chunk.F.last_mid = context->last_mid;
|
||||
chunk.F.disconnect_t = context->disconnect_t;
|
||||
chunk.client_id = context->id;
|
||||
|
||||
i16temp = htons(DB_CHUNK_CLIENT);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
slen = strlen(context->id);
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, context->id, slen);
|
||||
i16temp = htons(context->last_mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
if(context->disconnect_t){
|
||||
disconnect_t = context->disconnect_t;
|
||||
}else{
|
||||
disconnect_t = time(NULL);
|
||||
rc = persist__client_chunk_write_v4(db_fptr, &chunk);
|
||||
if(rc){
|
||||
return rc;
|
||||
}
|
||||
write_e(db_fptr, &disconnect_t, sizeof(time_t));
|
||||
|
||||
if(persist__client_messages_write(db, db_fptr, context, context->inflight_msgs)) return 1;
|
||||
if(persist__client_messages_write(db, db_fptr, context, context->queued_msgs)) return 1;
|
||||
if(persist__client_messages_save(db, db_fptr, context, context->inflight_msgs)) return 1;
|
||||
if(persist__client_messages_save(db, db_fptr, context, context->queued_msgs)) return 1;
|
||||
}
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
|
||||
static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
|
||||
{
|
||||
struct mosquitto__subhier *subhier, *subhier_tmp;
|
||||
struct mosquitto__subleaf *sub;
|
||||
struct P_retain retain_chunk;
|
||||
struct P_sub sub_chunk;
|
||||
char *thistopic;
|
||||
uint32_t length;
|
||||
uint16_t i16temp;
|
||||
uint8_t i8temp;
|
||||
dbid_t i64temp;
|
||||
size_t slen;
|
||||
int rc;
|
||||
|
||||
slen = strlen(topic) + node->topic_len + 2;
|
||||
thistopic = mosquitto__malloc(sizeof(char)*slen);
|
||||
@ -302,58 +204,46 @@ static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, st
|
||||
sub = node->subs;
|
||||
while(sub){
|
||||
if(sub->context->clean_start == false && sub->context->id){
|
||||
length = htonl(2+strlen(sub->context->id) + 2+strlen(thistopic) + sizeof(uint8_t));
|
||||
sub_chunk.F.id_len = strlen(sub->context->id);
|
||||
sub_chunk.F.topic_len = strlen(thistopic);
|
||||
sub_chunk.F.qos = (uint8_t)sub->qos;
|
||||
sub_chunk.client_id = sub->context->id;
|
||||
sub_chunk.topic = thistopic;
|
||||
|
||||
i16temp = htons(DB_CHUNK_SUB);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
slen = strlen(sub->context->id);
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, sub->context->id, slen);
|
||||
|
||||
slen = strlen(thistopic);
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, thistopic, slen);
|
||||
|
||||
i8temp = (uint8_t )sub->qos;
|
||||
write_e(db_fptr, &i8temp, sizeof(uint8_t));
|
||||
rc = persist__sub_chunk_write_v4(db_fptr, &sub_chunk);
|
||||
if(rc){
|
||||
mosquitto__free(thistopic);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
sub = sub->next;
|
||||
}
|
||||
if(node->retained){
|
||||
if(strncmp(node->retained->topic, "$SYS", 4)){
|
||||
/* Don't save $SYS messages. */
|
||||
length = htonl(sizeof(dbid_t));
|
||||
|
||||
i16temp = htons(DB_CHUNK_RETAIN);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
i64temp = node->retained->db_id;
|
||||
write_e(db_fptr, &i64temp, sizeof(dbid_t));
|
||||
retain_chunk.F.store_id = node->retained->db_id;
|
||||
rc = persist__retain_chunk_write_v4(db_fptr, &retain_chunk);
|
||||
if(rc){
|
||||
mosquitto__free(thistopic);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
HASH_ITER(hh, node->children, subhier, subhier_tmp){
|
||||
persist__subs_retain_write(db, db_fptr, subhier, thistopic, level+1);
|
||||
persist__subs_retain_save(db, db_fptr, subhier, thistopic, level+1);
|
||||
}
|
||||
mosquitto__free(thistopic);
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int persist__subs_retain_write_all(struct mosquitto_db *db, FILE *db_fptr)
|
||||
static int persist__subs_retain_save_all(struct mosquitto_db *db, FILE *db_fptr)
|
||||
{
|
||||
struct mosquitto__subhier *subhier, *subhier_tmp;
|
||||
|
||||
HASH_ITER(hh, db->subs, subhier, subhier_tmp){
|
||||
if(subhier->children){
|
||||
persist__subs_retain_write(db, db_fptr, subhier->children, "", 0);
|
||||
persist__subs_retain_save(db, db_fptr, subhier->children, "", 0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -444,12 +334,12 @@ int persist__backup(struct mosquitto_db *db, bool shutdown)
|
||||
i64temp = db->last_db_id;
|
||||
write_e(db_fptr, &i64temp, sizeof(dbid_t));
|
||||
|
||||
if(persist__message_store_write(db, db_fptr)){
|
||||
if(persist__message_store_save(db, db_fptr)){
|
||||
goto error;
|
||||
}
|
||||
|
||||
persist__client_write(db, db_fptr);
|
||||
persist__subs_retain_write_all(db, db_fptr);
|
||||
persist__client_save(db, db_fptr);
|
||||
persist__subs_retain_save_all(db, db_fptr);
|
||||
|
||||
#ifndef WIN32
|
||||
/**
|
||||
|
215
src/persist_write_v4.c
Normal file
215
src/persist_write_v4.c
Normal file
@ -0,0 +1,215 @@
|
||||
/*
|
||||
Copyright (c) 2010-2018 Roger Light <roger@atchoo.org>
|
||||
|
||||
All rights reserved. This program and the accompanying materials
|
||||
are made available under the terms of the Eclipse Public License v1.0
|
||||
and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
|
||||
The Eclipse Public License is available at
|
||||
http://www.eclipse.org/legal/epl-v10.html
|
||||
and the Eclipse Distribution License is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
Contributors:
|
||||
Roger Light - initial implementation and documentation.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#ifdef WITH_PERSISTENCE
|
||||
|
||||
#ifndef WIN32
|
||||
#include <arpa/inet.h>
|
||||
#endif
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <time.h>
|
||||
|
||||
#include "mosquitto_broker_internal.h"
|
||||
#include "memory_mosq.h"
|
||||
#include "persist.h"
|
||||
#include "time_mosq.h"
|
||||
#include "util_mosq.h"
|
||||
|
||||
static int persist__write_string(FILE *db_fptr, const char *str, bool nullok);
|
||||
|
||||
static int persist__write_string(FILE *db_fptr, const char *str, bool nullok)
|
||||
{
|
||||
uint16_t i16temp, slen;
|
||||
|
||||
if(str){
|
||||
slen = strlen(str);
|
||||
i16temp = htons(slen);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, str, slen);
|
||||
}else if(nullok){
|
||||
i16temp = htons(0);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
}else{
|
||||
return 1;
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int persist__client_chunk_write_v4(FILE *db_fptr, const struct P_client *chunk)
|
||||
{
|
||||
uint32_t length;
|
||||
uint16_t i16temp;
|
||||
time_t disconnect_t;
|
||||
|
||||
length = htonl(2+chunk->F.id_len + sizeof(uint16_t) + sizeof(time_t));
|
||||
|
||||
i16temp = htons(DB_CHUNK_CLIENT);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
if(persist__write_string(db_fptr, chunk->client_id, false)) return 1;
|
||||
|
||||
i16temp = htons(chunk->F.last_mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
if(chunk->F.disconnect_t){
|
||||
disconnect_t = chunk->F.disconnect_t;
|
||||
}else{
|
||||
disconnect_t = time(NULL);
|
||||
}
|
||||
write_e(db_fptr, &disconnect_t, sizeof(time_t));
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int persist__client_msg_chunk_write_v4(FILE *db_fptr, const struct P_client_msg *chunk)
|
||||
{
|
||||
uint32_t length;
|
||||
uint16_t i16temp;
|
||||
|
||||
length = htonl(sizeof(dbid_t) + sizeof(uint16_t) + sizeof(uint8_t) +
|
||||
sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t) +
|
||||
sizeof(uint8_t) + 2+chunk->F.id_len);
|
||||
|
||||
i16temp = htons(DB_CHUNK_CLIENT_MSG);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
if(persist__write_string(db_fptr, chunk->client_id, false)) return 1;
|
||||
|
||||
write_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t));
|
||||
|
||||
i16temp = htons(chunk->F.mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
write_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
|
||||
write_e(db_fptr, &chunk->F.retain, sizeof(uint8_t));
|
||||
write_e(db_fptr, &chunk->F.direction, sizeof(uint8_t));
|
||||
write_e(db_fptr, &chunk->F.state, sizeof(uint8_t));
|
||||
write_e(db_fptr, &chunk->F.dup, sizeof(uint8_t));
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int persist__message_store_chunk_write_v4(FILE *db_fptr, const struct P_msg_store *chunk)
|
||||
{
|
||||
uint32_t length;
|
||||
uint32_t i32temp;
|
||||
uint16_t i16temp;
|
||||
|
||||
length = htonl(sizeof(dbid_t) + sizeof(uint16_t) +
|
||||
sizeof(uint16_t) + sizeof(uint16_t) +
|
||||
2+chunk->F.topic_len + sizeof(uint32_t) +
|
||||
chunk->F.payloadlen + sizeof(uint8_t) + sizeof(uint8_t)
|
||||
+ 2*sizeof(uint16_t) + chunk->F.source_id_len + chunk->F.source_username_len);
|
||||
|
||||
i16temp = htons(DB_CHUNK_MSG_STORE);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
write_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t));
|
||||
|
||||
if(persist__write_string(db_fptr, chunk->source.id, false)) return 1;
|
||||
if(persist__write_string(db_fptr, chunk->source.username, true)) return 1;
|
||||
|
||||
i16temp = htons(chunk->F.source_port);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
i16temp = htons(chunk->F.source_mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
i16temp = htons(chunk->F.mid);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
|
||||
if(persist__write_string(db_fptr, chunk->topic, true)) return 1;
|
||||
|
||||
write_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
|
||||
write_e(db_fptr, &chunk->F.retain, sizeof(uint8_t));
|
||||
|
||||
i32temp = htonl(chunk->F.payloadlen);
|
||||
write_e(db_fptr, &i32temp, sizeof(uint32_t));
|
||||
if(chunk->F.payloadlen){
|
||||
write_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), (unsigned int)chunk->F.payloadlen);
|
||||
}
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int persist__retain_chunk_write_v4(FILE *db_fptr, const struct P_retain *chunk)
|
||||
{
|
||||
uint32_t length;
|
||||
uint16_t i16temp;
|
||||
|
||||
length = htonl(sizeof(dbid_t));
|
||||
|
||||
i16temp = htons(DB_CHUNK_RETAIN);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
write_e(db_fptr, &chunk->F.store_id, sizeof(dbid_t));
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int persist__sub_chunk_write_v4(FILE *db_fptr, const struct P_sub *chunk)
|
||||
{
|
||||
uint32_t length;
|
||||
uint16_t i16temp;
|
||||
|
||||
length = htonl(2+chunk->F.id_len + 2+chunk->F.topic_len + sizeof(uint8_t));
|
||||
|
||||
i16temp = htons(DB_CHUNK_SUB);
|
||||
write_e(db_fptr, &i16temp, sizeof(uint16_t));
|
||||
write_e(db_fptr, &length, sizeof(uint32_t));
|
||||
|
||||
if(persist__write_string(db_fptr, chunk->client_id, false)) return 1;
|
||||
if(persist__write_string(db_fptr, chunk->topic, false)) return 1;
|
||||
|
||||
write_e(db_fptr, &chunk->F.qos, sizeof(uint8_t));
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
error:
|
||||
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
#endif
|
@ -42,6 +42,7 @@ PERSIST_WRITE_OBJS = \
|
||||
persist_read.o \
|
||||
persist_read_v234.o \
|
||||
persist_write.o \
|
||||
persist_write_v4.o \
|
||||
subs.o \
|
||||
util_mosq.o
|
||||
|
||||
@ -76,6 +77,9 @@ persist_read_v234.o : ../../src/persist_read_v234.c
|
||||
persist_write.o : ../../src/persist_write.c
|
||||
$(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^
|
||||
|
||||
persist_write_v4.o : ../../src/persist_write_v4.c
|
||||
$(CROSS_COMPILE)$(CC) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^
|
||||
|
||||
property_mosq.o : ../../lib/property_mosq.c
|
||||
$(CROSS_COMPILE)$(CC) $(CFLAGS) -c -o $@ $^
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user