Fix incoming msgs not being removed when there are no subs

Fix incoming messages not being removed for a client if the topic being
published to does not have any subscribers.

Closes #1322. Thanks to Yannic Schröder.
This commit is contained in:
Roger A. Light 2019-07-31 18:37:59 +01:00
parent b3f3513b35
commit efc8ed39af
6 changed files with 154 additions and 6 deletions

View File

@ -8,6 +8,8 @@ Broker:
Closes #515.
- Fix incoming QoS 2 messages being blocked when `max_inflight_messages` was
set to 1. Closes #1332.
- Fix incoming messages not being removed for a client if the topic being
published to does not have any subscribers. Closes #1322.
Client library:
- Fix MQTT v5 subscription options being incorrectly set for MQTT v3

View File

@ -78,11 +78,11 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
rc = db__message_release_incoming(db, mosq, mid);
if(rc == MOSQ_ERR_PROTOCOL){
return rc;
}else if(rc != MOSQ_ERR_SUCCESS){
if(rc == MOSQ_ERR_NOT_FOUND){
/* Message not found. Still send a PUBCOMP anyway because this could be
* due to a repeated PUBREL after a client has reconnected. */
}else if(rc != MOSQ_ERR_SUCCESS){
return rc;
}
rc = send__pubcomp(mosq, mid);

View File

@ -893,6 +893,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
char *source_id;
int msg_index = 0;
bool deleted = false;
int rc;
if(!context) return MOSQ_ERR_INVAL;
@ -910,7 +911,12 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
* denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other
* clients. */
if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){
if(!topic){
db__message_remove(db, &context->msgs_in, tail);
deleted = true;
}else{
rc = sub__messages_queue(db, source_id, topic, 2, retain, &tail->store);
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
db__message_remove(db, &context->msgs_in, tail);
deleted = true;
}else{
@ -918,6 +924,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
}
}
}
}
DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
if(context->msgs_in.inflight_maximum != 0 && msg_index >= context->msgs_in.inflight_maximum){
@ -936,7 +943,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
if(deleted){
return MOSQ_ERR_SUCCESS;
}else{
return 1;
return MOSQ_ERR_NOT_FOUND;
}
}

View File

