From efc8ed39afa8c7a684bbca692fe307c89663cc0f Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 31 Jul 2019 18:37:59 +0100 Subject: [PATCH] Fix incoming msgs not being removed when there are no subs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix incoming messages not being removed for a client if the topic being published to does not have any subscribers. Closes #1322. Thanks to Yannic Schröder. --- ChangeLog.txt | 2 + lib/handle_pubrel.c | 6 +- src/database.c | 13 ++- test/broker/02-subpub-qos2-1322.py | 137 +++++++++++++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 6 files changed, 154 insertions(+), 6 deletions(-) create mode 100755 test/broker/02-subpub-qos2-1322.py diff --git a/ChangeLog.txt b/ChangeLog.txt index 2c55c4b1..fb736c96 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -8,6 +8,8 @@ Broker: Closes #515. - Fix incoming QoS 2 messages being blocked when `max_inflight_messages` was set to 1. Closes #1332. +- Fix incoming messages not being removed for a client if the topic being + published to does not have any subscribers. Closes #1322. Client library: - Fix MQTT v5 subscription options being incorrectly set for MQTT v3 diff --git a/lib/handle_pubrel.c b/lib/handle_pubrel.c index d4a90652..209a3e0d 100644 --- a/lib/handle_pubrel.c +++ b/lib/handle_pubrel.c @@ -78,11 +78,11 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq) mosquitto_property_free_all(&properties); rc = db__message_release_incoming(db, mosq, mid); - if(rc == MOSQ_ERR_PROTOCOL){ - return rc; - }else if(rc != MOSQ_ERR_SUCCESS){ + if(rc == MOSQ_ERR_NOT_FOUND){ /* Message not found. Still send a PUBCOMP anyway because this could be * due to a repeated PUBREL after a client has reconnected. */ + }else if(rc != MOSQ_ERR_SUCCESS){ + return rc; } rc = send__pubcomp(mosq, mid); diff --git a/src/database.c b/src/database.c index 87daf68c..715c1690 100644 --- a/src/database.c +++ b/src/database.c @@ -893,6 +893,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont char *source_id; int msg_index = 0; bool deleted = false; + int rc; if(!context) return MOSQ_ERR_INVAL; @@ -910,11 +911,17 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont * denied/dropped and is being processed so the client doesn't * keep resending it. That means we don't send it to other * clients. */ - if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){ + if(!topic){ db__message_remove(db, &context->msgs_in, tail); deleted = true; }else{ - return 1; + rc = sub__messages_queue(db, source_id, topic, 2, retain, &tail->store); + if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){ + db__message_remove(db, &context->msgs_in, tail); + deleted = true; + }else{ + return 1; + } } } } @@ -936,7 +943,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont if(deleted){ return MOSQ_ERR_SUCCESS; }else{ - return 1; + return MOSQ_ERR_NOT_FOUND; } } diff --git a/test/broker/02-subpub-qos2-1322.py b/test/broker/02-subpub-qos2-1322.py new file mode 100755 index 00000000..00720be5 --- /dev/null +++ b/test/broker/02-subpub-qos2-1322.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 + +# Test for issue 1322: + +## restart mosquitto +#sudo systemctl restart mosquitto.service +# +## listen on topic1 +#mosquitto_sub -t "topic1" +# +## publish to topic1 without clean session +#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message1" +## message1 on topic1 is received as expected +# +## publish to topic2 without clean session +## IMPORTANT: no subscription to this topic is present on broker! +#mosquitto_pub -t "topic2" -q 2 -c --id "foobar" -m "message2" +## this goes nowhere, as no subscriber present +# +## publish to topic1 without clean session +#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message3" +## message3 on topic1 IS NOT RECEIVED +# +## listen on topic2 +#mosquitto_sub -t "topic2" +# +## publish to topic1 without clean session +#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message4" +## message2 on topic2 is received incorrectly +# +## publish to topic1 without clean session +#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message5" +## message5 on topic1 is received as expected (message4 was dropped) + + + +from mosq_test_helper import * + +rc = 1 +keepalive = 60 +pub_connect_packet = mosq_test.gen_connect("pub", keepalive=keepalive, clean_session=False) +pub_connack1_packet = mosq_test.gen_connack(rc=0) +pub_connack2_packet = mosq_test.gen_connack(rc=0, flags=1) + +sub1_connect_packet = mosq_test.gen_connect("sub1", keepalive=keepalive) +sub1_connack_packet = mosq_test.gen_connack(rc=0) + +sub2_connect_packet = mosq_test.gen_connect("sub2", keepalive=keepalive) +sub2_connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe1_packet = mosq_test.gen_subscribe(mid, "topic1", 0) +suback1_packet = mosq_test.gen_suback(mid, 0) + +mid = 1 +subscribe2_packet = mosq_test.gen_subscribe(mid, "topic2", 0) +suback2_packet = mosq_test.gen_suback(mid, 0) + +# All publishes have the same mid +mid = 1 +pubrec_packet = mosq_test.gen_pubrec(mid) +pubrel_packet = mosq_test.gen_pubrel(mid) +pubcomp_packet = mosq_test.gen_pubcomp(mid) + +publish1s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message1") +publish2s_packet = mosq_test.gen_publish("topic2", qos=2, mid=mid, payload="message2") +publish3s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message3") +publish4s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message4") +publish5s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message5") + +publish1r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message1") +publish2r_packet = mosq_test.gen_publish("topic2", qos=0, payload="message2") +publish3r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message3") +publish4r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message4") +publish5r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message5") + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sub1 = mosq_test.do_client_connect(sub1_connect_packet, sub1_connack_packet, timeout=10, port=port) + mosq_test.do_send_receive(sub1, subscribe1_packet, suback1_packet, "suback1") + + pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack1_packet, timeout=10, port=port) + mosq_test.do_send_receive(pub, publish1s_packet, pubrec_packet, "pubrec1") + mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp1") + pub.close() + + if mosq_test.expect_packet(sub1, "publish1", publish1r_packet): + pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port) + mosq_test.do_send_receive(pub, publish2s_packet, pubrec_packet, "pubrec2") + mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp2") + pub.close() + + # We expect nothing on sub1 + mosq_test.do_ping(sub1, error_string="pingresp1") + + pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port) + mosq_test.do_send_receive(pub, publish3s_packet, pubrec_packet, "pubrec3") + mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp3") + pub.close() + + if mosq_test.expect_packet(sub1, "publish3", publish3r_packet): + sub2 = mosq_test.do_client_connect(sub2_connect_packet, sub2_connack_packet, timeout=10, port=port) + mosq_test.do_send_receive(sub2, subscribe2_packet, suback2_packet, "suback2") + + pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port) + mosq_test.do_send_receive(pub, publish4s_packet, pubrec_packet, "pubrec4") + mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp4") + pub.close() + + # We expect nothing on sub2 + mosq_test.do_ping(sub2, error_string="pingresp2") + + if mosq_test.expect_packet(sub1, "publish4", publish4r_packet): + pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port) + mosq_test.do_send_receive(pub, publish5s_packet, pubrec_packet, "pubrec5") + mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp5") + pub.close() + + # We expect nothing on sub2 + mosq_test.do_ping(sub2, error_string="pingresp2") + + if mosq_test.expect_packet(sub1, "publish5", publish5r_packet): + rc = 0 + + sub2.close() + sub1.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index 3b4ee95b..8da4d7fa 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -64,6 +64,7 @@ endif ./02-subpub-qos1-nolocal.py ./02-subpub-qos1-v5.py ./02-subpub-qos1.py + ./02-subpub-qos2-1322.py ./02-subpub-qos2-bad-puback-1.py ./02-subpub-qos2-bad-puback-2.py ./02-subpub-qos2-bad-pubcomp.py diff --git a/test/broker/test.py b/test/broker/test.py index 3d2e3ecd..afaa5388 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -43,6 +43,7 @@ tests = [ (1, './02-subpub-qos1-nolocal.py'), (1, './02-subpub-qos1-v5.py'), (1, './02-subpub-qos1.py'), + (1, './02-subpub-qos2-1322.py'), (1, './02-subpub-qos2-bad-puback-1.py'), (1, './02-subpub-qos2-bad-puback-2.py'), (1, './02-subpub-qos2-bad-pubcomp.py'),