From db7901884fc0dee44c26e2e17bf5829fe8f2f18c Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Fri, 14 Dec 2018 13:21:53 +0000 Subject: [PATCH] Retain-as-published support. --- client/client_shared.c | 5 ++++ client/client_shared.h | 1 + client/sub_client.c | 2 +- lib/actions.c | 13 +++++---- lib/mosquitto.h | 52 +++++++++++++++++++++++++++++++-- lib/mqtt_protocol.h | 8 +++++ lib/send_subscribe.c | 4 +-- man/mosquitto_sub.1.xml | 16 +++++++++- src/bridge.c | 2 +- src/handle_subscribe.c | 7 +++-- src/mosquitto_broker_internal.h | 3 +- src/persist.c | 3 +- src/subs.c | 20 +++++-------- 13 files changed, 108 insertions(+), 28 deletions(-) diff --git a/client/client_shared.c b/client/client_shared.c index 88112201..b37d3067 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -700,6 +700,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c }else{ cfg->pub_mode = MSGMODE_NULL; } + }else if(!strcmp(argv[i], "--retain-as-published")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->sub_opts |= MQTT_SUB_OPT_RETAIN_AS_PUBLISHED; }else if(!strcmp(argv[i], "-V") || !strcmp(argv[i], "--protocol-version")){ if(i==argc-1){ fprintf(stderr, "Error: --protocol-version argument given but no version specified.\n\n"); diff --git a/client/client_shared.h b/client/client_shared.h index ade5b274..1a8ad280 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -86,6 +86,7 @@ struct mosq_config { int msg_count; /* sub */ char *format; /* sub */ int timeout; /* sub */ + int sub_opts; /* sub */ #ifdef WITH_SOCKS char *socks5_host; int socks5_port; diff --git a/client/sub_client.c b/client/sub_client.c index 1a26bcb0..62d91ca7 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -90,7 +90,7 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag int i; if(!result){ - mosquitto_subscribe_multiple(mosq, NULL, cfg.topic_count, cfg.topics, cfg.qos, cfg.subscribe_props); + mosquitto_subscribe_multiple(mosq, NULL, cfg.topic_count, cfg.topics, cfg.qos, cfg.sub_opts, cfg.subscribe_props); for(i=0; i 2) return MOSQ_ERR_INVAL; + if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; @@ -175,11 +177,11 @@ int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, in if(rc) return rc; } - return send__subscribe(mosq, mid, 1, (char *const *const)&sub, qos, outgoing_properties); + return send__subscribe(mosq, mid, 1, (char *const *const)&sub, qos|options, outgoing_properties); } -int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, const mosquitto_property *properties) +int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties) { const mosquitto_property *outgoing_properties = NULL; mosquitto_property local_property; @@ -189,6 +191,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count if(!mosq || !sub_count || !sub) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL; + if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL; if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; if(properties){ @@ -209,7 +212,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count if(mosquitto_validate_utf8(sub[i], strlen(sub[i]))) return MOSQ_ERR_MALFORMED_UTF8; } - return send__subscribe(mosq, mid, sub_count, sub, qos, properties); + return send__subscribe(mosq, mid, sub_count, sub, qos|options, properties); } diff --git a/lib/mosquitto.h b/lib/mosquitto.h index c5624c78..bb1524bd 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -840,6 +840,30 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c * sent. * sub - the subscription pattern. * qos - the requested Quality of Service for this subscription. + * options - options to apply to this subscription, OR'd together. Set to 0 to + * use the default options, otherwise choose from the list: + * MQTT_SUB_OPT_NO_LOCAL - with this option set, if this client + * publishes to a topic to which it is subscribed, the + * broker will not publish the message back to the + * client. + * MQTT_SUB_OPT_RETAIN_AS_PUBLISHED - with this option set, messages + * published for this subscription will keep the + * retain flag as was set by the publishing client. + * The default behaviour without this option set has + * the retain flag indicating whether a message is + * fresh/stale. + * MQTT_SUB_OPT_SEND_RETAIN_ALWAYS - with this option set, + * pre-existing retained messages are sent as soon as + * the subscription is made, even if the subscription + * already exists. This is the default behaviour, so + * it is not necessary to set this option. + * MQTT_SUB_OPT_SEND_RETAIN_NEW - with this option set, pre-existing + * retained messages for this subscription will be + * sent when the subscription is made, but only if the + * subscription does not already exist. + * MQTT_SUB_OPT_SEND_RETAIN_NEVER - with this option set, + * pre-existing retained messages will never be sent + * for this subscription. * properties - a valid mosquitto_property list, or NULL. * * Returns: @@ -851,7 +875,7 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c * MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden. * MOSQ_ERR_PROTOCOL - if any property is invalid for use with SUBSCRIBE. */ -libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, const mosquitto_property *properties); +libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties); /* * Function: mosquitto_subscribe_multiple @@ -871,6 +895,30 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons * familiar with this, just think of it as a safer "char **", * equivalent to "const char *" for a simple string pointer. * qos - the requested Quality of Service for each subscription. + * options - options to apply to this subscription, OR'd together. Set to 0 to + * use the default options, otherwise choose from the list: + * MQTT_SUB_OPT_NO_LOCAL - with this option set, if this client + * publishes to a topic to which it is subscribed, the + * broker will not publish the message back to the + * client. + * MQTT_SUB_OPT_RETAIN_AS_PUBLISHED - with this option set, messages + * published for this subscription will keep the + * retain flag as was set by the publishing client. + * The default behaviour without this option set has + * the retain flag indicating whether a message is + * fresh/stale. + * MQTT_SUB_OPT_SEND_RETAIN_ALWAYS - with this option set, + * pre-existing retained messages are sent as soon as + * the subscription is made, even if the subscription + * already exists. This is the default behaviour, so + * it is not necessary to set this option. + * MQTT_SUB_OPT_SEND_RETAIN_NEW - with this option set, pre-existing + * retained messages for this subscription will be + * sent when the subscription is made, but only if the + * subscription does not already exist. + * MQTT_SUB_OPT_SEND_RETAIN_NEVER - with this option set, + * pre-existing retained messages will never be sent + * for this subscription. * properties - a valid mosquitto_property list, or NULL. Only used with MQTT * v5 clients. * @@ -881,7 +929,7 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8 */ -int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, const mosquitto_property *properties); +int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties); /* * Function: mosquitto_unsubscribe diff --git a/lib/mqtt_protocol.h b/lib/mqtt_protocol.h index 2d53f6f8..672d50b7 100644 --- a/lib/mqtt_protocol.h +++ b/lib/mqtt_protocol.h @@ -145,6 +145,14 @@ enum mqtt5_property_type { MQTT_PROP_TYPE_STRING_PAIR = 7 }; +enum mqtt5_sub_options { + MQTT_SUB_OPT_NO_LOCAL = 0x04, + MQTT_SUB_OPT_RETAIN_AS_PUBLISHED = 0x08, + MQTT_SUB_OPT_SEND_RETAIN_ALWAYS = 0x00, + MQTT_SUB_OPT_SEND_RETAIN_NEW = 0x10, + MQTT_SUB_OPT_SEND_RETAIN_NEVER = 0x20, +}; + #define MQTT_MAX_PAYLOAD 268435455 #endif diff --git a/lib/send_subscribe.c b/lib/send_subscribe.c index 1fb02b14..ffe7ef7d 100644 --- a/lib/send_subscribe.c +++ b/lib/send_subscribe.c @@ -83,11 +83,11 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, const cha #ifdef WITH_BROKER # ifdef WITH_BRIDGE - log__printf(mosq, MOSQ_LOG_DEBUG, "Bridge %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d)", mosq->id, local_mid, topic[0], topic_qos); + log__printf(mosq, MOSQ_LOG_DEBUG, "Bridge %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d, Options: 0x%02x)", mosq->id, local_mid, topic[0], topic_qos&0x03, topic_qos&0xFC); # endif #else for(i=0; iid, local_mid, topic[i], topic_qos); + log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d, Options: 0x%02x)", mosq->id, local_mid, topic[i], topic_qos&0x03, topic_qos&0xFC); } #endif diff --git a/man/mosquitto_sub.1.xml b/man/mosquitto_sub.1.xml index bffcd6e0..7c4861ab 100644 --- a/man/mosquitto_sub.1.xml +++ b/man/mosquitto_sub.1.xml @@ -47,6 +47,7 @@ + filter-out unsub-topic @@ -459,7 +460,20 @@ Messages with retain set are "stale", in that it is not known when they were originally published. With this argument in use, the receipt of the first non-stale - message will cause the client to exit. + message will cause the client to exit. See also the + option. + + + + + + If this argument is given, the subscriptions will + have the "retain as published" option set. This means + that the retain flag on an incoming message will be + exactly as set by the publishing client, rather than + indicating whether the message is fresh/stale. + This option is not valid for MQTT v3.1/v3.1.1 + clients. diff --git a/src/bridge.c b/src/bridge.c index 80bed550..10c5744c 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -312,7 +312,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs) > 0){ + if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, true, &db->subs) > 0){ return 1; } sub__retain_queue(db, context, diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index ab2de078..7cf62855 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -94,10 +94,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) } if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){ qos = subscription_options; + if(context->is_bridge){ + retain_as_published = 1; + } }else{ qos = subscription_options & 0x03; - retain_as_published = subscription_options & 0x04; + retain_as_published = subscription_options & 0x08; retain_handling = (subscription_options & 0x30) >> 4; if(retain_handling == 3){ @@ -148,7 +151,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) } if(qos != 0x80){ - rc2 = sub__add(db, context, sub, qos, &db->subs); + rc2 = sub__add(db, context, sub, qos, retain_as_published, &db->subs); if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ if(sub__retain_queue(db, context, sub, qos)) rc = 1; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index cc0a5edd..2156cc4e 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -290,6 +290,7 @@ struct mosquitto__subleaf { struct mosquitto__subleaf *next; struct mosquitto *context; int qos; + bool retain_as_published; }; struct mosquitto__subhier { @@ -572,7 +573,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time); /* ============================================================ * Subscription functions * ============================================================ */ -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root); +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root); struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len); int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); void sub__tree_print(struct mosquitto__subhier *root, int level); diff --git a/src/persist.c b/src/persist.c index b9e48ba3..8cc2de5f 100644 --- a/src/persist.c +++ b/src/persist.c @@ -935,7 +935,8 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, context = persist__find_or_add_context(db, client_id, 0); if(!context) return 1; - return sub__add(db, context, sub, qos, &db->subs); + /* FIXME - retain_as_published needs saving */ + return sub__add(db, context, sub, qos, false, &db->subs); } #endif diff --git a/src/subs.c b/src/subs.c index 901cb72b..d608f36e 100644 --- a/src/subs.c +++ b/src/subs.c @@ -123,14 +123,9 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie }else{ mid = 0; } - if(leaf->context->is_bridge){ - /* If we know the client is a bridge then we should set retain - * even if the message is fresh. If we don't do this, retained - * messages won't be propagated. */ + if(leaf->retain_as_published){ client_retain = retain; }else{ - /* Client is not a bridge and this isn't a stale message so - * retain should be false. */ client_retain = false; } if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1; @@ -245,7 +240,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens) } } -static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens) +static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, bool retain_as_published, struct mosquitto__subhier *subhier, struct sub__token *tokens) /* FIXME - this function has the potential to leak subhier, audit calling functions. */ { struct mosquitto__subhier *branch; @@ -279,6 +274,7 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, leaf->next = NULL; leaf->context = context; leaf->qos = qos; + leaf->retain_as_published = retain_as_published; for(i=0; isub_count; i++){ if(!context->subs[i]){ context->subs[i] = subhier; @@ -311,13 +307,13 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); if(branch){ - return sub__add_recurse(db, context, qos, branch, tokens->next); + return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); }else{ /* Not found */ branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); if(!branch) return MOSQ_ERR_NOMEM; - return sub__add_recurse(db, context, qos, branch, tokens->next); + return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); } } @@ -446,7 +442,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent } -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root) +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root) { int rc = 0; struct mosquitto__subhier *subhier; @@ -468,7 +464,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub } } - rc = sub__add_recurse(db, context, qos, subhier, tokens); + rc = sub__add_recurse(db, context, qos, retain_as_published, subhier, tokens); sub__topic_tokens_free(tokens); @@ -519,7 +515,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch /* We have a message that needs to be retained, so ensure that the subscription * tree for its topic exists. */ - sub__add_recurse(db, NULL, 0, subhier, tokens); + sub__add_recurse(db, NULL, 0, false, subhier, tokens); } sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); }