From 6a59e92db8b0076ab504837d57ea1779cec377f7 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 22 Jan 2019 12:43:52 +0000 Subject: [PATCH] Set remaining message expiry interval when republishing. --- lib/actions.c | 4 +- lib/messages_mosq.c | 4 +- lib/send_mosq.h | 4 +- lib/send_publish.c | 22 ++++-- src/database.c | 19 ++++-- src/handle_auth.c | 2 +- src/handle_connack.c | 4 +- test/broker/02-subpub-qos1-message-expiry.py | 72 ++++++++++++++++++++ test/broker/Makefile | 1 + test/broker/ptest.py | 1 + 10 files changed, 113 insertions(+), 20 deletions(-) create mode 100755 test/broker/02-subpub-qos1-message-expiry.py diff --git a/lib/actions.c b/lib/actions.c index b7abd592..bc6e2e1a 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -94,7 +94,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in } if(qos == 0){ - return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL); + return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL, 0); }else{ message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all)); if(!message) return MOSQ_ERR_NOMEM; @@ -134,7 +134,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in message->state = mosq_ms_wait_for_pubrec; } pthread_mutex_unlock(&mosq->out_message_mutex); - return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL); + return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL, 0); }else{ message->state = mosq_ms_invalid; pthread_mutex_unlock(&mosq->out_message_mutex); diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 51f48550..75e4c9d9 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -260,7 +260,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir }else if(cur->msg.qos == 2){ cur->state = mosq_ms_wait_for_pubrec; } - rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL); + rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL, 0); if(rc){ pthread_mutex_unlock(&mosq->out_message_mutex); return rc; @@ -330,7 +330,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag case mosq_ms_publish_qos2: messages->timestamp = now; messages->dup = true; - send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL); + send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL, 0); break; case mosq_ms_wait_for_pubrel: messages->timestamp = now; diff --git a/lib/send_mosq.h b/lib/send_mosq.h index 897f2eb8..9e2853b3 100644 --- a/lib/send_mosq.h +++ b/lib/send_mosq.h @@ -21,7 +21,7 @@ Contributors: int send__simple_command(struct mosquitto *mosq, uint8_t command); int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup, uint8_t reason_code, const mosquitto_property *properties); -int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props); +int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval); int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session, const mosquitto_property *properties); int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties); @@ -29,7 +29,7 @@ int send__pingreq(struct mosquitto *mosq); int send__pingresp(struct mosquitto *mosq); int send__puback(struct mosquitto *mosq, uint16_t mid); int send__pubcomp(struct mosquitto *mosq, uint16_t mid); -int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props); +int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval); int send__pubrec(struct mosquitto *mosq, uint16_t mid); int send__pubrel(struct mosquitto *mosq, uint16_t mid); int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties); diff --git a/lib/send_publish.c b/lib/send_publish.c index c1d81e0e..41b5e9ce 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -37,7 +37,7 @@ Contributors: #include "send_mosq.h" -int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props) +int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval) { #ifdef WITH_BROKER size_t len; @@ -110,7 +110,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 } log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen); G_PUB_BYTES_SENT_INC(payloadlen); - rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props); + rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval); mosquitto__free(mapped_topic); return rc; } @@ -124,16 +124,17 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen); #endif - return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props); + return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval); } -int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props) +int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval) { struct mosquitto__packet *packet = NULL; int packetlen; int proplen = 0, varbytes; int rc; + mosquitto_property expiry_prop; assert(mosq); @@ -147,11 +148,21 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, proplen = 0; proplen += property__get_length_all(cmsg_props); proplen += property__get_length_all(store_props); + if(expiry_interval > 0){ + expiry_prop.next = NULL; + expiry_prop.value.i32 = expiry_interval; + expiry_prop.identifier = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL; + expiry_prop.client_generated = false; + + proplen += property__get_length_all(&expiry_prop); + } + varbytes = packet__varint_bytes(proplen); if(varbytes > 4){ /* FIXME - Properties too big, don't publish any - should remove some first really */ cmsg_props = NULL; store_props = NULL; + expiry_interval = 0; }else{ packetlen += proplen + varbytes; } @@ -181,6 +192,9 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, packet__write_varint(packet, proplen); property__write_all(packet, cmsg_props, false); property__write_all(packet, store_props, false); + if(expiry_interval > 0){ + property__write_all(packet, &expiry_prop, false); + } } /* Payload */ diff --git a/src/database.c b/src/database.c index e00c9ada..56f4eb91 100644 --- a/src/database.c +++ b/src/database.c @@ -887,6 +887,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) int msg_count = 0; mosquitto_property *cmsg_props = NULL, *store_props = NULL; time_t now; + uint32_t expiry_interval = 0; if(!context || context->sock == INVALID_SOCKET || (context->state == mosq_cs_connected && !context->id)){ @@ -901,10 +902,14 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) tail = context->inflight_msgs; while(tail){ msg_count++; - if(tail->store->message_expiry_time && now > tail->store->message_expiry_time){ - /* Message is expired, must not send. */ - db__message_remove(db, context, &tail, last); - continue; + if(tail->store->message_expiry_time){ + if(now > tail->store->message_expiry_time){ + /* Message is expired, must not send. */ + db__message_remove(db, context, &tail, last); + continue; + }else{ + expiry_interval = tail->store->message_expiry_time - now; + } } mid = tail->mid; retries = tail->dup; @@ -918,7 +923,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) switch(tail->state){ case mosq_ms_publish_qos0: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(!rc){ db__message_remove(db, context, &tail, last); }else{ @@ -927,7 +932,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; case mosq_ms_publish_qos1: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(!rc){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ @@ -940,7 +945,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; case mosq_ms_publish_qos2: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props); + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(!rc){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ diff --git a/src/handle_auth.c b/src/handle_auth.c index 1520f22d..a4953e47 100644 --- a/src/handle_auth.c +++ b/src/handle_auth.c @@ -38,7 +38,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_PROTOCOL; } - if(mosq->in_packet.remaining_length > 0){ + if(context->in_packet.remaining_length > 0){ if(packet__read_byte(&context->in_packet, &reason_code)) return 1; rc = property__read_all(CMD_AUTH, &context->in_packet, &properties); diff --git a/src/handle_connack.c b/src/handle_connack.c index 9ef56812..d288a671 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -59,7 +59,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) if(context->bridge->notification_topic){ if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - context->bridge->notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL)){ + context->bridge->notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL, 0)){ return 1; } @@ -74,7 +74,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) notification_payload = '1'; if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL)){ + notification_topic, 1, ¬ification_payload, 1, true, 0, NULL, NULL, 0)){ mosquitto__free(notification_topic); return 1; diff --git a/test/broker/02-subpub-qos1-message-expiry.py b/test/broker/02-subpub-qos1-message-expiry.py new file mode 100755 index 00000000..3ba16773 --- /dev/null +++ b/test/broker/02-subpub-qos1-message-expiry.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +# Test whether the broker reduces the message expiry interval when republishing. +# MQTT v5 + +# Client connects with clean session set false, subscribes with qos=1, then disconnects +# Helper publishes two messages, one with a short expiry and one with a long expiry +# We wait until the short expiry will have expired but the long one not. +# Client reconnects, expects delivery of the long expiry message with a reduced +# expiry interval property. + +from mosq_test_helper import * + +rc = 1 +mid = 53 +keepalive = 60 +connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False) +connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) +connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1) + +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1, proto_ver=5) +suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) + + + +helper_connect = mosq_test.gen_connect("helper", proto_ver=5) +helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5) + +mid=1 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 1) +publish1s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message1", proto_ver=5, properties=props) +puback1s_packet = mosq_test.gen_puback(mid) + +mid=2 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 10) +publish2s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message2", proto_ver=5, properties=props) +puback2s_packet = mosq_test.gen_puback(mid) + + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + sock.close() + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1") + mosq_test.do_send_receive(helper, publish2s_packet, puback2s_packet, "puback 2") + + time.sleep(2) + + sock = mosq_test.do_client_connect(connect_packet, connack2_packet, timeout=20, port=port) + packet = sock.recv(len(publish2s_packet)) + for i in range(9, 5, -1): + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, i) + publish2r_packet = mosq_test.gen_publish("subpub/qos1", mid=2, qos=1, payload="message2", proto_ver=5, properties=props) + if packet == publish2r_packet: + rc = 0 + break + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index f6434d75..4dedbc2f 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -53,6 +53,7 @@ endif ./02-subpub-qos0-v5.py ./02-subpub-qos1-v5.py ./02-subpub-qos2-v5.py + ./02-subpub-qos1-message-expiry.py ./02-subpub-qos1-nolocal.py ./02-subpub-qos0-retain-as-publish.py ./02-subpub-qos0-send-retain.py diff --git a/test/broker/ptest.py b/test/broker/ptest.py index 277b972e..d9d243c8 100755 --- a/test/broker/ptest.py +++ b/test/broker/ptest.py @@ -36,6 +36,7 @@ tests = [ (1, './02-subpub-qos0-v5.py'), (1, './02-subpub-qos1-v5.py'), (1, './02-subpub-qos2-v5.py'), + (1, './02-subpub-qos1-message-expiry.py'), (1, './02-subpub-qos1-nolocal.py'), (1, './02-subpub-qos0-retain-as-publish.py'), (1, './02-subpub-qos0-send-retain.py'),