diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml
index 01913e29..82484094 100644
--- a/man/mosquitto.conf.5.xml
+++ b/man/mosquitto.conf.5.xml
@@ -1554,6 +1554,17 @@ openssl dhparam -out dhparam.pem 2048
normal.
+
+ [ true | false]
+
+ The regular covers both the local subscriptions
+ and the remote subscriptions. local_cleansession allows splitting this.
+ Setting false will mean that the local connection
+ will preserve subscription, independent of the remote connection.
+
+ Defaults to the value of bridge.cleansession unless explicitly specified.
+
+
name
diff --git a/src/bridge.c b/src/bridge.c
index cc6a8139..cc8118d5 100644
--- a/src/bridge.c
+++ b/src/bridge.c
@@ -118,6 +118,10 @@ int bridge__new(struct mosquitto_db *db, struct mosquitto__bridge *bridge)
#endif
bridge->try_private_accepted = true;
+ if(bridge->clean_start_local == -1){
+ /* default to "regular" clean start setting */
+ bridge->clean_start_local = bridge->clean_start;
+ }
new_context->retain_available = bridge->outgoing_retain;
new_context->protocol = bridge->protocol_version;
@@ -161,9 +165,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
bridge__packet_cleanup(context);
db__message_reconnect_reset(db, context);
- if(context->clean_start){
- db__messages_delete(db, context);
- }
+ db__messages_delete(db, context);
/* Delete all local subscriptions even for clean_start==false. We don't
* remove any messages and the next loop carries out the resubscription
@@ -340,9 +342,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
bridge__packet_cleanup(context);
db__message_reconnect_reset(db, context);
- if(context->clean_start){
- db__messages_delete(db, context);
- }
+ db__messages_delete(db, context);
/* Delete all local subscriptions even for clean_start==false. We don't
* remove any messages and the next loop carries out the resubscription
diff --git a/src/conf.c b/src/conf.c
index 910b0a8f..4f0ee7db 100644
--- a/src/conf.c
+++ b/src/conf.c
@@ -1214,6 +1214,17 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "cleansession", &cur_bridge->clean_start, saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
+#endif
+ }else if(!strcmp(token, "local_cleansession")){
+#ifdef WITH_BRIDGE
+ if(reload) continue; // FIXME
+ if(!cur_bridge){
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
+ return MOSQ_ERR_INVAL;
+ }
+ if(conf__parse_bool(&token, "local_cleansession", (bool *) &cur_bridge->clean_start_local, saveptr)) return MOSQ_ERR_INVAL;
+#else
+ log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "clientid_prefixes")){
if(reload){
@@ -1261,6 +1272,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
cur_bridge->outgoing_retain = true;
+ cur_bridge->clean_start_local = -1;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
diff --git a/src/context.c b/src/context.c
index 47d76521..a8104e28 100644
--- a/src/context.c
+++ b/src/context.c
@@ -123,8 +123,8 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
net__socket_close(db, context);
if(do_free || context->clean_start){
sub__clean_session(db, context);
- db__messages_delete(db, context);
}
+ db__messages_delete(db, context);
mosquitto__free(context->address);
context->address = NULL;
@@ -148,9 +148,6 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
context->out_packet = context->out_packet->next;
mosquitto__free(packet);
}
- if(do_free || context->clean_start){
- db__messages_delete(db, context);
- }
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
gai_cancel(context->adns);
diff --git a/src/database.c b/src/database.c
index cfc8ebd6..20a4ad9a 100644
--- a/src/database.c
+++ b/src/database.c
@@ -564,20 +564,23 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
{
if(!context) return MOSQ_ERR_INVAL;
- db__messages_delete_list(db, &context->msgs_in.inflight);
- db__messages_delete_list(db, &context->msgs_in.queued);
- db__messages_delete_list(db, &context->msgs_out.inflight);
- db__messages_delete_list(db, &context->msgs_out.queued);
+ if(context->clean_start || (context->bridge && context->bridge->clean_start)){
+ db__messages_delete_list(db, &context->msgs_in.inflight);
+ db__messages_delete_list(db, &context->msgs_in.queued);
+ context->msgs_in.msg_bytes = 0;
+ context->msgs_in.msg_bytes12 = 0;
+ context->msgs_in.msg_count = 0;
+ context->msgs_in.msg_count12 = 0;
+ }
- context->msgs_in.msg_bytes = 0;
- context->msgs_in.msg_bytes12 = 0;
- context->msgs_in.msg_count = 0;
- context->msgs_in.msg_count12 = 0;
-
- context->msgs_out.msg_bytes = 0;
- context->msgs_out.msg_bytes12 = 0;
- context->msgs_out.msg_count = 0;
- context->msgs_out.msg_count12 = 0;
+ if(context->bridge && context->bridge->clean_start_local){
+ db__messages_delete_list(db, &context->msgs_out.inflight);
+ db__messages_delete_list(db, &context->msgs_out.queued);
+ context->msgs_out.msg_bytes = 0;
+ context->msgs_out.msg_bytes12 = 0;
+ context->msgs_out.msg_count = 0;
+ context->msgs_out.msg_count12 = 0;
+ }
return MOSQ_ERR_SUCCESS;
}
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index 613eb664..44b06c91 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -514,6 +514,7 @@ struct mosquitto__bridge{
bool try_private;
bool try_private_accepted;
bool clean_start;
+ int8_t clean_start_local;
int keepalive;
struct mosquitto__bridge_topic *topics;
int topic_count;
diff --git a/test/broker/06-bridge-b2br-disconnect-qos1.py b/test/broker/06-bridge-b2br-disconnect-qos1.py
index 07849b02..1a8e275e 100755
--- a/test/broker/06-bridge-b2br-disconnect-qos1.py
+++ b/test/broker/06-bridge-b2br-disconnect-qos1.py
@@ -13,7 +13,8 @@ def write_config(filename, port1, port2, protocol_version):
f.write("topic bridge/# both 1\n")
f.write("notifications false\n")
f.write("restart_timeout 5\n")
- f.write("bridge_protocol_version %s\n" %(protocol_version))
+ f.write("bridge_protocol_version %s\n" % (protocol_version))
+
def do_test(proto_ver):
if proto_ver == 4:
@@ -33,12 +34,12 @@ def do_test(proto_ver):
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=proto_ver_connect)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
- mid = 1
if proto_ver == 5:
opts = mqtt5_opts.MQTT_SUB_OPT_NO_LOCAL | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
else:
opts = 0
+ mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "bridge/#", 1 | opts, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=proto_ver)
diff --git a/test/broker/06-bridge-clean-session.py b/test/broker/06-bridge-clean-session.py
new file mode 100755
index 00000000..8b745a7d
--- /dev/null
+++ b/test/broker/06-bridge-clean-session.py
@@ -0,0 +1,256 @@
+#!/usr/bin/env python3
+# Test whether a broker handles cleansession and local_cleansession correctly on bridges
+#
+# Test cases (with settings for broker A (edge). The settings on broker B (core)
+# are irrelevant, though you'll need persistence enabled to test, unless you
+# can simulate network interruptions.
+# Similarly, you'll need persistence on A, _purely_ to simplify the testing with a client
+# t# | LCS | CS | queued from (expected)
+# | A->B | B->A
+# 1 | -(t| t | no | no
+# 2 | -(f| f | yes | yes
+# 3 | t | t | no | no (as per #1)
+# 4 | t | f | no | yes
+# 5 | f | t | yes | no
+# 6 | f | f | yes | yes (as per #2)
+#
+# Test setup is two (real) brokers, so that messages can be published and subscribed in both
+# directions, with two test clients, one at each end.
+
+
+from mosq_test_helper import *
+from collections import namedtuple
+
+# Normally we don't want tests to spew debug, but if you're working on a test, it's useful
+VERBOSE_TEST=False
+def tprint(*args, **kwargs):
+ if VERBOSE_TEST:
+ print(" ".join(map(str,args)), **kwargs)
+
+# this is our "A" broker
+def write_config_edge(filename, persistence_file, remote_port, listen_port, protocol_version, cs=False, lcs=None):
+ with open(filename, 'w') as f:
+ f.write("port %d\n" % (listen_port))
+ f.write("\n")
+ f.write("persistence true\n")
+ f.write("persistence_file %s\n" % (persistence_file))
+ f.write("\n")
+ f.write("connection bridge_sample\n")
+ f.write("address 127.0.0.1:%d\n" % (remote_port))
+ f.write("topic br_out/# out 1\n")
+ f.write("topic br_in/# in 1\n")
+ f.write("notifications false\n")
+ # We need to ensure connections break fast enough to keep test times sane
+ f.write("keepalive_interval 5\n")
+ f.write("restart_timeout 5\n")
+ f.write("cleansession %s\n" % ("true" if cs else "false"))
+ # Ensure defaults are tested
+ if lcs is not None:
+ f.write("local_cleansession %s\n" % ("true" if lcs else "false"))
+ f.write("bridge_protocol_version %s\n" % (protocol_version))
+
+
+# this is our "B" broker
+def write_config_core(filename, listen_port, persistence_file):
+ with open(filename, 'w') as f:
+ f.write("port %d\n" % (listen_port))
+ f.write("\n")
+ f.write("persistence true\n")
+ f.write("persistence_file %s\n" % (persistence_file))
+
+
+def do_test(proto_ver, cs, lcs=None):
+ tprint("Running test with cs:%s, lcs: %s and proto: %d" % (cs, lcs, proto_ver))
+ if proto_ver == 4:
+ bridge_protocol = "mqttv311"
+ else:
+ bridge_protocol = "mqttv50"
+
+ # Match default behaviour of broker
+ expect_queued_ab = True
+ expect_queued_ba = True
+ if lcs is None:
+ lcs = cs
+ if lcs:
+ expect_queued_ab = False
+ if cs:
+ expect_queued_ba = False
+
+
+ (port_a_listen, port_b_listen) = mosq_test.get_port(2)
+ conf_file_a = os.path.basename(__file__).replace('.py', '_a_edge.conf')
+ persistence_file_a = os.path.basename(__file__).replace('.py', '_a_edge.db')
+ write_config_edge(conf_file_a, persistence_file_a, port_b_listen, port_a_listen, bridge_protocol, cs=cs, lcs=lcs)
+
+ conf_file_b = os.path.basename(__file__).replace('.py', '_b_core.conf')
+ persistence_file_b = os.path.basename(__file__).replace('.py', '_b_core.db')
+ write_config_core(conf_file_b, port_b_listen, persistence_file_b)
+
+ AckedPair = namedtuple("AckedPair", "p ack")
+
+ def make_conn(client_tag, proto, cs, session_present=False):
+ client_id = socket.gethostname() + "." + client_tag
+ keepalive = 60
+ conn = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=cs, proto_ver=proto, session_expiry=0 if cs else 5000)
+ connack = mosq_test.gen_connack(rc=0, proto_ver=proto_ver, flags=1 if session_present else 0)
+ return AckedPair(conn, connack)
+
+
+ def make_sub(topic, mid, qos, proto):
+ if proto_ver == 5:
+ opts = mqtt5_opts.MQTT_SUB_OPT_NO_LOCAL | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
+ else:
+ opts = 0
+ sub = mosq_test.gen_subscribe(mid, topic, qos | opts, proto_ver=proto)
+ suback = mosq_test.gen_suback(mid, qos, proto_ver=proto)
+ return AckedPair(sub, suback)
+
+
+ def make_pub(topic, mid, proto, qos=1, payload_tag="message", rc=-1):
+ # Using the mid automatically makes it hard to verify messages that might have been retransmitted.
+ # encourage users to put sequence numbers in topics instead....
+ pub = mosq_test.gen_publish(topic, mid=mid, qos=qos, retain=False, payload=payload_tag + "-from-" + topic, proto_ver=proto)
+ puback = mosq_test.gen_puback(mid, proto_ver=proto, reason_code=rc)
+ return AckedPair(pub, puback)
+
+ # Clients are testing messages in both directions, they need to be durable
+ conn_a = make_conn("client_a_edge", proto_ver, False)
+ conn_b = make_conn("client_b_core", proto_ver, False)
+ # We expect session present when we reconnect
+ reconn_a = make_conn("client_a_edge", proto_ver, False, session_present=True)
+ reconn_b = make_conn("client_b_core", proto_ver, False, session_present=True)
+
+ # remember, mids are from each broker's point of view, not the "world"
+ sub_a = make_sub("br_in/#", qos=1, mid=1, proto=proto_ver)
+ sub_b = make_sub("br_out/#", qos=1, mid=1, proto=proto_ver)
+
+ pub_a1 = make_pub("br_out/test-queued1", mid=1, proto=proto_ver)
+ pub_a2 = make_pub("br_out/test-queued2", mid=2, proto=proto_ver)
+ pub_a3 = make_pub("br_out/test-queued3", mid=3, proto=proto_ver)
+ pub_a3r = make_pub("br_out/test-queued3", mid=2, proto=proto_ver) # without queueing, there is no a2
+
+ pub_b1 = make_pub("br_in/test-queued1", mid=1, proto=proto_ver)
+ pub_b2 = make_pub("br_in/test-queued2", mid=2, proto=proto_ver)
+ pub_b3 = make_pub("br_in/test-queued3", mid=3, proto=proto_ver)
+ pub_b3r = make_pub("br_in/test-queued3", mid=2, proto=proto_ver) # without queueing, there is no b2
+
+ success = False
+ stde_a1 = stde_b1 = None
+ try:
+ # b must start first, as it's the destination of a
+ broker_b = mosq_test.start_broker(filename=conf_file_b, port=port_b_listen, use_conf=True)
+ broker_a = mosq_test.start_broker(filename=conf_file_a, port=port_a_listen, use_conf=True)
+
+ client_a = mosq_test.do_client_connect(conn_a.p, conn_a.ack, port=port_a_listen)
+ mosq_test.do_send_receive(client_a, sub_a.p, sub_a.ack, "suback_a")
+
+ client_b = mosq_test.do_client_connect(conn_b.p, conn_b.ack, port=port_b_listen)
+ mosq_test.do_send_receive(client_b, sub_b.p, sub_b.ack, "suback_b")
+
+ mosq_test.do_send_receive(client_a, pub_a1.p, pub_a1.ack, "puback_a1")
+ mosq_test.do_receive_send(client_b, pub_a1.p, pub_a1.ack, "a->b1 (b-side)")
+
+ mosq_test.do_send_receive(client_b, pub_b1.p, pub_b1.ack, "puback_b1")
+ mosq_test.do_receive_send(client_a, pub_b1.p, pub_b1.ack, "b->a1 (a-side)")
+
+ tprint("Normal bi-dir bridging works. continuing")
+
+ broker_b.terminate()
+ broker_b.wait()
+ (stdo_b1, stde_b1) = broker_b.communicate()
+
+ # as we're _terminating_ the connections should close ~straight away
+ tprint("terminated B", time.time())
+ time.sleep(0.5)
+
+ # should be queued (or not)
+ mosq_test.do_send_receive(client_a, pub_a2.p, pub_a2.ack, "puback_a2")
+
+ broker_b = mosq_test.start_broker(filename=conf_file_b, port=port_b_listen, use_conf=True)
+ # client b needs to reconnect now!
+
+ client_b = mosq_test.do_client_connect(reconn_b.p, reconn_b.ack, port=port_b_listen)
+ tprint("client b reconnected after restarting broker b at ", time.time())
+ # Need to sleep long enough to be sure of a re-connection...
+ time.sleep(10) # yuck, this makes the test run for ages!
+
+ # should go through
+ tprint("(B should be alive again now!) sending (after reconn!) a3 at ", time.time())
+ mosq_test.do_send_receive(client_a, pub_a3.p, pub_a3.ack, "puback_a3")
+
+
+ if expect_queued_ab:
+ tprint("1.expecting a->b queueing")
+ mosq_test.do_receive_send(client_b, pub_a2.p, pub_a2.ack, "a->b_2")
+ mosq_test.do_receive_send(client_b, pub_a3.p, pub_a3.ack, "a->b_3")
+ else:
+ tprint("not expecting a->b queueing")
+ mosq_test.do_receive_send(client_b, pub_a3r.p, pub_a3r.ack, "a->b_3(r)")
+
+ tprint("Stage 1 complete, repeating in other direction")
+
+ # ok, now repeat in the other direction...
+ broker_a.terminate()
+ broker_a.wait()
+ (stdo_a1, stde_a1) = broker_a.communicate()
+ time.sleep(0.5)
+
+ mosq_test.do_send_receive(client_b, pub_b2.p, pub_b2.ack, "puback_b2")
+
+ broker_a = mosq_test.start_broker(filename=conf_file_a, port=port_a_listen, use_conf=True)
+ # client a needs to reconnect now!
+ client_a = mosq_test.do_client_connect(reconn_a.p, reconn_a.ack, port=port_a_listen)
+ tprint("client A reconnected after restarting broker A at ", time.time())
+ # Need to sleep long enough to be sure of a re-connection...
+ time.sleep(10) # yuck, this makes the test run for ages!
+
+ # should go through
+ mosq_test.do_send_receive(client_b, pub_b3.p, pub_b3.ack, "puback_b3")
+
+ if expect_queued_ba:
+ tprint("2.expecting b->a queueueing")
+ mosq_test.do_receive_send(client_a, pub_b2.p, pub_b2.ack, "b->a_2")
+ mosq_test.do_receive_send(client_a, pub_b3.p, pub_b3.ack, "b->a_3")
+ else:
+ tprint("not expecting message b->a_2")
+ mosq_test.do_receive_send(client_a, pub_b3r.p, pub_b3r.ack, "b->a_3(r)")
+
+ success = True
+
+ finally:
+ os.remove(conf_file_a)
+ os.remove(conf_file_b)
+ broker_a.terminate()
+ broker_b.terminate()
+ broker_a.wait()
+ broker_b.wait()
+ (stdo_a, stde_a) = broker_a.communicate()
+ (stdo_b, stde_b) = broker_b.communicate()
+ # Must be after terminating!
+ try:
+ os.remove(persistence_file_a)
+ except FileNotFoundError:
+ print("persistence file a didn't exist, skipping remove")
+ try:
+ os.remove(persistence_file_b)
+ except FileNotFoundError:
+ print("persistence file b didn't exist, skipping remove")
+ if not success:
+ print("Test failed, dumping broker A logs: ")
+ if stde_a1:
+ print(stde_a1.decode('utf-8'))
+ print(stde_a.decode('utf-8'))
+ print("Test failed, dumping broker B logs: ")
+ if stde_b1:
+ print(stde_b1.decode('utf-8'))
+ print(stde_b.decode('utf-8'))
+ exit(1)
+
+for cs in [True, False]:
+ for lcs in [None, True, False]:
+ do_test(proto_ver=4, cs=cs, lcs=lcs)
+ # FIXME - v5 clean session bridging doesn't work: see
+ # https://github.com/eclipse/mosquitto/issues/1632
+ #do_test(proto_ver=5, cs=cs, lcs=lcs)
+
+exit(0)
diff --git a/test/broker/Makefile b/test/broker/Makefile
index e9b07c04..26eb9e12 100644
--- a/test/broker/Makefile
+++ b/test/broker/Makefile
@@ -125,6 +125,7 @@ endif
./06-bridge-br2b-disconnect-qos1.py
./06-bridge-br2b-disconnect-qos2.py
./06-bridge-br2b-remapping.py
+ ./06-bridge-clean-session.py
./06-bridge-fail-persist-resend-qos1.py
./06-bridge-fail-persist-resend-qos2.py
./06-bridge-no-local.py
diff --git a/test/broker/test.py b/test/broker/test.py
index 08aab4d6..7be97239 100755
--- a/test/broker/test.py
+++ b/test/broker/test.py
@@ -100,6 +100,7 @@ tests = [
(2, './06-bridge-br2b-disconnect-qos1.py'),
(2, './06-bridge-br2b-disconnect-qos2.py'),
(2, './06-bridge-br2b-remapping.py'),
+ (2, './06-bridge-clean-session.py'),
(2, './06-bridge-fail-persist-resend-qos1.py'),
(2, './06-bridge-fail-persist-resend-qos2.py'),
(1, './06-bridge-no-local.py'),
diff --git a/test/mosq_test.py b/test/mosq_test.py
index 47656b1f..534637c2 100644
--- a/test/mosq_test.py
+++ b/test/mosq_test.py
@@ -60,6 +60,8 @@ def start_broker(filename, cmd=None, port=0, use_conf=False, expect_fail=False):
return broker
if expect_fail == False:
+ outs, errs = broker.communicate(timeout=1)
+ print("FAIL: unable to start broker: %s" % errs)
raise IOError
else:
return None
@@ -121,6 +123,22 @@ def do_send_receive(sock, send_packet, receive_packet, error_string="send receiv
raise ValueError
+# Useful for mocking a client receiving (with ack) a qos1 publish
+def do_receive_send(sock, receive_packet, send_packet, error_string="receive send error"):
+ if expect_packet(sock, error_string, receive_packet):
+ size = len(send_packet)
+ total_sent = 0
+ while total_sent < size:
+ sent = sock.send(send_packet[total_sent:])
+ if sent == 0:
+ raise RuntimeError("socket connection broken")
+ total_sent += sent
+ return sock
+ else:
+ sock.close()
+ raise ValueError
+
+
def do_client_connect(connect_packet, connack_packet, hostname="localhost", port=1888, timeout=60, connack_error="connack"):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)