Better subtree searching.

This commit is contained in:
Roger A. Light 2016-07-19 15:05:53 +01:00
parent 0b5d524723
commit 883af8af53
10 changed files with 128 additions and 227 deletions

View File

@ -30,8 +30,7 @@ static int max_queued = 100;
int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
{
int rc = 0;
struct mosquitto__subhier *child;
struct mosquitto__subhier *subhier;
if(!config || !db) return MOSQ_ERR_INVAL;
@ -48,56 +47,13 @@ int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
// Initialize the hashtable
db->clientid_index_hash = NULL;
db->subs.next = NULL;
db->subs.subs = NULL;
db->subs.topic_len = strlen("");
if(UHPA_ALLOC(db->subs.topic, db->subs.topic_len+1) == 0){
db->subs.topic_len = 0;
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}else{
strncpy(UHPA_ACCESS(db->subs.topic, db->subs.topic_len+1), "", db->subs.topic_len+1);
}
db->subs = NULL;
child = mosquitto__malloc(sizeof(struct mosquitto__subhier));
if(!child){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
child->parent = NULL;
child->next = NULL;
child->topic_len = strlen("");
if(UHPA_ALLOC_TOPIC(child) == 0){
child->topic_len = 0;
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}else{
strncpy(UHPA_ACCESS_TOPIC(child), "", child->topic_len+1);
}
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
db->subs.children = child;
subhier = sub__add_hier_entry(&db->subs, " ", strlen(" "));
if(!subhier) return MOSQ_ERR_NOMEM;
child = mosquitto__malloc(sizeof(struct mosquitto__subhier));
if(!child){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
child->parent = NULL;
child->next = NULL;
child->topic_len = strlen("$SYS");
if(UHPA_ALLOC_TOPIC(child) == 0){
child->topic_len = 0;
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}else{
strncpy(UHPA_ACCESS_TOPIC(child), "$SYS", child->topic_len+1);
}
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
db->subs.children->next = child;
subhier = sub__add_hier_entry(&db->subs, "$SYS", strlen("$SYS"));
if(!subhier) return MOSQ_ERR_NOMEM;
db->unpwd = NULL;
@ -107,36 +63,35 @@ int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
}
#endif
return rc;
return MOSQ_ERR_SUCCESS;
}
static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier *subhier)
{
struct mosquitto__subhier *next;
struct mosquitto__subhier *peer, *subhier_tmp;
struct mosquitto__subleaf *leaf, *nextleaf;
while(subhier){
next = subhier->next;
leaf = subhier->subs;
HASH_ITER(hh, subhier, peer, subhier_tmp){
leaf = peer->subs;
while(leaf){
nextleaf = leaf->next;
mosquitto__free(leaf);
leaf = nextleaf;
}
if(subhier->retained){
db__msg_store_deref(db, &subhier->retained);
if(peer->retained){
db__msg_store_deref(db, &peer->retained);
}
subhier_clean(db, subhier->children);
UHPA_FREE_TOPIC(subhier);
subhier_clean(db, peer->children);
UHPA_FREE_TOPIC(peer);
mosquitto__free(subhier);
subhier = next;
HASH_DELETE(hh, subhier, peer);
mosquitto__free(peer);
}
}
int db__close(struct mosquitto_db *db)
{
subhier_clean(db, db->subs.children);
subhier_clean(db, db->subs);
db__msg_store_clean(db);
return MOSQ_ERR_SUCCESS;

View File

@ -22,16 +22,6 @@ Contributors:
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "packet_mosq.h"
/*
#include "mosquitto_broker_internal.h"
#include "mqtt3_protocol.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"
#include "tls_mosq.h"
#include "util_mosq.h"
*/

View File

@ -76,7 +76,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
}
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub);
sub__remove(db, context, sub, &db->subs);
sub__remove(db, context, sub, db->subs);
log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub);
mosquitto__free(sub);
}

View File

@ -119,6 +119,8 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
sigemptyset(&sigblock);
sigaddset(&sigblock, SIGINT);
sigaddset(&sigblock, SIGTERM);
sigaddset(&sigblock, SIGUSR1);
sigaddset(&sigblock, SIGUSR2);
#endif
if(db->config->persistent_client_expiration > 0){
@ -356,7 +358,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
flag_reload = false;
}
if(flag_tree_print){
sub__tree_print(&db->subs, 0);
sub__tree_print(db->subs, 0);
flag_tree_print = false;
}
#ifdef WITH_WEBSOCKETS

View File

