diff --git a/client/client_props.c b/client/client_props.c index 71dcc2d4..7415bc06 100644 --- a/client/client_props.c +++ b/client/client_props.c @@ -123,7 +123,7 @@ int cfg_parse_property(struct mosq_config *cfg, int argc, char *argv[], int *idx break; case CMD_SUBSCRIBE: - if(identifier != MQTT_PROP_USER_PROPERTY){ + if(identifier != MQTT_PROP_SUBSCRIPTION_IDENTIFIER && identifier != MQTT_PROP_USER_PROPERTY){ fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname); return MOSQ_ERR_NOT_SUPPORTED; } diff --git a/lib/actions.c b/lib/actions.c index 57a48d54..7c0e8c27 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -93,7 +93,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in } if(qos == 0){ - return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties); + return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL); }else{ message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all)); if(!message) return MOSQ_ERR_NOMEM; @@ -133,7 +133,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in message->state = mosq_ms_wait_for_pubrec; } pthread_mutex_unlock(&mosq->out_message_mutex); - return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties); + return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL); }else{ message->state = mosq_ms_invalid; pthread_mutex_unlock(&mosq->out_message_mutex); diff --git a/lib/connect.c b/lib/connect.c index ad3bb1d0..5beed13a 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -168,7 +168,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos local_property.next = NULL; outgoing_properties = &local_property; } - rc = mosquitto_property_check_all(CMD_DISCONNECT, outgoing_properties); + rc = mosquitto_property_check_all(CMD_CONNECT, outgoing_properties); if(rc) return rc; } diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 2a737d9b..9d494788 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -264,7 +264,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir }else if(cur->msg.qos == 2){ cur->state = mosq_ms_wait_for_pubrec; } - rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL); + rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL); if(rc){ pthread_mutex_unlock(&mosq->out_message_mutex); return rc; @@ -334,7 +334,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag case mosq_ms_publish_qos2: messages->timestamp = now; messages->dup = true; - send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL); + send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL); break; case mosq_ms_wait_for_pubrel: messages->timestamp = now; diff --git a/lib/property_mosq.c b/lib/property_mosq.c index c8dde7f1..e9399fd5 100644 --- a/lib/property_mosq.c +++ b/lib/property_mosq.c @@ -397,13 +397,15 @@ int property__write(struct mosquitto__packet *packet, const mosquitto_property * } -int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *properties) +int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *properties, bool write_len) { int rc; const mosquitto_property *p; - rc = packet__write_varint(packet, property__get_length_all(properties)); - if(rc) return rc; + if(write_len){ + rc = packet__write_varint(packet, property__get_length_all(properties)); + if(rc) return rc; + } p = properties; while(p){ diff --git a/lib/property_mosq.h b/lib/property_mosq.h index d31ce85e..d965d8a3 100644 --- a/lib/property_mosq.h +++ b/lib/property_mosq.h @@ -41,7 +41,7 @@ struct mqtt5__property { int property__read_all(int command, struct mosquitto__packet *packet, mosquitto_property **property); -int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *property); +int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *property, bool write_len); void property__free(mosquitto_property **property); int property__get_length(const mosquitto_property *property); diff --git a/lib/send_connect.c b/lib/send_connect.c index 5b50e2c7..99778634 100644 --- a/lib/send_connect.c +++ b/lib/send_connect.c @@ -141,7 +141,7 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session if(mosq->protocol == mosq_p_mqtt5){ /* Write properties */ - property__write_all(packet, properties); + property__write_all(packet, properties, true); } /* Payload */ @@ -153,7 +153,7 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session if(will){ if(mosq->protocol == mosq_p_mqtt5){ /* Write will properties */ - property__write_all(packet, mosq->will->properties); + property__write_all(packet, mosq->will->properties, true); } packet__write_string(packet, mosq->will->msg.topic, strlen(mosq->will->msg.topic)); packet__write_string(packet, (const char *)mosq->will->msg.payload, mosq->will->msg.payloadlen); diff --git a/lib/send_disconnect.c b/lib/send_disconnect.c index 120ef82f..c0959fbb 100644 --- a/lib/send_disconnect.c +++ b/lib/send_disconnect.c @@ -67,7 +67,7 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt } if(mosq->protocol == mosq_p_mqtt5){ packet__write_byte(packet, reason_code); - property__write_all(packet, properties); + property__write_all(packet, properties, true); } return packet__queue(mosq, packet); diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 3899589c..8b54b996 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -144,7 +144,7 @@ int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid if(mosq->protocol == mosq_p_mqtt5){ packet__write_byte(packet, reason_code); - property__write_all(packet, properties); + property__write_all(packet, properties, true); } return packet__queue(mosq, packet); diff --git a/lib/send_mosq.h b/lib/send_mosq.h index c26615be..897f2eb8 100644 --- a/lib/send_mosq.h +++ b/lib/send_mosq.h @@ -21,7 +21,7 @@ Contributors: int send__simple_command(struct mosquitto *mosq, uint8_t command); int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup, uint8_t reason_code, const mosquitto_property *properties); -int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties); +int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props); int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session, const mosquitto_property *properties); int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties); @@ -29,7 +29,7 @@ int send__pingreq(struct mosquitto *mosq); int send__pingresp(struct mosquitto *mosq); int send__puback(struct mosquitto *mosq, uint16_t mid); int send__pubcomp(struct mosquitto *mosq, uint16_t mid); -int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties); +int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props); int send__pubrec(struct mosquitto *mosq, uint16_t mid); int send__pubrel(struct mosquitto *mosq, uint16_t mid); int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties); diff --git a/lib/send_publish.c b/lib/send_publish.c index ee25d3b4..c1d81e0e 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -37,7 +37,7 @@ Contributors: #include "send_mosq.h" -int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties) +int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props) { #ifdef WITH_BROKER size_t len; @@ -110,7 +110,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 } log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen); G_PUB_BYTES_SENT_INC(payloadlen); - rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, properties); + rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props); mosquitto__free(mapped_topic); return rc; } @@ -124,15 +124,15 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen); #endif - return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, properties); + return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props); } -int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties) +int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props) { struct mosquitto__packet *packet = NULL; int packetlen; - int proplen, varbytes; + int proplen = 0, varbytes; int rc; assert(mosq); @@ -144,11 +144,14 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, } if(qos > 0) packetlen += 2; /* For message id */ if(mosq->protocol == mosq_p_mqtt5){ - proplen = property__get_length_all(properties); + proplen = 0; + proplen += property__get_length_all(cmsg_props); + proplen += property__get_length_all(store_props); varbytes = packet__varint_bytes(proplen); if(varbytes > 4){ /* FIXME - Properties too big, don't publish any - should remove some first really */ - properties = NULL; + cmsg_props = NULL; + store_props = NULL; }else{ packetlen += proplen + varbytes; } @@ -175,7 +178,9 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, } if(mosq->protocol == mosq_p_mqtt5){ - property__write_all(packet, properties); + packet__write_varint(packet, proplen); + property__write_all(packet, cmsg_props, false); + property__write_all(packet, store_props, false); } /* Payload */ diff --git a/lib/send_subscribe.c b/lib/send_subscribe.c index ffe7ef7d..4095e4af 100644 --- a/lib/send_subscribe.c +++ b/lib/send_subscribe.c @@ -72,7 +72,7 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, const cha packet__write_uint16(packet, local_mid); if(mosq->protocol == mosq_p_mqtt5){ - property__write_all(packet, properties); + property__write_all(packet, properties, true); } /* Payload */ diff --git a/lib/send_unsubscribe.c b/lib/send_unsubscribe.c index a3e75251..6e0623df 100644 --- a/lib/send_unsubscribe.c +++ b/lib/send_unsubscribe.c @@ -70,7 +70,7 @@ int send__unsubscribe(struct mosquitto *mosq, int *mid, const char *topic, const if(mosq->protocol == mosq_p_mqtt5){ /* We don't use User Property yet. */ - property__write_all(packet, properties); + property__write_all(packet, properties, true); } /* Payload */ diff --git a/src/bridge.c b/src/bridge.c index 365658e6..46dbf51c 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -149,6 +149,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, + 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, &db->subs) > 0){ return 1; @@ -322,6 +323,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, + 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, &db->subs) > 0){ @@ -329,7 +331,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) } sub__retain_queue(db, context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos); + context->bridge->topics[i].qos, 0); } } diff --git a/src/database.c b/src/database.c index eb5547f7..666c58ef 100644 --- a/src/database.c +++ b/src/database.c @@ -336,7 +336,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 return MOSQ_ERR_SUCCESS; } -int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored) +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties) { struct mosquitto_client_msg *msg; struct mosquitto_client_msg **msgs, **last_msg; @@ -362,6 +362,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 for(i=0; idest_id_count; i++){ if(!strcmp(stored->dest_ids[i], context->id)){ /* We have already sent this message to this client. */ + mosquitto_property_free_all(&properties); return MOSQ_ERR_SUCCESS; } } @@ -370,9 +371,11 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 /* Client is not connected only queue messages with QoS>0. */ if(qos == 0 && !db->config->queue_qos0_messages){ if(!context->bridge){ + mosquitto_property_free_all(&properties); return 2; }else{ if(context->bridge->start_type != bst_lazy){ + mosquitto_property_free_all(&properties); return 2; } } @@ -397,6 +400,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 if(qos == 2){ state = mosq_ms_wait_for_pubrel; }else{ + mosquitto_property_free_all(&properties); return 1; } } @@ -412,6 +416,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 context->id); } G_MSGS_DROPPED_INC(); + mosquitto_property_free_all(&properties); return 2; } }else{ @@ -425,6 +430,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 "Outgoing messages are being dropped for client %s.", context->id); } + mosquitto_property_free_all(&properties); return 2; } } @@ -448,6 +454,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 msg->dup = false; msg->qos = qos; msg->retain = retain; + msg->properties = properties; if (state == mosq_ms_queued){ msgs = &(context->queued_msgs); @@ -875,7 +882,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) uint32_t payloadlen; const void *payload; int msg_count = 0; - mosquitto_property *properties; + mosquitto_property *cmsg_props = NULL, *store_props = NULL; time_t now; if(!context || context->sock == INVALID_SOCKET @@ -903,11 +910,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) qos = tail->qos; payloadlen = tail->store->payloadlen; payload = UHPA_ACCESS_PAYLOAD(tail->store); - properties = tail->store->properties; + cmsg_props = tail->properties; + store_props = tail->store->properties; switch(tail->state){ case mosq_ms_publish_qos0: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); if(!rc){ db__message_remove(db, context, &tail, last); }else{ @@ -916,7 +924,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; case mosq_ms_publish_qos1: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); if(!rc){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ @@ -929,7 +937,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; case mosq_ms_publish_qos2: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); if(!rc){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ diff --git a/src/handle_connack.c b/src/handle_connack.c index bc6e37a8..9ef56812 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -59,7 +59,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) if(context->bridge->notification_topic){ if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - context->bridge->notification_topic, 1, ¬ification_payload, 1, true, 0, NULL)){ + context->bridge->notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL)){ return 1; } @@ -74,7 +74,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) notification_payload = '1'; if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - notification_topic, 1, ¬ification_payload, 1, true, 0, NULL)){ + notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL)){ mosquitto__free(notification_topic); return 1; @@ -107,11 +107,12 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, + 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, &db->subs)) return 1; sub__retain_queue(db, context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos); + context->bridge->topics[i].qos, 0); } } } diff --git a/src/handle_publish.c b/src/handle_publish.c index f328a77e..3b02c1af 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -279,7 +279,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) break; case 2: if(!dup){ - res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored); + res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL); }else{ res = 0; } @@ -308,7 +308,7 @@ process_bad_message: if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0, NULL, 0)){ return 1; } - res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored); + res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored, NULL); }else{ res = 0; } diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 4d59da6f..1f8412f7 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -34,6 +34,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) uint16_t mid; char *sub; uint8_t subscription_options; + uint32_t subscription_identifier = 0; uint8_t qos; uint8_t retain_handling = 0; uint8_t *payload = NULL, *tmp_payload; @@ -57,9 +58,20 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) if(context->protocol == mosq_p_mqtt5){ rc = property__read_all(CMD_SUBSCRIBE, &context->in_packet, &properties); if(rc) return rc; + + if(mosquitto_property_read_varint(properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, + &subscription_identifier, false)){ + + /* If the identifier was force set to 0, this is an error */ + if(subscription_identifier == 0){ + mosquitto_property_free_all(&properties); + return MOSQ_ERR_PROTOCOL; + } + } + mosquitto_property_free_all(&properties); + /* Note - User Property not handled */ } - mosquitto_property_free_all(&properties); /* FIXME - TEMPORARY UNTIL PROPERTIES PROCESSED */ while(context->in_packet.pos < context->in_packet.remaining_length){ sub = NULL; @@ -147,20 +159,20 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) } if(qos != 0x80){ - rc2 = sub__add(db, context, sub, qos, subscription_options, &db->subs); + rc2 = sub__add(db, context, sub, qos, subscription_identifier, subscription_options, &db->subs); if(rc2 > 0){ mosquitto__free(sub); return rc2; } if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ - if(sub__retain_queue(db, context, sub, qos)) rc = 1; + if(sub__retain_queue(db, context, sub, qos, 0)) rc = 1; } }else{ if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS) || (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){ - if(sub__retain_queue(db, context, sub, qos)) rc = 1; + if(sub__retain_queue(db, context, sub, qos, subscription_identifier)) rc = 1; } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 9cf0246e..cd5eddb8 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -290,7 +290,8 @@ struct mosquitto__subleaf { struct mosquitto__subleaf *prev; struct mosquitto__subleaf *next; struct mosquitto *context; - int qos; + uint32_t identifier; + uint8_t qos; bool no_local; bool retain_as_published; }; @@ -333,6 +334,7 @@ struct mosquitto_msg_store{ struct mosquitto_client_msg{ struct mosquitto_client_msg *next; struct mosquitto_msg_store *store; + mosquitto_property *properties; time_t timestamp; uint16_t mid; uint8_t qos; @@ -554,7 +556,7 @@ void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsi /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); -int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored); +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties); int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state); int db__message_write(struct mosquitto_db *db, struct mosquitto *context); @@ -575,12 +577,12 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time); /* ============================================================ * Subscription functions * ============================================================ */ -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root); +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, uint32_t identifier, int options, struct mosquitto__subhier **root); struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len); int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); void sub__tree_print(struct mosquitto__subhier *root, int level); int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context); -int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos); +int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier); int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); /* ============================================================ diff --git a/src/persist.c b/src/persist.c index b8b3e4b8..a48028a7 100644 --- a/src/persist.c +++ b/src/persist.c @@ -935,8 +935,8 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, context = persist__find_or_add_context(db, client_id, 0); if(!context) return 1; - /* FIXME - retain_as_published needs saving */ - return sub__add(db, context, sub, qos, false, &db->subs); + /* FIXME - identifer, options need saving */ + return sub__add(db, context, sub, qos, 0, 0, &db->subs); } #endif diff --git a/src/send_connack.c b/src/send_connack.c index 1c20820c..d51179b0 100644 --- a/src/send_connack.c +++ b/src/send_connack.c @@ -62,11 +62,6 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i mosquitto__free(packet); return rc; } - rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_SUBSCRIPTION_ID_AVAILABLE, 0); - if(rc){ - mosquitto__free(packet); - return rc; - } proplen = property__get_length_all(connack_props); varbytes = packet__varint_bytes(proplen); @@ -80,7 +75,7 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i packet__write_byte(packet, ack); packet__write_byte(packet, reason_code); if(context->protocol == mosq_p_mqtt5){ - property__write_all(packet, connack_props); + property__write_all(packet, connack_props, true); } mosquitto_property_free_all(&connack_props); diff --git a/src/send_suback.c b/src/send_suback.c index 49daeebc..98c13bc7 100644 --- a/src/send_suback.c +++ b/src/send_suback.c @@ -52,7 +52,7 @@ int send__suback(struct mosquitto *context, uint16_t mid, uint32_t payloadlen, c if(context->protocol == mosq_p_mqtt5){ /* We don't use Reason String or User Property yet. */ - property__write_all(packet, properties); + property__write_all(packet, properties, true); } if(payloadlen){ diff --git a/src/send_unsuback.c b/src/send_unsuback.c index 59fc50a3..ee0c8333 100644 --- a/src/send_unsuback.c +++ b/src/send_unsuback.c @@ -53,7 +53,7 @@ int send__unsuback(struct mosquitto *mosq, uint16_t mid, const mosquitto_propert packet__write_uint16(packet, mid); if(mosq->protocol == mosq_p_mqtt5){ - property__write_all(packet, properties); + property__write_all(packet, properties, true); } return packet__queue(mosq, packet); diff --git a/src/subs.c b/src/subs.c index acc8714a..04bb2231 100644 --- a/src/subs.c +++ b/src/subs.c @@ -70,6 +70,7 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie uint16_t mid; struct mosquitto__subleaf *leaf; bool client_retain; + mosquitto_property *properties = NULL; leaf = hier->subs; @@ -129,7 +130,10 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie }else{ client_retain = false; } - if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1; + if(leaf->identifier){ + mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, leaf->identifier); + } + if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties) == 1) rc = 1; }else{ return 1; /* Application error */ } @@ -241,7 +245,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens) } } -static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens) +static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens) /* FIXME - this function has the potential to leak subhier, audit calling functions. */ { struct mosquitto__subhier *branch; @@ -275,6 +279,7 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, leaf->next = NULL; leaf->context = context; leaf->qos = qos; + leaf->identifier = identifier; leaf->no_local = ((options & MQTT_SUB_OPT_NO_LOCAL) != 0); leaf->retain_as_published = ((options & MQTT_SUB_OPT_RETAIN_AS_PUBLISHED) != 0); for(i=0; isub_count; i++){ @@ -309,13 +314,13 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); if(branch){ - return sub__add_recurse(db, context, qos, options, branch, tokens->next); + return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next); }else{ /* Not found */ branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); if(!branch) return MOSQ_ERR_NOMEM; - return sub__add_recurse(db, context, qos, options, branch, tokens->next); + return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next); } } @@ -444,7 +449,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent } -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root) +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, uint32_t identifier, int options, struct mosquitto__subhier **root) { int rc = 0; struct mosquitto__subhier *subhier; @@ -466,7 +471,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub } } - rc = sub__add_recurse(db, context, qos, options, subhier, tokens); + rc = sub__add_recurse(db, context, qos, identifier, options, subhier, tokens); sub__topic_tokens_free(tokens); @@ -517,7 +522,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch /* We have a message that needs to be retained, so ensure that the subscription * tree for its topic exists. */ - sub__add_recurse(db, NULL, 0, false, subhier, tokens); + sub__add_recurse(db, NULL, 0, 0, 0, subhier, tokens); } sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); } @@ -641,11 +646,12 @@ void sub__tree_print(struct mosquitto__subhier *root, int level) } } -static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *retained, struct mosquitto *context, const char *sub, int sub_qos) +static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *retained, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier) { int rc = 0; int qos; uint16_t mid; + mosquitto_property *properties = NULL; rc = mosquitto_acl_check(db, context, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen), retained->qos, retained->retain, MOSQ_ACL_READ); @@ -666,10 +672,13 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store * }else{ mid = 0; } - return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained); + if(subscription_identifier > 0){ + mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, subscription_identifier); + } + return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties); } -static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, struct mosquitto *context, const char *sub, int sub_qos, int level) +static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier, int level) { struct mosquitto__subhier *branch, *branch_tmp; int flag = 0; @@ -685,25 +694,25 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su */ flag = -1; if(branch->retained){ - retain__process(db, branch->retained, context, sub, sub_qos); + retain__process(db, branch->retained, context, sub, sub_qos, subscription_identifier); } if(branch->children){ - retain__search(db, branch, tokens, context, sub, sub_qos, level+1); + retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, level+1); } }else if(strcmp(UHPA_ACCESS_TOPIC(branch), "+") && (!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens)) || !strcmp(UHPA_ACCESS_TOPIC(tokens), "+"))){ if(tokens->next){ - if(retain__search(db, branch, tokens->next, context, sub, sub_qos, level+1) == -1 + if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, level+1) == -1 || (!branch_tmp && tokens->next && !strcmp(UHPA_ACCESS_TOPIC(tokens->next), "#") && level>0)){ if(branch->retained){ - retain__process(db, branch->retained, context, sub, sub_qos); + retain__process(db, branch->retained, context, sub, sub_qos, subscription_identifier); } } }else{ if(branch->retained){ - retain__process(db, branch->retained, context, sub, sub_qos); + retain__process(db, branch->retained, context, sub, sub_qos, subscription_identifier); } } } @@ -711,7 +720,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su return flag; } -int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos) +int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier) { struct mosquitto__subhier *subhier; struct sub__token *tokens = NULL, *tail; @@ -725,7 +734,7 @@ int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier); if(subhier){ - retain__search(db, subhier, tokens, context, sub, sub_qos, 0); + retain__search(db, subhier, tokens, context, sub, sub_qos, subscription_identifier, 0); } while(tokens){ tail = tokens->next; diff --git a/test/mosq_test.py b/test/mosq_test.py index 8d29d1a5..a2db51fd 100644 --- a/test/mosq_test.py +++ b/test/mosq_test.py @@ -368,7 +368,6 @@ def gen_connect(client_id, clean_session=True, keepalive=60, username=None, pass def gen_connack(resv=0, rc=0, proto_ver=4, properties=""): if proto_ver == 5: properties += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_SHARED_SUB_AVAILABLE, 0) - properties += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_SUBSCRIPTION_ID_AVAILABLE, 0) properties = mqtt5_props.prop_finalise(properties) packet = struct.pack('!BBBB', 32, 2+len(properties), resv, rc) + properties