db_dump: use objects to simplify printing (#215)

Prepratory work for providing more filtering of the output dump.
This makes _no_ difference to the current behaviour, but makes it much
simpler for people who "need", for whatever reason, to hack some private
changes into the db_dump tool.

Signed-off-by: Karl Palsson <karlp@etactica.com>
This commit is contained in:
Karl Palsson 2016-09-10 12:10:01 +00:00 committed by Roger Light
parent 41a1b51c31
commit 374c62586e

View File

@ -53,6 +53,39 @@ struct msg_store_chunk
uint32_t length;
};
struct db_sub
{
char *client_id;
char *topic;
uint8_t qos;
};
struct db_client
{
char *client_id;
uint16_t last_mid;
time_t disconnect_t;
};
struct db_client_msg
{
char *client_id;
uint8_t qos, retain, direction, state, dup;
dbid_t store_id;
uint16_t mid;
};
struct db_msg
{
dbid_t store_id;
uint32_t payloadlen;
uint16_t source_mid, mid;
uint8_t qos, retain;
uint8_t *payload;
char *source_id;
char *topic;
};
static uint32_t db_version;
static int stats = 0;
static int client_stats = 0;
@ -61,12 +94,110 @@ static int do_print = 1;
struct client_chunk *clients_by_id = NULL;
struct msg_store_chunk *msgs_by_id = NULL;
static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
static void
free__db_sub(struct db_sub *sub)
{
uint16_t i16temp, slen, last_mid;
char *client_id = NULL;
if (sub->client_id) {
free(sub->client_id);
}
if (sub->topic) {
free(sub->topic);
}
}
static void
free__db_client(struct db_client *client)
{
if (client->client_id) {
free(client->client_id);
}
}
static void
free__db_client_msg(struct db_client_msg *msg)
{
if (msg->client_id) {
free(msg->client_id);
}
}
static void
free__db_msg(struct db_msg *msg)
{
if (msg->source_id) {
free(msg->source_id);
}
if (msg->topic) {
free(msg->topic);
}
if (msg->payload) {
free(msg->payload);
}
}
static void
print_db_client(struct db_client *client, int length)
{
printf("DB_CHUNK_CLIENT:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", client->client_id);
printf("\tLast MID: %d\n", client->last_mid);
printf("\tDisconnect time: %ld\n", client->disconnect_t);
}
static void
print_db_client_msg(struct db_client_msg *msg, int length)
{
printf("DB_CHUNK_CLIENT_MSG:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", msg->client_id);
printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
printf("\tMID: %d\n", msg->mid);
printf("\tQoS: %d\n", msg->qos);
printf("\tRetain: %d\n", msg->retain);
printf("\tDirection: %d\n", msg->direction);
printf("\tState: %d\n", msg->state);
printf("\tDup: %d\n", msg->dup);
}
static void
print_db_sub(struct db_sub *sub, int length)
{
printf("DB_CHUNK_SUB:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", sub->client_id);
printf("\tTopic: %s\n", sub->topic);
printf("\tQoS: %d\n", sub->qos);
}
static void
print_db_msg(struct db_msg *msg, int length)
{
printf("DB_CHUNK_MSG_STORE:\n");
printf("\tLength: %d\n", length);
printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
printf("\tSource ID: %s\n", msg->source_id);
printf("\tSource MID: %d\n", msg->source_mid);
printf("\tMID: %d\n", msg->mid);
printf("\tTopic: %s\n", msg->topic);
printf("\tQoS: %d\n", msg->qos);
printf("\tRetain: %d\n", msg->retain);
printf("\tPayload Length: %d\n", msg->payloadlen);
bool binary = false;
for(int i=0; i<msg->payloadlen; i++){
if(msg->payload[i] == 0) binary = true;
}
if(binary == false && msg->payloadlen<256){
printf("\tPayload: %s\n", msg->payload);
}
}
static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd, struct db_client *client)
{
uint16_t i16temp, slen;
int rc = 0;
time_t disconnect_t;
struct client_chunk *cc;
read_e(db_fd, &i16temp, sizeof(uint16_t));
@ -76,24 +207,21 @@ static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
fclose(db_fd);
return 1;
}
client_id = calloc(slen+1, sizeof(char));
if(!client_id){
client->client_id = calloc(slen+1, sizeof(char));
if(!client->client_id){
fclose(db_fd);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
read_e(db_fd, client_id, slen);
if(do_print) printf("\tClient ID: %s\n", client_id);
read_e(db_fd, client->client_id, slen);
read_e(db_fd, &i16temp, sizeof(uint16_t));
last_mid = ntohs(i16temp);
if(do_print) printf("\tLast MID: %d\n", last_mid);
client->last_mid = ntohs(i16temp);
if(db_version == 2){
disconnect_t = time(NULL);
client->disconnect_t = time(NULL);
}else{
read_e(db_fd, &disconnect_t, sizeof(time_t));
if(do_print) printf("\tDisconnect time: %ld\n", disconnect_t);
read_e(db_fd, &client->disconnect_t, sizeof(time_t));
}
if(client_stats){
@ -102,26 +230,22 @@ static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
errno = ENOMEM;
goto error;
}
cc->id = client_id;
cc->id = strdup(client->client_id);
HASH_ADD_KEYPTR(hh_id, clients_by_id, cc->id, strlen(cc->id), cc);
}else{
free(client_id);
}
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(client_id);
free(client->client_id);
return 1;
}
static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_client_msg *msg)
{
dbid_t i64temp, store_id;
uint16_t i16temp, slen, mid;
uint8_t qos, retain, direction, state, dup;
char *client_id = NULL;
dbid_t i64temp;
uint16_t i16temp, slen;
struct client_chunk *cc;
struct msg_store_chunk *msc;
@ -132,160 +256,127 @@ static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, ui
fclose(db_fd);
return 1;
}
client_id = calloc(slen+1, sizeof(char));
if(!client_id){
msg->client_id = calloc(slen+1, sizeof(char));
if(!msg->client_id){
fclose(db_fd);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
read_e(db_fd, client_id, slen);
if(do_print) printf("\tClient ID: %s\n", client_id);
read_e(db_fd, msg->client_id, slen);
read_e(db_fd, &i64temp, sizeof(dbid_t));
store_id = i64temp;
if(do_print) printf("\tStore ID: %" PRIu64 "\n", store_id);
msg->store_id = i64temp;
read_e(db_fd, &i16temp, sizeof(uint16_t));
mid = ntohs(i16temp);
if(do_print) printf("\tMID: %d\n", mid);
msg->mid = ntohs(i16temp);
read_e(db_fd, &qos, sizeof(uint8_t));
if(do_print) printf("\tQoS: %d\n", qos);
read_e(db_fd, &retain, sizeof(uint8_t));
if(do_print) printf("\tRetain: %d\n", retain);
read_e(db_fd, &direction, sizeof(uint8_t));
if(do_print) printf("\tDirection: %d\n", direction);
read_e(db_fd, &state, sizeof(uint8_t));
if(do_print) printf("\tState: %d\n", state);
read_e(db_fd, &dup, sizeof(uint8_t));
if(do_print) printf("\tDup: %d\n", dup);
read_e(db_fd, &msg->qos, sizeof(uint8_t));
read_e(db_fd, &msg->retain, sizeof(uint8_t));
read_e(db_fd, &msg->direction, sizeof(uint8_t));
read_e(db_fd, &msg->state, sizeof(uint8_t));
read_e(db_fd, &msg->dup, sizeof(uint8_t));
if(client_stats){
HASH_FIND(hh_id, clients_by_id, client_id, strlen(client_id), cc);
HASH_FIND(hh_id, clients_by_id, msg->client_id, strlen(msg->client_id), cc);
if(cc){
cc->messages++;
cc->message_size += length;
HASH_FIND(hh, msgs_by_id, &store_id, sizeof(dbid_t), msc);
HASH_FIND(hh, msgs_by_id, &msg->store_id, sizeof(dbid_t), msc);
if(msc){
cc->message_size += msc->length;
}
}
}
free(client_id);
return 0;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(client_id);
free(msg->client_id);
return 1;
}
static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_msg *msg)
{
dbid_t i64temp, store_id;
uint32_t i32temp, payloadlen;
uint16_t i16temp, slen, source_mid, mid;
uint8_t qos, retain, *payload = NULL;
char *source_id = NULL;
char *topic = NULL;
dbid_t i64temp;
uint32_t i32temp;
uint16_t i16temp, slen;
int rc = 0;
bool binary;
int i;
struct msg_store_chunk *mcs;
read_e(db_fd, &i64temp, sizeof(dbid_t));
store_id = i64temp;
if(do_print) printf("\tStore ID: %" PRIu64 "\n", store_id);
msg->store_id = i64temp;
read_e(db_fd, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
if(slen){
source_id = calloc(slen+1, sizeof(char));
if(!source_id){
msg->source_id = calloc(slen+1, sizeof(char));
if(!msg->source_id){
fclose(db_fd);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
if(fread(source_id, 1, slen, db_fd) != slen){
if(fread(msg->source_id, 1, slen, db_fd) != slen){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(source_id);
free(msg->source_id);
return 1;
}
if(do_print) printf("\tSource ID: %s\n", source_id);
free(source_id);
}
read_e(db_fd, &i16temp, sizeof(uint16_t));
source_mid = ntohs(i16temp);
if(do_print) printf("\tSource MID: %d\n", source_mid);
msg->source_mid = ntohs(i16temp);
read_e(db_fd, &i16temp, sizeof(uint16_t));
mid = ntohs(i16temp);
if(do_print) printf("\tMID: %d\n", mid);
msg->mid = ntohs(i16temp);
read_e(db_fd, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
if(slen){
topic = calloc(slen+1, sizeof(char));
if(!topic){
msg->topic = calloc(slen+1, sizeof(char));
if(!msg->topic){
fclose(db_fd);
free(source_id);
free(msg->source_id);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
if(fread(topic, 1, slen, db_fd) != slen){
if(fread(msg->topic, 1, slen, db_fd) != slen){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(source_id);
free(topic);
free(msg->source_id);
free(msg->topic);
return 1;
}
if(do_print) printf("\tTopic: %s\n", topic);
free(topic);
}else{
fprintf(stderr, "Error: Invalid msg_store chunk when restoring persistent database.");
fclose(db_fd);
free(source_id);
free(msg->source_id);
return 1;
}
read_e(db_fd, &qos, sizeof(uint8_t));
if(do_print) printf("\tQoS: %d\n", qos);
read_e(db_fd, &retain, sizeof(uint8_t));
if(do_print) printf("\tRetain: %d\n", retain);
read_e(db_fd, &msg->qos, sizeof(uint8_t));
read_e(db_fd, &msg->retain, sizeof(uint8_t));
read_e(db_fd, &i32temp, sizeof(uint32_t));
payloadlen = ntohl(i32temp);
if(do_print) printf("\tPayload Length: %d\n", payloadlen);
msg->payloadlen = ntohl(i32temp);
if(payloadlen){
payload = malloc(payloadlen+1);
if(!payload){
if(msg->payloadlen){
msg->payload = malloc(msg->payloadlen+1);
if(!msg->payload){
fclose(db_fd);
free(source_id);
free(topic);
free(msg->source_id);
free(msg->topic);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
memset(payload, 0, payloadlen+1);
if(fread(payload, 1, payloadlen, db_fd) != payloadlen){
memset(msg->payload, 0, msg->payloadlen+1);
if(fread(msg->payload, 1, msg->payloadlen, db_fd) != msg->payloadlen){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(source_id);
free(topic);
free(payload);
free(msg->source_id);
free(msg->topic);
free(msg->payload);
return 1;
}
binary = false;
for(i=0; i<payloadlen; i++){
if(payload[i] == 0) binary = true;
}
if(binary == false && payloadlen<256){
if(do_print) printf("\tPayload: %s\n", payload);
}
free(payload);
}
if(client_stats){
@ -294,17 +385,17 @@ static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uin
errno = ENOMEM;
goto error;
}
mcs->store_id = store_id;
mcs->store_id = msg->store_id;
mcs->length = length;
HASH_ADD(hh, msgs_by_id, store_id, sizeof(dbid_t), mcs);
HASH_ADD(hh, msgs_by_id, store_id, sizeof(dbid_t), mcs);
}
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(source_id);
free(topic);
free(msg->source_id);
free(msg->topic);
return 1;
}
@ -322,50 +413,41 @@ static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
return 0;
}
static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_sub *sub)
{
uint16_t i16temp, slen;
uint8_t qos;
char *client_id;
char *topic;
int rc = 0;
struct client_chunk *cc;
read_e(db_fd, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
client_id = calloc(slen+1, sizeof(char));
if(!client_id){
sub->client_id = calloc(slen+1, sizeof(char));
if(!sub->client_id){
fclose(db_fd);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
read_e(db_fd, client_id, slen);
if(do_print) printf("\tClient ID: %s\n", client_id);
read_e(db_fd, sub->client_id, slen);
read_e(db_fd, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
topic = calloc(slen+1, sizeof(char));
if(!topic){
sub->topic = calloc(slen+1, sizeof(char));
if(!sub->topic){
fclose(db_fd);
fprintf(stderr, "Error: Out of memory.");
free(client_id);
free(sub->client_id);
return 1;
}
read_e(db_fd, topic, slen);
if(do_print) printf("\tTopic: %s\n", topic);
read_e(db_fd, &qos, sizeof(uint8_t));
if(do_print) printf("\tQoS: %d\n", qos);
read_e(db_fd, sub->topic, slen);
read_e(db_fd, &sub->qos, sizeof(uint8_t));
if(client_stats){
HASH_FIND(hh_id, clients_by_id, client_id, strlen(client_id), cc);
HASH_FIND(hh_id, clients_by_id, sub->client_id, strlen(sub->client_id), cc);
if(cc){
cc->subscriptions++;
cc->subscription_size += length;
}
}
free(client_id);
free(topic);
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
@ -446,16 +528,22 @@ int main(int argc, char *argv[])
case DB_CHUNK_MSG_STORE:
msg_store_count++;
if(do_print) printf("DB_CHUNK_MSG_STORE:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db__msg_store_chunk_restore(&db, fd, length)) return 1;
struct db_msg msg = {0};
if(db__msg_store_chunk_restore(&db, fd, length, &msg)) return 1;
if(do_print) {
print_db_msg(&msg, length);
}
free__db_msg(&msg);
break;
case DB_CHUNK_CLIENT_MSG:
client_msg_count++;
if(do_print) printf("DB_CHUNK_CLIENT_MSG:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db__client_msg_chunk_restore(&db, fd, length)) return 1;
struct db_client_msg cmsg = {0};
if(db__client_msg_chunk_restore(&db, fd, length, &cmsg)) return 1;
if(do_print) {
print_db_client_msg(&cmsg, length);
}
free__db_client_msg(&cmsg);
break;
case DB_CHUNK_RETAIN:
@ -467,16 +555,22 @@ int main(int argc, char *argv[])
case DB_CHUNK_SUB:
sub_count++;
if(do_print) printf("DB_CHUNK_SUB:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db__sub_chunk_restore(&db, fd, length)) return 1;
struct db_sub sub = {0};
if(db__sub_chunk_restore(&db, fd, length, &sub)) return 1;
if(do_print) {
print_db_sub(&sub, length);
}
free__db_sub(&sub);
break;
case DB_CHUNK_CLIENT:
client_count++;
if(do_print) printf("DB_CHUNK_CLIENT:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db__client_chunk_restore(&db, fd)) return 1;
struct db_client client = {0};
if(db__client_chunk_restore(&db, fd, &client)) return 1;
if(do_print) {
print_db_client(&client, length);
}
free__db_client(&client);
break;
default:
@ -506,6 +600,7 @@ int main(int argc, char *argv[])
HASH_ITER(hh_id, clients_by_id, cc, cc_tmp){
printf("SC: %d SS: %d MC: %d MS: %ld ", cc->subscriptions, cc->subscription_size, cc->messages, cc->message_size);
printf("%s\n", cc->id);
free(cc->id);
}
}