Shared subscription support.
This commit is contained in:
parent
173b209bd8
commit
8a8d13cf96
@ -259,7 +259,9 @@ struct mosquitto {
|
||||
time_t disconnect_t;
|
||||
struct mosquitto__packet *out_packet_last;
|
||||
struct mosquitto__subhier **subs;
|
||||
struct mosquitto__subshared_ref **shared_subs;
|
||||
int sub_count;
|
||||
int shared_sub_count;
|
||||
int pollfd_index;
|
||||
# ifdef WITH_WEBSOCKETS
|
||||
# if defined(LWS_LIBRARY_VERSION_NUMBER)
|
||||
|
@ -297,11 +297,25 @@ struct mosquitto__subleaf {
|
||||
bool retain_as_published;
|
||||
};
|
||||
|
||||
|
||||
struct mosquitto__subshared_ref {
|
||||
struct mosquitto__subhier *hier;
|
||||
struct mosquitto__subshared *shared;
|
||||
};
|
||||
|
||||
|
||||
struct mosquitto__subshared {
|
||||
UT_hash_handle hh;
|
||||
char *name;
|
||||
struct mosquitto__subleaf *subs;
|
||||
};
|
||||
|
||||
struct mosquitto__subhier {
|
||||
UT_hash_handle hh;
|
||||
struct mosquitto__subhier *parent;
|
||||
struct mosquitto__subhier *children;
|
||||
struct mosquitto__subleaf *subs;
|
||||
struct mosquitto__subshared *shared;
|
||||
struct mosquitto_msg_store *retained;
|
||||
char *topic;
|
||||
uint16_t topic_len;
|
||||
@ -397,6 +411,7 @@ struct mosquitto_db{
|
||||
bool verbose;
|
||||
#ifdef WITH_SYS_TREE
|
||||
int subscription_count;
|
||||
int shared_subscription_count;
|
||||
int retained_count;
|
||||
#endif
|
||||
int persistence_changes;
|
||||
|
481
src/subs.c
481
src/subs.c
@ -56,58 +56,27 @@ Contributors:
|
||||
#include "mqtt_protocol.h"
|
||||
#include "util_mosq.h"
|
||||
|
||||
#include "utlist.h"
|
||||
|
||||
struct sub__token {
|
||||
struct sub__token *next;
|
||||
char *topic;
|
||||
uint16_t topic_len;
|
||||
};
|
||||
|
||||
static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
||||
|
||||
static int subs__send(struct mosquitto_db *db, struct mosquitto__subleaf *leaf, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored)
|
||||
{
|
||||
int rc = 0;
|
||||
int rc2;
|
||||
int client_qos, msg_qos;
|
||||
uint16_t mid;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
bool client_retain;
|
||||
uint16_t mid;
|
||||
int client_qos, msg_qos;
|
||||
mosquitto_property *properties = NULL;
|
||||
int rc2;
|
||||
|
||||
leaf = hier->subs;
|
||||
|
||||
if(retain && set_retain){
|
||||
#ifdef WITH_PERSISTENCE
|
||||
if(strncmp(topic, "$SYS", 4)){
|
||||
/* Retained messages count as a persistence change, but only if
|
||||
* they aren't for $SYS. */
|
||||
db->persistence_changes++;
|
||||
}
|
||||
#endif
|
||||
if(hier->retained){
|
||||
db__msg_store_deref(db, &hier->retained);
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->retained_count--;
|
||||
#endif
|
||||
}
|
||||
if(stored->payloadlen){
|
||||
hier->retained = stored;
|
||||
hier->retained->ref_count++;
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->retained_count++;
|
||||
#endif
|
||||
}else{
|
||||
hier->retained = NULL;
|
||||
}
|
||||
}
|
||||
while(source_id && leaf){
|
||||
if(!leaf->context->id || (leaf->no_local && !strcmp(leaf->context->id, source_id))){
|
||||
leaf = leaf->next;
|
||||
continue;
|
||||
}
|
||||
/* Check for ACL topic access. */
|
||||
rc2 = mosquitto_acl_check(db, leaf->context, topic, stored->payloadlen, UHPA_ACCESS(stored->payload, stored->payloadlen), stored->qos, stored->retain, MOSQ_ACL_READ);
|
||||
if(rc2 == MOSQ_ERR_ACL_DENIED){
|
||||
leaf = leaf->next;
|
||||
continue;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}else if(rc2 == MOSQ_ERR_SUCCESS){
|
||||
client_qos = leaf->qos;
|
||||
|
||||
@ -133,13 +102,81 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
|
||||
if(leaf->identifier){
|
||||
mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, leaf->identifier);
|
||||
}
|
||||
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties) == 1) rc = 1;
|
||||
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties) == 1){
|
||||
return 1;
|
||||
}
|
||||
}else{
|
||||
return 1; /* Application error */
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int subs__shared_process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
||||
{
|
||||
int rc = 0, rc2;
|
||||
struct mosquitto__subshared *shared, *shared_tmp;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
|
||||
HASH_ITER(hh, hier->shared, shared, shared_tmp){
|
||||
leaf = shared->subs;
|
||||
rc2 = subs__send(db, leaf, topic, qos, retain, stored);
|
||||
/* Remove current from the top, add back to the bottom */
|
||||
DL_DELETE(shared->subs, leaf);
|
||||
DL_APPEND(shared->subs, leaf);
|
||||
|
||||
if(rc2) rc = 1;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
|
||||
{
|
||||
int rc = 0;
|
||||
int rc2;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
|
||||
if(retain && set_retain){
|
||||
#ifdef WITH_PERSISTENCE
|
||||
if(strncmp(topic, "$SYS", 4)){
|
||||
/* Retained messages count as a persistence change, but only if
|
||||
* they aren't for $SYS. */
|
||||
db->persistence_changes++;
|
||||
}
|
||||
#endif
|
||||
if(hier->retained){
|
||||
db__msg_store_deref(db, &hier->retained);
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->retained_count--;
|
||||
#endif
|
||||
}
|
||||
if(stored->payloadlen){
|
||||
hier->retained = stored;
|
||||
hier->retained->ref_count++;
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->retained_count++;
|
||||
#endif
|
||||
}else{
|
||||
hier->retained = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
rc2 = subs__shared_process(db, hier, source_id, topic, qos, retain, stored, set_retain);
|
||||
|
||||
leaf = hier->subs;
|
||||
while(source_id && leaf){
|
||||
if(!leaf->context->id || (leaf->no_local && !strcmp(leaf->context->id, source_id))){
|
||||
leaf = leaf->next;
|
||||
continue;
|
||||
}
|
||||
rc2 = subs__send(db, leaf, topic, qos, retain, stored);
|
||||
if(rc2){
|
||||
rc = 1;
|
||||
}
|
||||
leaf = leaf->next;
|
||||
}
|
||||
if(hier->subs){
|
||||
if(hier->subs || hier->shared){
|
||||
return rc;
|
||||
}else{
|
||||
return MOSQ_ERR_NO_SUBSCRIBERS;
|
||||
@ -251,18 +288,14 @@ static void sub__topic_tokens_free(struct sub__token *tokens)
|
||||
}
|
||||
}
|
||||
|
||||
static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens)
|
||||
/* FIXME - this function has the potential to leak subhier, audit calling functions. */
|
||||
{
|
||||
struct mosquitto__subhier *branch;
|
||||
struct mosquitto__subleaf *leaf, *last_leaf;
|
||||
struct mosquitto__subhier **subs;
|
||||
int i;
|
||||
|
||||
if(!tokens){
|
||||
if(context && context->id){
|
||||
leaf = subhier->subs;
|
||||
last_leaf = NULL;
|
||||
static int sub__add_leaf(struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf)
|
||||
{
|
||||
struct mosquitto__subleaf *leaf;
|
||||
|
||||
*newleaf = NULL;
|
||||
leaf = *head;
|
||||
|
||||
while(leaf){
|
||||
if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){
|
||||
/* Client making a second subscription to same topic. Only
|
||||
@ -270,15 +303,8 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
|
||||
* indicate this to the calling function. */
|
||||
leaf->qos = qos;
|
||||
leaf->identifier = identifier;
|
||||
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
|
||||
return MOSQ_ERR_SUB_EXISTS;
|
||||
}else{
|
||||
/* mqttv311/mqttv5 requires retained messages are resent on
|
||||
* resubscribe. */
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
last_leaf = leaf;
|
||||
leaf = leaf->next;
|
||||
}
|
||||
leaf = mosquitto__malloc(sizeof(struct mosquitto__subleaf));
|
||||
@ -289,6 +315,112 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
|
||||
leaf->identifier = identifier;
|
||||
leaf->no_local = ((options & MQTT_SUB_OPT_NO_LOCAL) != 0);
|
||||
leaf->retain_as_published = ((options & MQTT_SUB_OPT_RETAIN_AS_PUBLISHED) != 0);
|
||||
|
||||
DL_APPEND(*head, leaf);
|
||||
*newleaf = leaf;
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct mosquitto__subshared *shared, struct mosquitto__subleaf *leaf)
|
||||
{
|
||||
DL_DELETE(shared->subs, leaf);
|
||||
if(shared->subs == NULL){
|
||||
HASH_DELETE(hh, subhier->shared, shared);
|
||||
mosquitto__free(shared->name);
|
||||
mosquitto__free(shared);
|
||||
}
|
||||
mosquitto__free(leaf);
|
||||
}
|
||||
|
||||
|
||||
static int sub__add_shared(struct mosquitto_db *db, struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens, char *sharename)
|
||||
{
|
||||
struct mosquitto__subleaf *newleaf;
|
||||
struct mosquitto__subshared *shared = NULL;
|
||||
struct mosquitto__subshared_ref **shared_subs;
|
||||
struct mosquitto__subshared_ref *shared_ref;
|
||||
int i;
|
||||
int slen;
|
||||
int rc;
|
||||
|
||||
slen = strlen(sharename);
|
||||
|
||||
HASH_FIND(hh, subhier->shared, sharename, slen, shared);
|
||||
if(shared){
|
||||
mosquitto__free(sharename);
|
||||
}else{
|
||||
shared = mosquitto__calloc(1, sizeof(struct mosquitto__subshared));
|
||||
if(!shared){
|
||||
mosquitto__free(sharename);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
shared->name = sharename;
|
||||
|
||||
HASH_ADD_KEYPTR(hh, subhier->shared, shared->name, slen, shared);
|
||||
}
|
||||
|
||||
rc = sub__add_leaf(context, qos, identifier, options, &shared->subs, &newleaf);
|
||||
if(rc > 0){
|
||||
if(shared->subs == NULL){
|
||||
HASH_DELETE(hh, subhier->shared, shared);
|
||||
mosquitto__free(shared->name);
|
||||
mosquitto__free(shared);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
if(rc != MOSQ_ERR_SUB_EXISTS){
|
||||
shared_ref = mosquitto__calloc(1, sizeof(struct mosquitto__subshared_ref));
|
||||
if(!shared_ref){
|
||||
sub__remove_shared_leaf(subhier, shared, newleaf);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
shared_ref->hier = subhier;
|
||||
shared_ref->shared = shared;
|
||||
|
||||
for(i=0; i<context->shared_sub_count; i++){
|
||||
if(!context->shared_subs[i]){
|
||||
context->shared_subs[i] = shared_ref;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(i == context->shared_sub_count){
|
||||
shared_subs = mosquitto__realloc(context->shared_subs, sizeof(struct mosquitto__subhier_ref *)*(context->shared_sub_count + 1));
|
||||
if(!shared_subs){
|
||||
sub__remove_shared_leaf(subhier, shared, newleaf);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
context->shared_subs = shared_subs;
|
||||
context->shared_sub_count++;
|
||||
context->shared_subs[context->shared_sub_count-1] = shared_ref;
|
||||
}
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->shared_subscription_count++;
|
||||
#endif
|
||||
}
|
||||
|
||||
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
|
||||
return rc;
|
||||
}else{
|
||||
/* mqttv311/mqttv5 requires retained messages are resent on
|
||||
* resubscribe. */
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int sub__add_normal(struct mosquitto_db *db, struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens)
|
||||
{
|
||||
struct mosquitto__subleaf *newleaf;
|
||||
struct mosquitto__subhier **subs;
|
||||
int i;
|
||||
int rc;
|
||||
|
||||
rc = sub__add_leaf(context, qos, identifier, options, &subhier->subs, &newleaf);
|
||||
|
||||
if(rc != MOSQ_ERR_SUB_EXISTS){
|
||||
for(i=0; i<context->sub_count; i++){
|
||||
if(!context->subs[i]){
|
||||
context->subs[i] = subhier;
|
||||
@ -298,60 +430,71 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
|
||||
if(i == context->sub_count){
|
||||
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__subhier *)*(context->sub_count + 1));
|
||||
if(!subs){
|
||||
mosquitto__free(leaf);
|
||||
DL_DELETE(subhier->subs, newleaf);
|
||||
mosquitto__free(newleaf);
|
||||
return MOSQ_ERR_NOMEM;
|
||||
}
|
||||
context->subs = subs;
|
||||
context->sub_count++;
|
||||
context->subs[context->sub_count-1] = subhier;
|
||||
}
|
||||
if(last_leaf){
|
||||
last_leaf->next = leaf;
|
||||
leaf->prev = last_leaf;
|
||||
}else{
|
||||
subhier->subs = leaf;
|
||||
leaf->prev = NULL;
|
||||
}
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->subscription_count++;
|
||||
#endif
|
||||
}
|
||||
|
||||
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt5){
|
||||
return rc;
|
||||
}else{
|
||||
/* mqttv311/mqttv5 requires retained messages are resent on
|
||||
* resubscribe. */
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens, char *sharename)
|
||||
/* FIXME - this function has the potential to leak subhier, audit calling functions. */
|
||||
{
|
||||
struct mosquitto__subhier *branch;
|
||||
|
||||
if(!tokens){
|
||||
if(context && context->id){
|
||||
if(sharename){
|
||||
return sub__add_shared(db, context, qos, identifier, options, subhier, tokens, sharename);
|
||||
}else{
|
||||
return sub__add_normal(db, context, qos, identifier, options, subhier, tokens);
|
||||
}
|
||||
}else{
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
|
||||
if(branch){
|
||||
return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next);
|
||||
return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next, sharename);
|
||||
}else{
|
||||
/* Not found */
|
||||
branch = sub__add_hier_entry(subhier, &subhier->children, tokens->topic, tokens->topic_len);
|
||||
if(!branch) return MOSQ_ERR_NOMEM;
|
||||
|
||||
return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next);
|
||||
return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next, sharename);
|
||||
}
|
||||
}
|
||||
|
||||
static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens, uint8_t *reason)
|
||||
|
||||
static int sub__remove_normal(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason)
|
||||
{
|
||||
struct mosquitto__subhier *branch;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
int i;
|
||||
|
||||
if(!tokens){
|
||||
leaf = subhier->subs;
|
||||
while(leaf){
|
||||
if(leaf->context==context){
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->subscription_count--;
|
||||
#endif
|
||||
if(leaf->prev){
|
||||
leaf->prev->next = leaf->next;
|
||||
}else{
|
||||
subhier->subs = leaf->next;
|
||||
}
|
||||
if(leaf->next){
|
||||
leaf->next->prev = leaf->prev;
|
||||
}
|
||||
DL_DELETE(subhier->subs, leaf);
|
||||
mosquitto__free(leaf);
|
||||
|
||||
/* Remove the reference to the sub that the client is keeping.
|
||||
@ -369,13 +512,77 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
|
||||
}
|
||||
leaf = leaf->next;
|
||||
}
|
||||
return MOSQ_ERR_NO_SUBSCRIBERS;
|
||||
}
|
||||
|
||||
|
||||
static int sub__remove_shared(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, uint8_t *reason, char *sharename)
|
||||
{
|
||||
struct mosquitto__subshared *shared;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
int i;
|
||||
|
||||
HASH_FIND(hh, subhier->shared, sharename, strlen(sharename), shared);
|
||||
mosquitto__free(sharename);
|
||||
if(shared){
|
||||
leaf = shared->subs;
|
||||
while(leaf){
|
||||
if(leaf->context==context){
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->shared_subscription_count--;
|
||||
#endif
|
||||
DL_DELETE(shared->subs, leaf);
|
||||
mosquitto__free(leaf);
|
||||
|
||||
/* Remove the reference to the sub that the client is keeping.
|
||||
* It would be nice to be able to use the reference directly,
|
||||
* but that would involve keeping a copy of the topic string in
|
||||
* each subleaf. Might be worth considering though. */
|
||||
for(i=0; i<context->shared_sub_count; i++){
|
||||
if(context->shared_subs[i]
|
||||
&& context->shared_subs[i]->hier == subhier
|
||||
&& context->shared_subs[i]->shared == shared){
|
||||
|
||||
mosquitto__free(context->shared_subs[i]);
|
||||
context->shared_subs[i] = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(shared->subs == NULL){
|
||||
HASH_DELETE(hh, subhier->shared, shared);
|
||||
mosquitto__free(shared->name);
|
||||
mosquitto__free(shared);
|
||||
}
|
||||
|
||||
*reason = 0;
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
leaf = leaf->next;
|
||||
}
|
||||
return MOSQ_ERR_NO_SUBSCRIBERS;
|
||||
}else{
|
||||
return MOSQ_ERR_NO_SUBSCRIBERS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto__subhier *subhier, struct sub__token *tokens, uint8_t *reason, char *sharename)
|
||||
{
|
||||
struct mosquitto__subhier *branch;
|
||||
|
||||
if(!tokens){
|
||||
if(sharename){
|
||||
return sub__remove_shared(db, context, subhier, reason, sharename);
|
||||
}else{
|
||||
return sub__remove_normal(db, context, subhier, reason);
|
||||
}
|
||||
}
|
||||
|
||||
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
|
||||
if(branch){
|
||||
sub__remove_recurse(db, context, branch, tokens->next, reason);
|
||||
if(!branch->children && !branch->subs && !branch->retained){
|
||||
sub__remove_recurse(db, context, branch, tokens->next, reason, sharename);
|
||||
if(!branch->children && !branch->subs && !branch->retained && !branch->shared){
|
||||
HASH_DELETE(hh, subhier->children, branch);
|
||||
mosquitto__free(branch->topic);
|
||||
mosquitto__free(branch);
|
||||
@ -479,6 +686,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent
|
||||
strncpy(child->topic, topic, child->topic_len+1);
|
||||
}
|
||||
child->subs = NULL;
|
||||
child->shared = NULL;
|
||||
child->children = NULL;
|
||||
child->retained = NULL;
|
||||
|
||||
@ -492,7 +700,8 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
|
||||
{
|
||||
int rc = 0;
|
||||
struct mosquitto__subhier *subhier;
|
||||
struct sub__token *tokens = NULL;
|
||||
struct sub__token *tokens = NULL, *t;
|
||||
char *sharename = NULL;
|
||||
|
||||
assert(root);
|
||||
assert(*root);
|
||||
@ -500,6 +709,27 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
|
||||
|
||||
if(sub__topic_tokenise(sub, &tokens)) return 1;
|
||||
|
||||
if(!strcmp(tokens->topic, "$shared")){
|
||||
if(!tokens->next || !tokens->next->next){
|
||||
sub__topic_tokens_free(tokens);
|
||||
return MOSQ_ERR_PROTOCOL;
|
||||
}
|
||||
t = tokens->next;
|
||||
mosquitto__free(tokens->topic);
|
||||
mosquitto__free(tokens);
|
||||
tokens = t;
|
||||
|
||||
sharename = tokens->topic;
|
||||
|
||||
tokens->topic = mosquitto__strdup("");
|
||||
if(!tokens->topic){
|
||||
tokens->topic = sharename;
|
||||
sub__topic_tokens_free(tokens);
|
||||
return MOSQ_ERR_PROTOCOL;
|
||||
}
|
||||
tokens->topic_len = 0;
|
||||
}
|
||||
|
||||
HASH_FIND(hh, *root, tokens->topic, tokens->topic_len, subhier);
|
||||
if(!subhier){
|
||||
subhier = sub__add_hier_entry(NULL, root, tokens->topic, tokens->topic_len);
|
||||
@ -510,7 +740,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
|
||||
}
|
||||
|
||||
}
|
||||
rc = sub__add_recurse(db, context, qos, identifier, options, subhier, tokens);
|
||||
rc = sub__add_recurse(db, context, qos, identifier, options, subhier, tokens, sharename);
|
||||
|
||||
sub__topic_tokens_free(tokens);
|
||||
|
||||
@ -521,17 +751,39 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *
|
||||
{
|
||||
int rc = 0;
|
||||
struct mosquitto__subhier *subhier;
|
||||
struct sub__token *tokens = NULL;
|
||||
struct sub__token *tokens = NULL, *t;
|
||||
char *sharename = NULL;
|
||||
|
||||
assert(root);
|
||||
assert(sub);
|
||||
|
||||
if(sub__topic_tokenise(sub, &tokens)) return 1;
|
||||
|
||||
if(!strcmp(tokens->topic, "$shared")){
|
||||
if(!tokens->next || !tokens->next->next){
|
||||
sub__topic_tokens_free(tokens);
|
||||
return MOSQ_ERR_PROTOCOL;
|
||||
}
|
||||
t = tokens->next;
|
||||
mosquitto__free(tokens->topic);
|
||||
mosquitto__free(tokens);
|
||||
tokens = t;
|
||||
|
||||
sharename = tokens->topic;
|
||||
|
||||
tokens->topic = mosquitto__strdup("");
|
||||
if(!tokens->topic){
|
||||
tokens->topic = sharename;
|
||||
sub__topic_tokens_free(tokens);
|
||||
return MOSQ_ERR_PROTOCOL;
|
||||
}
|
||||
tokens->topic_len = 0;
|
||||
}
|
||||
|
||||
HASH_FIND(hh, root, tokens->topic, tokens->topic_len, subhier);
|
||||
if(subhier){
|
||||
*reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED;
|
||||
rc = sub__remove_recurse(db, context, subhier, tokens, reason);
|
||||
rc = sub__remove_recurse(db, context, subhier, tokens, reason, sharename);
|
||||
}
|
||||
|
||||
sub__topic_tokens_free(tokens);
|
||||
@ -562,7 +814,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
|
||||
/* We have a message that needs to be retained, so ensure that the subscription
|
||||
* tree for its topic exists.
|
||||
*/
|
||||
sub__add_recurse(db, NULL, 0, 0, 0, subhier, tokens);
|
||||
sub__add_recurse(db, NULL, 0, 0, 0, subhier, tokens, NULL);
|
||||
}
|
||||
rc = sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
|
||||
}
|
||||
@ -596,6 +848,7 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub
|
||||
if(parent->subs == NULL
|
||||
&& parent->children == NULL
|
||||
&& parent->retained == NULL
|
||||
&& parent->shared == NULL
|
||||
&& parent->parent){
|
||||
|
||||
return parent;
|
||||
@ -605,6 +858,48 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub
|
||||
}
|
||||
|
||||
|
||||
static int sub__clean_session_shared(struct mosquitto_db *db, struct mosquitto *context)
|
||||
{
|
||||
int i;
|
||||
struct mosquitto__subleaf *leaf;
|
||||
struct mosquitto__subhier *hier;
|
||||
|
||||
for(i=0; i<context->shared_sub_count; i++){
|
||||
if(context->shared_subs[i] == NULL){
|
||||
continue;
|
||||
}
|
||||
leaf = context->shared_subs[i]->shared->subs;
|
||||
while(leaf){
|
||||
if(leaf->context==context){
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->shared_subscription_count--;
|
||||
#endif
|
||||
sub__remove_shared_leaf(context->shared_subs[i]->hier, context->shared_subs[i]->shared, leaf);
|
||||
break;
|
||||
}
|
||||
leaf = leaf->next;
|
||||
}
|
||||
if(context->shared_subs[i]->hier->subs == NULL
|
||||
&& context->shared_subs[i]->hier->children == NULL
|
||||
&& context->shared_subs[i]->hier->retained == NULL
|
||||
&& context->shared_subs[i]->hier->shared == NULL
|
||||
&& context->shared_subs[i]->hier->parent){
|
||||
|
||||
hier = context->shared_subs[i]->hier;
|
||||
context->shared_subs[i]->hier = NULL;
|
||||
do{
|
||||
hier = tmp_remove_subs(hier);
|
||||
}while(hier);
|
||||
}
|
||||
mosquitto__free(context->shared_subs[i]);
|
||||
}
|
||||
mosquitto__free(context->shared_subs);
|
||||
context->shared_subs = NULL;
|
||||
context->shared_sub_count = 0;
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
}
|
||||
|
||||
/* Remove all subscriptions for a client.
|
||||
*/
|
||||
int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
|
||||
@ -623,14 +918,7 @@ int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
|
||||
#ifdef WITH_SYS_TREE
|
||||
db->subscription_count--;
|
||||
#endif
|
||||
if(leaf->prev){
|
||||
leaf->prev->next = leaf->next;
|
||||
}else{
|
||||
context->subs[i]->subs = leaf->next;
|
||||
}
|
||||
if(leaf->next){
|
||||
leaf->next->prev = leaf->prev;
|
||||
}
|
||||
DL_DELETE(context->subs[i]->subs, leaf);
|
||||
mosquitto__free(leaf);
|
||||
break;
|
||||
}
|
||||
@ -639,6 +927,7 @@ int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
|
||||
if(context->subs[i]->subs == NULL
|
||||
&& context->subs[i]->children == NULL
|
||||
&& context->subs[i]->retained == NULL
|
||||
&& context->subs[i]->shared == NULL
|
||||
&& context->subs[i]->parent){
|
||||
|
||||
hier = context->subs[i];
|
||||
@ -652,7 +941,7 @@ int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context)
|
||||
context->subs = NULL;
|
||||
context->sub_count = 0;
|
||||
|
||||
return MOSQ_ERR_SUCCESS;
|
||||
return sub__clean_session_shared(db, context);
|
||||
}
|
||||
|
||||
void sub__tree_print(struct mosquitto__subhier *root, int level)
|
||||
|
@ -171,6 +171,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
|
||||
static unsigned long long pub_bytes_received = -1;
|
||||
static unsigned long long pub_bytes_sent = -1;
|
||||
static int subscription_count = -1;
|
||||
static int shared_subscription_count = -1;
|
||||
static int retained_count = -1;
|
||||
|
||||
static double msgs_received_load1 = 0;
|
||||
@ -303,6 +304,12 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
|
||||
db__messages_easy_queue(db, NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, strlen(buf), buf, 1, 60, NULL);
|
||||
}
|
||||
|
||||
if(db->shared_subscription_count != shared_subscription_count){
|
||||
shared_subscription_count = db->shared_subscription_count;
|
||||
snprintf(buf, BUFLEN, "%d", shared_subscription_count);
|
||||
db__messages_easy_queue(db, NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, strlen(buf), buf, 1, 60, NULL);
|
||||
}
|
||||
|
||||
if(db->retained_count != retained_count){
|
||||
retained_count = db->retained_count;
|
||||
snprintf(buf, BUFLEN, "%d", retained_count);
|
||||
|
132
test/broker/02-shared-qos0-v5.py
Executable file
132
test/broker/02-shared-qos0-v5.py
Executable file
@ -0,0 +1,132 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Test whether shared subscriptions work
|
||||
|
||||
# Client 1 subscribes to #, non shared. Should receive everything.
|
||||
# Client 2 subscribes to $shared/one/share-test
|
||||
# Client 3 subscribes to $shared/one/share-test and $shared/two/share-test
|
||||
# Client 4 subscribes to $shared/two/share-test
|
||||
# Client 5 subscribes to $shared/one/share-test
|
||||
|
||||
# A publish to "share-test" should always go to client 1.
|
||||
# The first publish should also go to client 2 (share one) and client 3 (share two)
|
||||
# The second publish should also go to client 3 (share one) and client 4 (share two)
|
||||
# The third publish should also go to client 3 (share two) and client 5 (share one)
|
||||
|
||||
from mosq_test_helper import *
|
||||
|
||||
rc = 1
|
||||
keepalive = 60
|
||||
mid = 1
|
||||
|
||||
connect1_packet = mosq_test.gen_connect("client1", keepalive=keepalive, proto_ver=5)
|
||||
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
connect2_packet = mosq_test.gen_connect("client2", keepalive=keepalive, proto_ver=5)
|
||||
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
connect3_packet = mosq_test.gen_connect("client3", keepalive=keepalive, proto_ver=5)
|
||||
connack3_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
connect4_packet = mosq_test.gen_connect("client4", keepalive=keepalive, proto_ver=5)
|
||||
connack4_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
connect5_packet = mosq_test.gen_connect("client5", keepalive=keepalive, proto_ver=5)
|
||||
connack5_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
|
||||
|
||||
subscribe1_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=5)
|
||||
suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
subscribe2_packet = mosq_test.gen_subscribe(mid, "$shared/one/share-test", 0, proto_ver=5)
|
||||
suback2_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
subscribe3a_packet = mosq_test.gen_subscribe(mid, "$shared/one/share-test", 0, proto_ver=5)
|
||||
suback3a_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
subscribe3b_packet = mosq_test.gen_subscribe(mid, "$shared/two/share-test", 0, proto_ver=5)
|
||||
suback3b_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
subscribe4_packet = mosq_test.gen_subscribe(mid, "$shared/two/share-test", 0, proto_ver=5)
|
||||
suback4_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
subscribe5_packet = mosq_test.gen_subscribe(mid, "$shared/one/share-test", 0, proto_ver=5)
|
||||
suback5_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
|
||||
|
||||
publish1_packet = mosq_test.gen_publish("share-test", qos=0, payload="message1", proto_ver=5)
|
||||
publish2_packet = mosq_test.gen_publish("share-test", qos=0, payload="message2", proto_ver=5)
|
||||
publish3_packet = mosq_test.gen_publish("share-test", qos=0, payload="message3", proto_ver=5)
|
||||
|
||||
mid = 2
|
||||
unsubscribe1_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=5)
|
||||
unsuback1_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
unsubscribe2_packet = mosq_test.gen_unsubscribe(mid, "$shared/one/share-test", proto_ver=5)
|
||||
unsuback2_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
unsubscribe3a_packet = mosq_test.gen_unsubscribe(mid, "$shared/one/share-test", proto_ver=5)
|
||||
unsuback3a_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
unsubscribe3b_packet = mosq_test.gen_unsubscribe(mid, "$shared/two/share-test", proto_ver=5)
|
||||
unsuback3b_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
unsubscribe4_packet = mosq_test.gen_unsubscribe(mid, "$shared/two/share-test", proto_ver=5)
|
||||
unsuback4_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
unsubscribe5_packet = mosq_test.gen_unsubscribe(mid, "$shared/one/share-test", proto_ver=5)
|
||||
unsuback5_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
|
||||
|
||||
|
||||
port = mosq_test.get_port()
|
||||
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
|
||||
|
||||
try:
|
||||
sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=20, port=port)
|
||||
sock2 = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=20, port=port)
|
||||
sock3 = mosq_test.do_client_connect(connect3_packet, connack3_packet, timeout=20, port=port)
|
||||
sock4 = mosq_test.do_client_connect(connect4_packet, connack4_packet, timeout=20, port=port)
|
||||
sock5 = mosq_test.do_client_connect(connect5_packet, connack5_packet, timeout=20, port=port)
|
||||
|
||||
mosq_test.do_send_receive(sock1, subscribe1_packet, suback1_packet, "suback1")
|
||||
mosq_test.do_send_receive(sock2, subscribe2_packet, suback2_packet, "suback2")
|
||||
mosq_test.do_send_receive(sock3, subscribe3a_packet, suback3a_packet, "suback3a")
|
||||
mosq_test.do_send_receive(sock3, subscribe3b_packet, suback3b_packet, "suback3b")
|
||||
mosq_test.do_send_receive(sock4, subscribe4_packet, suback4_packet, "suback4")
|
||||
mosq_test.do_send_receive(sock5, subscribe5_packet, suback5_packet, "suback5")
|
||||
|
||||
sock1.send(publish1_packet)
|
||||
if mosq_test.expect_packet(sock1, "publish1 1", publish1_packet):
|
||||
if mosq_test.expect_packet(sock2, "publish1 2", publish1_packet):
|
||||
if mosq_test.expect_packet(sock3, "publish1 3", publish1_packet):
|
||||
|
||||
sock1.send(publish2_packet)
|
||||
if mosq_test.expect_packet(sock1, "publish2 1", publish2_packet):
|
||||
if mosq_test.expect_packet(sock3, "publish2 3", publish2_packet):
|
||||
if mosq_test.expect_packet(sock4, "publish2 4", publish2_packet):
|
||||
|
||||
sock1.send(publish3_packet)
|
||||
if mosq_test.expect_packet(sock1, "publish3 1", publish3_packet):
|
||||
if mosq_test.expect_packet(sock3, "publish3 3", publish3_packet):
|
||||
if mosq_test.expect_packet(sock5, "publish3 5", publish3_packet):
|
||||
mosq_test.do_send_receive(sock1, unsubscribe1_packet, unsuback1_packet, "unsuback1")
|
||||
mosq_test.do_send_receive(sock2, unsubscribe2_packet, unsuback2_packet, "unsuback2")
|
||||
mosq_test.do_send_receive(sock3, unsubscribe3a_packet, unsuback3a_packet, "unsuback3a")
|
||||
mosq_test.do_send_receive(sock3, unsubscribe3b_packet, unsuback3b_packet, "unsuback3b")
|
||||
mosq_test.do_send_receive(sock4, unsubscribe4_packet, unsuback4_packet, "unsuback4")
|
||||
mosq_test.do_send_receive(sock5, unsubscribe5_packet, unsuback5_packet, "unsuback5")
|
||||
|
||||
rc = 0
|
||||
|
||||
sock1.close()
|
||||
sock2.close()
|
||||
sock3.close()
|
||||
sock4.close()
|
||||
sock5.close()
|
||||
finally:
|
||||
broker.terminate()
|
||||
broker.wait()
|
||||
(stdo, stde) = broker.communicate()
|
||||
if rc:
|
||||
print(stde)
|
||||
|
||||
exit(rc)
|
||||
|
@ -44,6 +44,7 @@ endif
|
||||
|
||||
|
||||
02 :
|
||||
./02-shared-qos0-v5.py
|
||||
./02-subhier-crash.py
|
||||
./02-subpub-qos0-retain-as-publish.py
|
||||
./02-subpub-qos0-send-retain.py
|
||||
|
@ -25,6 +25,7 @@ tests = [
|
||||
(1, './01-connect-uname-password-success.py'),
|
||||
(1, './01-connect-uname-pwd-no-flag.py'),
|
||||
|
||||
(1, './02-shared-qos0-v5.py'),
|
||||
(1, './02-subhier-crash.py'),
|
||||
(1, './02-subpub-qos0-retain-as-publish.py'),
|
||||
(1, './02-subpub-qos0-send-retain.py'),
|
||||
|
Loading…
Reference in New Issue
Block a user