@ -0,0 +1,137 @@
#!/usr/bin/env python3
# Test for issue 1322:
## restart mosquitto
#sudo systemctl restart mosquitto.service
#
## listen on topic1
#mosquitto_sub -t "topic1"
#
## publish to topic1 without clean session
#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message1"
## message1 on topic1 is received as expected
#
## publish to topic2 without clean session
## IMPORTANT: no subscription to this topic is present on broker!
#mosquitto_pub -t "topic2" -q 2 -c --id "foobar" -m "message2"
## this goes nowhere, as no subscriber present
#
## publish to topic1 without clean session
#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message3"
## message3 on topic1 IS NOT RECEIVED
#
## listen on topic2
#mosquitto_sub -t "topic2"
#
## publish to topic1 without clean session
#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message4"
## message2 on topic2 is received incorrectly
#
## publish to topic1 without clean session
#mosquitto_pub -t "topic1" -q 2 -c --id "foobar" -m "message5"
## message5 on topic1 is received as expected (message4 was dropped)
from mosq_test_helper import *
rc = 1
keepalive = 60
pub_connect_packet = mosq_test.gen_connect("pub", keepalive=keepalive, clean_session=False)
pub_connack1_packet = mosq_test.gen_connack(rc=0)
pub_connack2_packet = mosq_test.gen_connack(rc=0, flags=1)
sub1_connect_packet = mosq_test.gen_connect("sub1", keepalive=keepalive)
sub1_connack_packet = mosq_test.gen_connack(rc=0)
sub2_connect_packet = mosq_test.gen_connect("sub2", keepalive=keepalive)
sub2_connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe1_packet = mosq_test.gen_subscribe(mid, "topic1", 0)
suback1_packet = mosq_test.gen_suback(mid, 0)
mid = 1
subscribe2_packet = mosq_test.gen_subscribe(mid, "topic2", 0)
suback2_packet = mosq_test.gen_suback(mid, 0)
# All publishes have the same mid
mid = 1
pubrec_packet = mosq_test.gen_pubrec(mid)
pubrel_packet = mosq_test.gen_pubrel(mid)
pubcomp_packet = mosq_test.gen_pubcomp(mid)
publish1s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message1")
publish2s_packet = mosq_test.gen_publish("topic2", qos=2, mid=mid, payload="message2")
publish3s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message3")
publish4s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message4")
publish5s_packet = mosq_test.gen_publish("topic1", qos=2, mid=mid, payload="message5")
publish1r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message1")
publish2r_packet = mosq_test.gen_publish("topic2", qos=0, payload="message2")
publish3r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message3")
publish4r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message4")
publish5r_packet = mosq_test.gen_publish("topic1", qos=0, payload="message5")
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sub1 = mosq_test.do_client_connect(sub1_connect_packet, sub1_connack_packet, timeout=10, port=port)
mosq_test.do_send_receive(sub1, subscribe1_packet, suback1_packet, "suback1")
pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack1_packet, timeout=10, port=port)
mosq_test.do_send_receive(pub, publish1s_packet, pubrec_packet, "pubrec1")
mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp1")
pub.close()
if mosq_test.expect_packet(sub1, "publish1", publish1r_packet):
pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port)
mosq_test.do_send_receive(pub, publish2s_packet, pubrec_packet, "pubrec2")
mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp2")
pub.close()
# We expect nothing on sub1
mosq_test.do_ping(sub1, error_string="pingresp1")
pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port)
mosq_test.do_send_receive(pub, publish3s_packet, pubrec_packet, "pubrec3")
mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp3")
pub.close()
if mosq_test.expect_packet(sub1, "publish3", publish3r_packet):
sub2 = mosq_test.do_client_connect(sub2_connect_packet, sub2_connack_packet, timeout=10, port=port)
mosq_test.do_send_receive(sub2, subscribe2_packet, suback2_packet, "suback2")
pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port)
mosq_test.do_send_receive(pub, publish4s_packet, pubrec_packet, "pubrec4")
mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp4")
pub.close()
# We expect nothing on sub2
mosq_test.do_ping(sub2, error_string="pingresp2")
if mosq_test.expect_packet(sub1, "publish4", publish4r_packet):
pub = mosq_test.do_client_connect(pub_connect_packet, pub_connack2_packet, timeout=10, port=port)
mosq_test.do_send_receive(pub, publish5s_packet, pubrec_packet, "pubrec5")
mosq_test.do_send_receive(pub, pubrel_packet, pubcomp_packet, "pubcomp5")
pub.close()
# We expect nothing on sub2
mosq_test.do_ping(sub2, error_string="pingresp2")
if mosq_test.expect_packet(sub1, "publish5", publish5r_packet):
rc = 0
sub2.close()
sub1.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)

View File

@ -64,6 +64,7 @@ endif
./02-subpub-qos1-nolocal.py
./02-subpub-qos1-v5.py
./02-subpub-qos1.py
./02-subpub-qos2-1322.py
./02-subpub-qos2-bad-puback-1.py
./02-subpub-qos2-bad-puback-2.py
./02-subpub-qos2-bad-pubcomp.py

View File

@ -43,6 +43,7 @@ tests = [
(1, './02-subpub-qos1-nolocal.py'),
(1, './02-subpub-qos1-v5.py'),
(1, './02-subpub-qos1.py'),
(1, './02-subpub-qos2-1322.py'),
(1, './02-subpub-qos2-bad-puback-1.py'),
(1, './02-subpub-qos2-bad-puback-2.py'),
(1, './02-subpub-qos2-bad-pubcomp.py'),