@ -231,9 +231,9 @@ struct mosquitto__subleaf {
};
struct mosquitto__subhier {
UT_hash_handle hh;
struct mosquitto__subhier *parent;
struct mosquitto__subhier *children;
struct mosquitto__subhier *next;
struct mosquitto__subleaf *subs;
struct mosquitto_msg_store *retained;
mosquitto__topic_element_uhpa topic;
@ -341,7 +341,7 @@ struct mosquitto__auth_plugin{
struct mosquitto_db{
dbid_t last_db_id;
struct mosquitto__subhier subs;
struct mosquitto__subhier *subs;
struct mosquitto__unpwd *unpwd;
struct mosquitto__acl_user *acl_list;
struct mosquitto__acl *acl_patterns;
@ -537,7 +537,8 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);
/* ============================================================
* Subscription functions
* ============================================================ */
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root);
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root);
struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier **parent, const char *topic, size_t len);
int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root);
void sub__tree_print(struct mosquitto__subhier *root, int level);
int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context);

View File

@ -257,7 +257,7 @@ error:
static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
{
struct mosquitto__subhier *subhier;
struct mosquitto__subhier *subhier, *subhier_tmp;
struct mosquitto__subleaf *sub;
char *thistopic;
uint32_t length;
@ -311,10 +311,8 @@ static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, st
}
}
subhier = node->children;
while(subhier){
HASH_ITER(hh, node->children, subhier, subhier_tmp){
persist__subs_retain_write(db, db_fptr, subhier, thistopic, level+1);
subhier = subhier->next;
}
mosquitto__free(thistopic);
return MOSQ_ERR_SUCCESS;
@ -325,14 +323,12 @@ error:
static int persist__subs_retain_write_all(struct mosquitto_db *db, FILE *db_fptr)
{
struct mosquitto__subhier *subhier;
struct mosquitto__subhier *subhier, *subhier_tmp;
subhier = db->subs.children;
while(subhier){
HASH_ITER(hh, db->subs, subhier, subhier_tmp){
if(subhier->children){
persist__subs_retain_write(db, db_fptr, subhier->children, "", 0);
}
subhier = subhier->next;
}
return MOSQ_ERR_SUCCESS;

View File

@ -55,8 +55,6 @@ Contributors:
#include "memory_mosq.h"
#include "util_mosq.h"
struct mosquitto_db int_db;
extern bool flag_reload;
#ifdef WITH_PERSISTENCE
extern bool flag_db_backup;
@ -64,10 +62,6 @@ extern bool flag_db_backup;
extern bool flag_tree_print;
extern int run;
void handle_sigint(int signal);
void handle_sigusr1(int signal);
void handle_sigusr2(int signal);
#ifdef SIGHUP
/* Signal handler for SIGHUP - flag a config reload. */
void handle_sighup(int signal)

View File

@ -183,7 +183,7 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics)
assert(topics);
if(subtopic[0] != '$'){
new_topic = sub__topic_append(&tail, topics, "");
new_topic = sub__topic_append(&tail, topics, " ");
if(!new_topic) goto cleanup;
}
@ -248,7 +248,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens)
static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens)
/* FIXME - this function has the potential to leak subhier, audit calling functions. */
{
struct mosquitto__subhier *branch, *last = NULL;
struct mosquitto__subhier *branch;
struct mosquitto__subleaf *leaf, *last_leaf;
struct mosquitto__subhier **subs;
int i;
@ -309,35 +309,21 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
return MOSQ_ERR_SUCCESS;
}
branch = subhier->children;
while(branch){
if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){
return sub__add_recurse(db, context, qos, branch, tokens->next);
}
last = branch;
branch = branch->next;
}
/* Not found */
branch = mosquitto__calloc(1, sizeof(struct mosquitto__subhier));
if(!branch) return MOSQ_ERR_NOMEM;
branch->parent = subhier;
branch->topic_len = tokens->topic_len;
if(UHPA_ALLOC_TOPIC(branch) == 0){
mosquitto__free(branch);
return MOSQ_ERR_NOMEM;
}
strncpy(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens), branch->topic_len+1);
if(!last){
subhier->children = branch;
HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
if(branch){
return sub__add_recurse(db, context, qos, branch, tokens->next);
}else{
last->next = branch;
/* Not found */
branch = sub__add_hier_entry(&subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len+1);
if(!branch) return MOSQ_ERR_NOMEM;
return sub__add_recurse(db, context, qos, branch, tokens->next);
}
return sub__add_recurse(db, context, qos, branch, tokens->next);
}
static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens)
{
struct mosquitto__subhier *branch, *last = NULL;
struct mosquitto__subhier *branch;
struct mosquitto__subleaf *leaf;
int i;
@ -375,23 +361,14 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
return MOSQ_ERR_SUCCESS;
}
branch = subhier->children;
while(branch){
if(!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))){
sub__remove_recurse(db, context, branch, tokens->next);
if(!branch->children && !branch->subs && !branch->retained){
if(last){
last->next = branch->next;
}else{
subhier->children = branch->next;
}
UHPA_FREE_TOPIC(branch);
mosquitto__free(branch);
}
return MOSQ_ERR_SUCCESS;
HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
if(branch){
sub__remove_recurse(db, context, branch, tokens->next);
if(!branch->children && !branch->subs && !branch->retained){
HASH_DELETE(hh, subhier->children, branch);
UHPA_FREE_TOPIC(branch);
mosquitto__free(branch);
}
last = branch;
branch = branch->next;
}
return MOSQ_ERR_SUCCESS;
}
@ -399,11 +376,10 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
{
/* FIXME - need to take into account source_id if the client is a bridge */
struct mosquitto__subhier *branch;
struct mosquitto__subhier *branch, *branch_tmp;
bool sr;
branch = subhier->children;
while(branch){
HASH_ITER(hh, subhier->children, branch, branch_tmp){
sr = set_retain;
if(tokens && UHPA_ACCESS_TOPIC(tokens)
@ -426,57 +402,73 @@ static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subh
*/
subs__process(db, branch, source_id, topic, qos, retain, stored, false);
}
branch = branch->next;
}
}
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier *root)
struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier **parent, const char *topic, size_t len)
{
struct mosquitto__subhier *child;
assert(parent);
child = mosquitto__malloc(sizeof(struct mosquitto__subhier));
if(!child){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return NULL;
}
child->parent = *parent;
child->topic_len = strlen(topic);
if(UHPA_ALLOC_TOPIC(child) == 0){
mosquitto__free(child);
child->topic_len = 0;
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return NULL;
}else{
strncpy(UHPA_ACCESS_TOPIC(child), topic, child->topic_len+1);
}
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
if(child->topic_len+1 > sizeof(child->topic.array)){
if(child->topic.ptr){
HASH_ADD_KEYPTR(hh, *parent, child->topic.ptr, child->topic_len, child);
}else{
mosquitto__free(child);
return NULL;
}
}else{
HASH_ADD(hh, *parent, topic.array, child->topic_len, child);
}
return child;
}
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root)
{
int rc = 0;
struct mosquitto__subhier *subhier, *child;
struct mosquitto__subhier *subhier;
struct sub__token *tokens = NULL;
assert(root);
assert(*root);
assert(sub);
if(sub__topic_tokenise(sub, &tokens)) return 1;
subhier = root->children;
while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
rc = sub__add_recurse(db, context, qos, subhier, tokens);
break;
}
subhier = subhier->next;
}
HASH_FIND(hh, *root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
if(!subhier){
child = mosquitto__malloc(sizeof(struct mosquitto__subhier));
if(!child){
subhier = sub__add_hier_entry(root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len+1);
if(!subhier){
sub__topic_tokens_free(tokens);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
child->parent = root;
child->topic_len = tokens->topic_len;
if(UHPA_ALLOC_TOPIC(child) == 0){
sub__topic_tokens_free(tokens);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
strncpy(UHPA_ACCESS_TOPIC(child), UHPA_ACCESS_TOPIC(tokens), child->topic_len+1);
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
if(root->children){
child->next = root->children;
}else{
child->next = NULL;
}
root->children = child;
rc = sub__add_recurse(db, context, qos, child, tokens);
}
rc = sub__add_recurse(db, context, qos, subhier, tokens);
sub__topic_tokens_free(tokens);
@ -496,13 +488,11 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *
if(sub__topic_tokenise(sub, &tokens)) return 1;
subhier = root->children;
while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
rc = sub__remove_recurse(db, context, subhier, tokens);
break;
}
subhier = subhier->next;
HASH_FIND(hh, root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
if(subhier){
rc = sub__remove_recurse(db, context, subhier, tokens);
}else{
printf("nope\n");
}
sub__topic_tokens_free(tokens);
@ -527,18 +517,15 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
*/
(*stored)->ref_count++;
subhier = db->subs.children;
while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
if(retain){
/* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists.
*/
sub__add_recurse(db, NULL, 0, subhier, tokens);
}
sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
if(subhier){
if(retain){
/* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists.
*/
sub__add_recurse(db, NULL, 0, subhier, tokens);
}
subhier = subhier->next;
sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
}
sub__topic_tokens_free(tokens);
@ -553,8 +540,6 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub)
{
struct mosquitto__subhier *parent;
struct mosquitto__subhier *hier;
struct mosquitto__subhier *last = NULL;
if(!sub || !sub->parent){
return NULL;
@ -565,22 +550,8 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub
}
parent = sub->parent;
hier = sub->parent->children;
HASH_DELETE(hh, parent, sub);
while(hier){
if(hier == sub){
if(last){
last->next = hier->next;
}else{
parent->children = hier->next;
}
UHPA_FREE_TOPIC(sub);
mosquitto__free(sub);
break;
}
last = hier;
hier = hier->next;
}
if(parent->subs == NULL
&& parent->children == NULL
&& parent->retained == NULL
@ -646,15 +617,16 @@ int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
void sub__tree_print(struct mosquitto__subhier *root, int level)
{
int i;
struct mosquitto__subhier *branch;
struct mosquitto__subhier *branch, *branch_tmp;
struct mosquitto__subleaf *leaf;
if(level > 1){
for(i=0; i<(level-2)*2; i++){
HASH_ITER(hh, root, branch, branch_tmp){
if(level > -1){
for(i=0; i<(level+2)*2; i++){
printf(" ");
}
printf("%s", UHPA_ACCESS_TOPIC(root));
leaf = root->subs;
printf("%s", UHPA_ACCESS_TOPIC(branch));
leaf = branch->subs;
while(leaf){
if(leaf->context){
printf(" (%s, %d)", leaf->context->id, leaf->qos);
@ -663,16 +635,13 @@ void sub__tree_print(struct mosquitto__subhier *root, int level)
}
leaf = leaf->next;
}
if(root->retained){
if(branch->retained){
printf(" (r)");
}
printf("\n");
}
branch = root->children;
while(branch){
sub__tree_print(branch, level+1);
branch = branch->next;
sub__tree_print(branch->children, level+1);
}
}
@ -702,11 +671,10 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *
static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, struct mosquitto *context, const char *sub, int sub_qos, int level)
{
struct mosquitto__subhier *branch;
struct mosquitto__subhier *branch, *branch_tmp;
int flag = 0;
branch = subhier->children;
while(branch){
HASH_ITER(hh, subhier->children, branch, branch_tmp){
/* Subscriptions with wildcards in aren't really valid topics to publish to
* so they can't have retained messages.
*/
@ -727,7 +695,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
|| !strcmp(UHPA_ACCESS_TOPIC(tokens), "+"))){
if(tokens->next){
if(retain__search(db, branch, tokens->next, context, sub, sub_qos, level+1) == -1
|| (!branch->next && tokens->next && !strcmp(UHPA_ACCESS_TOPIC(tokens->next), "#") && level>0)){
|| (!branch_tmp && tokens->next && !strcmp(UHPA_ACCESS_TOPIC(tokens->next), "#") && level>0)){
if(branch->retained){
retain__process(db, branch->retained, context, sub, sub_qos);
@ -739,8 +707,6 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
}
}
}
branch = branch->next;
}
return flag;
}
@ -756,13 +722,10 @@ int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const
if(sub__topic_tokenise(sub, &tokens)) return 1;
subhier = db->subs.children;
while(subhier){
if(!strcmp(UHPA_ACCESS_TOPIC(subhier), UHPA_ACCESS_TOPIC(tokens))){
retain__search(db, subhier, tokens, context, sub, sub_qos, 0);
break;
}
subhier = subhier->next;
HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
if(subhier){
retain__search(db, subhier, tokens, context, sub, sub_qos, 0);
}
while(tokens){
tail = tokens->next;

View File

@ -28,7 +28,7 @@ mid_unsub = 593
unsubscribe_packet = mosq_test.gen_unsubscribe(mid_unsub, "retain/clear/test")
unsuback_packet = mosq_test.gen_unsuback(mid_unsub)
cmd = ['../../src/mosquitto', '-p', '1888']
cmd = ['../../src/mosquitto', '-p', '1888', '-v']
broker = mosq_test.start_broker(filename=os.path.basename(__file__), cmd=cmd)
try:

View File

@ -10,7 +10,7 @@ def start_broker(filename, cmd=None, port=1888):
if cmd is None:
cmd = ['../../src/mosquitto', '-v', '-c', filename.replace('.py', '.conf')]
if os.environ.get('MOSQ_USE_VALGRIND') is not None:
cmd = ['valgrind', '--trace-children=yes', '-v', '--log-file='+filename+'.vglog'] + cmd
cmd = ['valgrind', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+filename+'.vglog'] + cmd
delay = 1
broker = subprocess.Popen(cmd, stderr=subprocess.PIPE)