From 6113eac95a9df634fbc858be542c4a0456bfe7b9 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 13 Jun 2023 11:22:03 +0100 Subject: [PATCH] Fix for CVE-2023-28366 --- ChangeLog.txt | 3 ++ lib/packet_mosq.c | 15 ++++++++ src/context.c | 41 ++++++++++++--------- src/database.c | 25 +++++++------ src/handle_publish.c | 35 ++++++++++++------ src/mosquitto_broker_internal.h | 4 +-- test/broker/03-publish-qos2-dup.py | 58 ++++++++++++++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + www/pages/security.md | 3 ++ 10 files changed, 147 insertions(+), 39 deletions(-) create mode 100755 test/broker/03-publish-qos2-dup.py diff --git a/ChangeLog.txt b/ChangeLog.txt index 231e4c3a..412d3b4d 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,7 @@ Security: +- CVE-2023-28366: Fix memory leak in broker when clients send multiple QoS 2 + messages with the same message ID, but then never respond to the PUBREC + commands. - Broker will now reject Will messages that attempt to publish to $CONTROL/. - Broker now validates usernames provided in a TLS certificate or TLS-PSK identity are valid UTF-8. diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index d737c185..8c06b942 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -152,6 +152,21 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) packet->next = NULL; pthread_mutex_lock(&mosq->out_packet_mutex); + +#ifdef WITH_BROKER + if(mosq->out_packet_count >= db.config->max_queued_messages){ + mosquitto__free(packet); + if(mosq->is_dropping == false){ + mosq->is_dropping = true; + log__printf(NULL, MOSQ_LOG_NOTICE, + "Outgoing messages are being dropped for client %s.", + mosq->id); + } + G_MSGS_DROPPED_INC(); + return MOSQ_ERR_SUCCESS; + } +#endif + if(mosq->out_packet){ mosq->out_packet_last->next = packet; }else{ diff --git a/src/context.c b/src/context.c index 923d06f8..3ed49765 100644 --- a/src/context.c +++ b/src/context.c @@ -83,9 +83,9 @@ struct mosquitto *context__init(mosq_sock_t sock) } } context->bridge = NULL; - context->msgs_in.inflight_maximum = db.config->max_inflight_messages; + context->msgs_in.inflight_maximum = 1; context->msgs_out.inflight_maximum = db.config->max_inflight_messages; - context->msgs_in.inflight_quota = db.config->max_inflight_messages; + context->msgs_in.inflight_quota = 1; context->msgs_out.inflight_quota = db.config->max_inflight_messages; context->max_qos = 2; #ifdef WITH_TLS @@ -98,6 +98,27 @@ struct mosquitto *context__init(mosq_sock_t sock) return context; } +static void context__cleanup_out_packets(struct mosquitto *context) +{ + struct mosquitto__packet *packet; + + if(!context) return; + + if(context->current_out_packet){ + packet__cleanup(context->current_out_packet); + mosquitto__free(context->current_out_packet); + context->current_out_packet = NULL; + } + while(context->out_packet){ + packet__cleanup(context->out_packet); + packet = context->out_packet; + context->out_packet = context->out_packet->next; + mosquitto__free(packet); + } + context->out_packet_count = 0; +} + + /* * This will result in any outgoing packets going unsent. If we're disconnected * forcefully then it is usually an error condition and shouldn't be a problem, @@ -106,8 +127,6 @@ struct mosquitto *context__init(mosq_sock_t sock) */ void context__cleanup(struct mosquitto *context, bool force_free) { - struct mosquitto__packet *packet; - if(!context) return; if(force_free){ @@ -121,6 +140,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) #endif alias__free_all(context); + context__cleanup_out_packets(context); mosquitto__free(context->auth_method); context->auth_method = NULL; @@ -148,18 +168,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->id = NULL; } packet__cleanup(&(context->in_packet)); - if(context->current_out_packet){ - packet__cleanup(context->current_out_packet); - mosquitto__free(context->current_out_packet); - context->current_out_packet = NULL; - } - while(context->out_packet){ - packet__cleanup(context->out_packet); - packet = context->out_packet; - context->out_packet = context->out_packet->next; - mosquitto__free(packet); - } - context->out_packet_count = 0; + context__cleanup_out_packets(context); #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ gai_cancel(context->adns); diff --git a/src/database.c b/src/database.c index 5e77a281..1a926f50 100644 --- a/src/database.c +++ b/src/database.c @@ -555,7 +555,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m } #endif - msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg)); + msg = mosquitto__calloc(1, sizeof(struct mosquitto_client_msg)); if(!msg) return MOSQ_ERR_NOMEM; msg->prev = NULL; msg->next = NULL; @@ -613,6 +613,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m if(dir == mosq_md_out && msg->qos > 0 && state != mosq_ms_queued){ util__decrement_send_quota(context); + }else if(dir == mosq_md_in && msg->qos > 0 && state != mosq_ms_queued){ + util__decrement_receive_quota(context); } if(dir == mosq_md_out && update){ @@ -796,23 +798,24 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store return MOSQ_ERR_SUCCESS; } -int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored) +int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_client_msg **client_msg) { - struct mosquitto_client_msg *tail; + struct mosquitto_client_msg *cmsg; + + *client_msg = NULL; if(!context) return MOSQ_ERR_INVAL; - *stored = NULL; - DL_FOREACH(context->msgs_in.inflight, tail){ - if(tail->store->source_mid == mid){ - *stored = tail->store; + DL_FOREACH(context->msgs_in.inflight, cmsg){ + if(cmsg->store->source_mid == mid){ + *client_msg = cmsg; return MOSQ_ERR_SUCCESS; } } - DL_FOREACH(context->msgs_in.queued, tail){ - if(tail->store->source_mid == mid){ - *stored = tail->store; + DL_FOREACH(context->msgs_in.queued, cmsg){ + if(cmsg->store->source_mid == mid){ + *client_msg = cmsg; return MOSQ_ERR_SUCCESS; } } @@ -914,6 +917,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) }else{ /* Message state can be preserved here because it should match * whatever the client has got. */ + msg->dup = 0; } } @@ -924,6 +928,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) * will be sent out of order. */ DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){ + msg->dup = 0; db__msg_add_to_queued_stats(&context->msgs_in, msg); if(db__ready_for_flight(context, mosq_md_in, msg->qos)){ switch(msg->qos){ diff --git a/src/handle_publish.c b/src/handle_publish.c index 111f0316..d3aa5ce2 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -42,6 +42,7 @@ int handle__publish(struct mosquitto *context) uint8_t header = context->in_packet.command; int res = 0; struct mosquitto_msg_store *msg, *stored = NULL; + struct mosquitto_client_msg *cmsg_stored = NULL; size_t len; uint16_t slen; char *topic_mount; @@ -287,24 +288,24 @@ int handle__publish(struct mosquitto *context) } if(msg->qos > 0){ - db__message_store_find(context, msg->source_mid, &stored); + db__message_store_find(context, msg->source_mid, &cmsg_stored); } - if(stored && msg->source_mid != 0 && - (stored->qos != msg->qos - || stored->payloadlen != msg->payloadlen - || strcmp(stored->topic, msg->topic) - || memcmp(stored->payload, msg->payload, msg->payloadlen) )){ + if(cmsg_stored && cmsg_stored->store && msg->source_mid != 0 && + (cmsg_stored->store->qos != msg->qos + || cmsg_stored->store->payloadlen != msg->payloadlen + || strcmp(cmsg_stored->store->topic, msg->topic) + || memcmp(cmsg_stored->store->payload, msg->payload, msg->payloadlen) )){ log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", msg->source_mid, context->id); db__message_remove_incoming(context, msg->source_mid); - stored = NULL; + cmsg_stored = NULL; } - if(!stored){ + if(!cmsg_stored){ if(msg->qos == 0 || db__ready_for_flight(context, mosq_md_in, msg->qos) - || db__ready_for_queue(context, msg->qos, &context->msgs_in)){ + ){ dup = 0; rc = db__message_store(context, msg, message_expiry_interval, 0, mosq_mo_client); @@ -316,10 +317,13 @@ int handle__publish(struct mosquitto *context) } stored = msg; msg = NULL; + dup = 0; }else{ db__msg_store_free(msg); msg = NULL; - dup = 1; + stored = cmsg_stored->store; + cmsg_stored->dup++; + dup = cmsg_stored->dup; } switch(stored->qos){ @@ -345,11 +349,17 @@ int handle__publish(struct mosquitto *context) }else{ res = 0; } + /* db__message_insert() returns 2 to indicate dropped message * due to queue. This isn't an error so don't disconnect them. */ /* FIXME - this is no longer necessary due to failing early above */ if(!res){ - if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1; + if(dup == 0 || dup == 1){ + rc2 = send__pubrec(context, stored->source_mid, 0, NULL); + if(rc2) rc = rc2; + }else{ + return MOSQ_ERR_PROTOCOL; + } }else if(res == 1){ rc = 1; } @@ -374,6 +384,9 @@ process_bad_message: } db__msg_store_free(msg); } + if(context->out_packet_count >= db.config->max_queued_messages){ + rc = MQTT_RC_QUOTA_EXCEEDED; + } return rc; } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index c28eaa2a..2cfa9d2b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -394,7 +394,7 @@ struct mosquitto_client_msg{ bool retain; enum mosquitto_msg_direction direction; enum mosquitto_msg_state state; - bool dup; + uint8_t dup; }; @@ -651,7 +651,7 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d int db__messages_delete(struct mosquitto *context, bool force_free); int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin); -int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); +int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_client_msg **client_msg); void db__msg_store_add(struct mosquitto_msg_store *store); void db__msg_store_remove(struct mosquitto_msg_store *store); void db__msg_store_ref_inc(struct mosquitto_msg_store *store); diff --git a/test/broker/03-publish-qos2-dup.py b/test/broker/03-publish-qos2-dup.py new file mode 100755 index 00000000..70834fab --- /dev/null +++ b/test/broker/03-publish-qos2-dup.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 + +from mosq_test_helper import * + +def do_test(proto_ver): + rc = 1 + connect_packet = mosq_test.gen_connect("03-pub-qos2-dup-test", proto_ver=proto_ver) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + + mid = 1 + publish_packet = mosq_test.gen_publish("topic", qos=2, mid=mid, payload="message", proto_ver=proto_ver, dup=1) + pubrec_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver) + + disconnect_packet = mosq_test.gen_disconnect(reason_code=130, proto_ver=proto_ver) + + port = mosq_test.get_port() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + + try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) + mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec 1") + mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec 2") + if proto_ver == 5: + mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "disconnect") + rc = 0 + else: + try: + mosq_test.do_send_receive(sock, publish_packet, b"", "disconnect1") + rc = 0 + except BrokenPipeError: + rc = 0 + + sock.close() + except Exception as e: + print(e) + except mosq_test.TestError: + pass + finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + print("proto_ver=%d" % (proto_ver)) + exit(rc) + + +def all_tests(): + rc = do_test(proto_ver=4) + if rc: + return rc; + rc = do_test(proto_ver=5) + if rc: + return rc; + return 0 + +if __name__ == '__main__': + all_tests() diff --git a/test/broker/Makefile b/test/broker/Makefile index 94cc8026..e66c7ffc 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -84,6 +84,7 @@ msg_sequence_test: ./03-publish-qos1-no-subscribers-v5.py ./03-publish-qos1-retain-disabled.py ./03-publish-qos1.py + ./03-publish-qos2-dup.py ./03-publish-qos2-max-inflight.py ./03-publish-qos2.py diff --git a/test/broker/test.py b/test/broker/test.py index 649746d3..e8956408 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -64,6 +64,7 @@ tests = [ (1, './03-publish-qos1-no-subscribers-v5.py'), (1, './03-publish-qos1-retain-disabled.py'), (1, './03-publish-qos1.py'), + (1, './03-publish-qos2-dup.py'), (1, './03-publish-qos2-max-inflight.py'), (1, './03-publish-qos2.py'), diff --git a/www/pages/security.md b/www/pages/security.md index 2f124cca..ddc4423f 100644 --- a/www/pages/security.md +++ b/www/pages/security.md @@ -19,6 +19,9 @@ follow the steps on [Eclipse Security] page to report it. Listed with most recent first. Further information on security related issues can be found in the [security category]. +* June 2023: [CVE-2023-28366]: Clients sending unacknowledged QoS 2 messages + with duplicate message ids cause a memory leak. Affecting versions **1.3.2** + to **2.0.15** inclusive, fixed in **2.0.16**. * August 2022: Deleting the anonymous group in the dynamic security plugin could lead to a crash. Affecting versions **2.0.0** to **2.0.14** inclusive, fixed in **2.0.15**.