From 2af260ba58afc4055bbbc3710f8662dcec92ff22 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 7 Nov 2019 11:58:43 +0000 Subject: [PATCH] Add `bridge_outgoing_retain` option This allows outgoing messages from a bridge to have the retain bit completely disabled, which is useful when bridging to e.g. Amazon or Google. --- ChangeLog.txt | 3 + lib/mosquitto_internal.h | 2 +- lib/send_publish.c | 4 + man/mosquitto.conf.5.xml | 14 +++ mosquitto.conf | 9 ++ src/bridge.c | 1 + src/conf.c | 12 +++ src/context.c | 1 + src/mosquitto_broker_internal.h | 1 + test/broker/06-bridge-outgoing-retain.py | 106 +++++++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 12 files changed, 154 insertions(+), 1 deletion(-) create mode 100755 test/broker/06-bridge-outgoing-retain.py diff --git a/ChangeLog.txt b/ChangeLog.txt index d2c36b29..eb7924bc 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -3,6 +3,9 @@ Broker: then try "nobody" instead. This reduces the burden on users installing Mosquitto themselves. - Add support for Unix domain socket listeners. +- Add `bridge_outgoing_retain` option, to allow outgoing messages from a + bridge to have the retain bit completely disabled, which is useful when + bridging to e.g. Amazon or Google. Client library: - Client no longer generates random client ids for v3.1.1 clients, these are diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 08f20091..9a51f635 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -329,7 +329,6 @@ struct mosquitto { unsigned int reconnect_delay; unsigned int reconnect_delay_max; bool reconnect_exponential_backoff; - uint8_t retain_available; char threaded; struct mosquitto__packet *out_packet_last; # ifdef WITH_SRV @@ -337,6 +336,7 @@ struct mosquitto { # endif #endif uint8_t maximum_qos; + uint8_t retain_available; #ifdef WITH_BROKER UT_hash_handle hh_id; diff --git a/lib/send_publish.c b/lib/send_publish.c index f07c539d..5ae6658d 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -58,6 +58,10 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; #endif + if(!mosq->retain_available){ + retain = false; + } + #ifdef WITH_BROKER if(mosq->listener && mosq->listener->mount_point){ len = strlen(mosq->listener->mount_point); diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index 6f9001a8..01913e29 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -1503,6 +1503,20 @@ openssl dhparam -out dhparam.pem 2048 true. + + [ true | false ] + + Some MQTT brokers do not allow retained messages. MQTT v5 gives + a mechanism for brokers to tell clients that they do not support + retained messages, but this is not possible for MQTT v3.1.1 or v3.1. + If you need to bridge to a v3.1.1 or v3.1 broker that does not support + retained messages, set the + option to false. This will remove the + retain bit on all outgoing messages to that bridge, regardless of any + other setting. Defaults to true. + + + version diff --git a/mosquitto.conf b/mosquitto.conf index c4367c07..03112484 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -943,6 +943,15 @@ # properly. #try_private true +# Some MQTT brokers do not allow retained messages. MQTT v5 gives a mechanism +# for brokers to tell clients that they do not support retained messages, but +# this is not possible for MQTT v3.1.1 or v3.1. If you need to bridge to a +# v3.1.1 or v3.1 broker that does not support retained messages, set the +# bridge_outgoing_retain option to false. This will remove the retain bit on +# all outgoing messages to that bridge, regardless of any other setting. +#bridge_outgoing_retain true + + # ----------------------------------------------------------------- # Certificate based SSL/TLS support # ----------------------------------------------------------------- diff --git a/src/bridge.c b/src/bridge.c index ed66a7ef..2a0e37d0 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -109,6 +109,7 @@ int bridge__new(struct mosquitto_db *db, struct mosquitto__bridge *bridge) #endif bridge->try_private_accepted = true; + new_context->retain_available = bridge->outgoing_retain; new_context->protocol = bridge->protocol_version; bridges = mosquitto__realloc(db->bridges, (db->bridge_count+1)*sizeof(struct mosquitto *)); diff --git a/src/conf.c b/src/conf.c index 94eb18db..10f32dbf 100644 --- a/src/conf.c +++ b/src/conf.c @@ -1090,6 +1090,17 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct if(conf__parse_bool(&token, "bridge_require_ocsp", &cur_bridge->tls_ocsp_required, saveptr)) return MOSQ_ERR_INVAL; #else log__printf(NULL, MOSQ_LOG_WARNING, "Warning: TLS support not available."); +#endif + }else if(!strcmp(token, "bridge_outgoing_retain")){ +#if defined(WITH_BRIDGE) + if(reload) continue; // Listeners not valid for reloading. + if(!cur_bridge){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration."); + return MOSQ_ERR_INVAL; + } + if(conf__parse_bool(&token, "bridge_outgoing_retain", &cur_bridge->outgoing_retain, saveptr)) return MOSQ_ERR_INVAL; +#else + log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); #endif }else if(!strcmp(token, "bridge_keyfile")){ #if defined(WITH_BRIDGE) && defined(WITH_TLS) @@ -1266,6 +1277,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct cur_bridge->attempt_unsubscribe = true; cur_bridge->protocol_version = mosq_p_mqtt311; cur_bridge->primary_retry_sock = INVALID_SOCKET; + cur_bridge->outgoing_retain = true; }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration."); return MOSQ_ERR_INVAL; diff --git a/src/context.c b/src/context.c index e8ad57d3..6f53c0bb 100644 --- a/src/context.c +++ b/src/context.c @@ -52,6 +52,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context->password = NULL; context->listener = NULL; context->acl_list = NULL; + context->retain_available = true; /* is_bridge records whether this client is a bridge or not. This could be * done by looking at context->bridge for bridges that we create ourself, diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index dec2c827..1d03c6e8 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -523,6 +523,7 @@ struct mosquitto__bridge{ bool lazy_reconnect; bool attempt_unsubscribe; bool initial_notification_done; + bool outgoing_retain; #ifdef WITH_TLS bool tls_insecure; bool tls_ocsp_required; diff --git a/test/broker/06-bridge-outgoing-retain.py b/test/broker/06-bridge-outgoing-retain.py new file mode 100755 index 00000000..be2329e8 --- /dev/null +++ b/test/broker/06-bridge-outgoing-retain.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +# Does a bridge with bridge_outgoing_retain set to false not set the retain bit +# on outgoing messages? + +from mosq_test_helper import * + +def write_config(filename, port1, port2, protocol_version, outgoing_retain): + with open(filename, 'w') as f: + f.write("port %d\n" % (port2)) + f.write("\n") + f.write("connection bridge_sample\n") + f.write("address 127.0.0.1:%d\n" % (port1)) + f.write("topic bridge/# both 1\n") + f.write("notifications false\n") + f.write("restart_timeout 5\n") + f.write("bridge_protocol_version %s\n" %(protocol_version)) + f.write("bridge_outgoing_retain %s\n" %(outgoing_retain)) + +def do_test(proto_ver, outgoing_retain): + if proto_ver == 4: + bridge_protocol = "mqttv311" + proto_ver_connect = 128+4 + else: + bridge_protocol = "mqttv50" + proto_ver_connect = 5 + + (port1, port2) = mosq_test.get_port(2) + conf_file = os.path.basename(__file__).replace('.py', '.conf') + write_config(conf_file, port1, port2, bridge_protocol, outgoing_retain) + + rc = 1 + keepalive = 60 + client_id = socket.gethostname()+".bridge_sample" + connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=proto_ver_connect) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + + mid = 1 + if proto_ver == 5: + opts = mqtt5_opts.MQTT_SUB_OPT_NO_LOCAL | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED + else: + opts = 0 + + subscribe_packet = mosq_test.gen_subscribe(mid, "bridge/#", 1 | opts, proto_ver=proto_ver) + suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=proto_ver) + + if outgoing_retain == "true": + publish_packet = mosq_test.gen_publish("bridge/retain/test", qos=0, retain=True, payload="message", proto_ver=proto_ver) + else: + publish_packet = mosq_test.gen_publish("bridge/retain/test", qos=0, retain=False, payload="message", proto_ver=proto_ver) + + + helper_connect_packet = mosq_test.gen_connect("helper", keepalive=keepalive, clean_session=True, proto_ver=proto_ver) + helper_connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + helper_publish_packet = mosq_test.gen_publish("bridge/retain/test", qos=0, retain=True, payload="message", proto_ver=proto_ver) + + + ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ssock.settimeout(40) + ssock.bind(('', port1)) + ssock.listen(5) + + try: + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port2, use_conf=True) + + (bridge, address) = ssock.accept() + bridge.settimeout(20) + + if mosq_test.expect_packet(bridge, "connect", connect_packet): + bridge.send(connack_packet) + + if mosq_test.expect_packet(bridge, "subscribe", subscribe_packet): + bridge.send(suback_packet) + + # Broker is now connected to us on port1. + # Connect our client to the broker on port2 and send a publish + # message, which we will then receive by way of the bridge + helper = mosq_test.do_client_connect(helper_connect_packet, helper_connack_packet, port=port2) + helper.send(helper_publish_packet) + helper.close() + + if mosq_test.expect_packet(bridge, "publish", publish_packet): + rc = 0 + + bridge.close() + finally: + os.remove(conf_file) + try: + bridge.close() + except NameError: + pass + + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + ssock.close() + if rc: + print(stde.decode('utf-8')) + exit(rc) + +do_test(proto_ver=4, outgoing_retain="true") +do_test(proto_ver=4, outgoing_retain="false") +#do_test(proto_ver=5) + +exit(0) diff --git a/test/broker/Makefile b/test/broker/Makefile index ac025e19..7e4881d0 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -126,6 +126,7 @@ endif ./06-bridge-fail-persist-resend-qos1.py ./06-bridge-fail-persist-resend-qos2.py ./06-bridge-no-local.py + ./06-bridge-outgoing-retain.py ./06-bridge-per-listener-settings.py ./06-bridge-reconnect-local-out.py diff --git a/test/broker/test.py b/test/broker/test.py index de03fbe5..a86d0a38 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -101,6 +101,7 @@ tests = [ (2, './06-bridge-fail-persist-resend-qos1.py'), (2, './06-bridge-fail-persist-resend-qos2.py'), (1, './06-bridge-no-local.py'), + (2, './06-bridge-outgoing-retain.py'), (3, './06-bridge-per-listener-settings.py'), (2, './06-bridge-reconnect-local-out.py'),