Fix $SYS messages being expired after 60 seconds

and hence unchanged values disappearing. Thanks to Wim Nelis and
Christoph Krey.
This commit is contained in:
Roger A. Light 2022-08-22 15:57:14 +01:00
parent ef44b22cef
commit 86fffa34a9
2 changed files with 31 additions and 27 deletions

View File

@ -1,3 +1,7 @@
Broker:
- Fix $SYS messages being expired after 60 seconds and hence unchanged values
disappearing.
2.0.15 - 2022-08-16 2.0.15 - 2022-08-16
=================== ===================

View File

@ -76,31 +76,31 @@ static void sys_tree__update_clients(char *buf)
if(client_count != count_total){ if(client_count != count_total){
client_count = count_total; client_count = count_total;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/total", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/total", SYS_TREE_QOS, len, buf, 1, 0, NULL);
if(client_count > client_max){ if(client_count > client_max){
client_max = client_count; client_max = client_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_max); len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_max);
db__messages_easy_queue(NULL, "$SYS/broker/clients/maximum", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
} }
if(disconnected_count != count_total-count_by_sock){ if(disconnected_count != count_total-count_by_sock){
disconnected_count = count_total-count_by_sock; disconnected_count = count_total-count_by_sock;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", disconnected_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", disconnected_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/inactive", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/inactive", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/clients/disconnected", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/disconnected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(connected_count != count_by_sock){ if(connected_count != count_by_sock){
connected_count = count_by_sock; connected_count = count_by_sock;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", connected_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", connected_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/active", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/active", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/clients/connected", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/connected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(g_clients_expired != clients_expired){ if(g_clients_expired != clients_expired){
clients_expired = g_clients_expired; clients_expired = g_clients_expired;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", clients_expired); len = (uint32_t)snprintf(buf, BUFLEN, "%d", clients_expired);
db__messages_easy_queue(NULL, "$SYS/broker/clients/expired", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/clients/expired", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
} }
@ -116,13 +116,13 @@ static void sys_tree__update_memory(char *buf)
if(current_heap != value_ul){ if(current_heap != value_ul){
current_heap = value_ul; current_heap = value_ul;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", current_heap); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", current_heap);
db__messages_easy_queue(NULL, "$SYS/broker/heap/current", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/heap/current", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
value_ul =mosquitto__max_memory_used(); value_ul =mosquitto__max_memory_used();
if(max_heap != value_ul){ if(max_heap != value_ul){
max_heap = value_ul; max_heap = value_ul;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", max_heap); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", max_heap);
db__messages_easy_queue(NULL, "$SYS/broker/heap/maximum", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/heap/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
} }
#endif #endif
@ -135,12 +135,12 @@ static void calc_load(char *buf, const char *topic, bool initial, double exponen
if (initial) { if (initial) {
new_value = *current; new_value = *current;
len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value); len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value);
db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
} else { } else {
new_value = interval + exponent*((*current) - interval); new_value = interval + exponent*((*current) - interval);
if(fabs(new_value - (*current)) >= 0.01){ if(fabs(new_value - (*current)) >= 0.01){
len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value); len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value);
db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
} }
(*current) = new_value; (*current) = new_value;
@ -218,7 +218,7 @@ void sys_tree__update(int interval, time_t start_time)
if(interval && db.now_s - interval > last_update){ if(interval && db.now_s - interval > last_update){
uptime = db.now_s - start_time; uptime = db.now_s - start_time;
len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime); len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime);
db__messages_easy_queue(NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 0, NULL);
sys_tree__update_clients(buf); sys_tree__update_clients(buf);
initial_publish = false; initial_publish = false;
@ -287,32 +287,32 @@ void sys_tree__update(int interval, time_t start_time)
if(db.msg_store_count != msg_store_count){ if(db.msg_store_count != msg_store_count){
msg_store_count = db.msg_store_count; msg_store_count = db.msg_store_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", msg_store_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", msg_store_count);
db__messages_easy_queue(NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if (db.msg_store_bytes != msg_store_bytes){ if (db.msg_store_bytes != msg_store_bytes){
msg_store_bytes = db.msg_store_bytes; msg_store_bytes = db.msg_store_bytes;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msg_store_bytes); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msg_store_bytes);
db__messages_easy_queue(NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(db.subscription_count != subscription_count){ if(db.subscription_count != subscription_count){
subscription_count = db.subscription_count; subscription_count = db.subscription_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", subscription_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", subscription_count);
db__messages_easy_queue(NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(db.shared_subscription_count != shared_subscription_count){ if(db.shared_subscription_count != shared_subscription_count){
shared_subscription_count = db.shared_subscription_count; shared_subscription_count = db.shared_subscription_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", shared_subscription_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", shared_subscription_count);
db__messages_easy_queue(NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(db.retained_count != retained_count){ if(db.retained_count != retained_count){
retained_count = db.retained_count; retained_count = db.retained_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", retained_count); len = (uint32_t)snprintf(buf, BUFLEN, "%d", retained_count);
db__messages_easy_queue(NULL, "$SYS/broker/retained messages/count", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/retained messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
#ifdef REAL_WITH_MEMORY_TRACKING #ifdef REAL_WITH_MEMORY_TRACKING
@ -322,55 +322,55 @@ void sys_tree__update(int interval, time_t start_time)
if(msgs_received != g_msgs_received){ if(msgs_received != g_msgs_received){
msgs_received = g_msgs_received; msgs_received = g_msgs_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_received); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_received);
db__messages_easy_queue(NULL, "$SYS/broker/messages/received", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(msgs_sent != g_msgs_sent){ if(msgs_sent != g_msgs_sent){
msgs_sent = g_msgs_sent; msgs_sent = g_msgs_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_sent); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_sent);
db__messages_easy_queue(NULL, "$SYS/broker/messages/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(publish_dropped != g_msgs_dropped){ if(publish_dropped != g_msgs_dropped){
publish_dropped = g_msgs_dropped; publish_dropped = g_msgs_dropped;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", publish_dropped); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", publish_dropped);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/dropped", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/dropped", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(pub_msgs_received != g_pub_msgs_received){ if(pub_msgs_received != g_pub_msgs_received){
pub_msgs_received = g_pub_msgs_received; pub_msgs_received = g_pub_msgs_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_received); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_received);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/received", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(pub_msgs_sent != g_pub_msgs_sent){ if(pub_msgs_sent != g_pub_msgs_sent){
pub_msgs_sent = g_pub_msgs_sent; pub_msgs_sent = g_pub_msgs_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_sent); len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_sent);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(bytes_received != g_bytes_received){ if(bytes_received != g_bytes_received){
bytes_received = g_bytes_received; bytes_received = g_bytes_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_received); len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_received);
db__messages_easy_queue(NULL, "$SYS/broker/bytes/received", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(bytes_sent != g_bytes_sent){ if(bytes_sent != g_bytes_sent){
bytes_sent = g_bytes_sent; bytes_sent = g_bytes_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_sent); len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_sent);
db__messages_easy_queue(NULL, "$SYS/broker/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(pub_bytes_received != g_pub_bytes_received){ if(pub_bytes_received != g_pub_bytes_received){
pub_bytes_received = g_pub_bytes_received; pub_bytes_received = g_pub_bytes_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_received); len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_received);
db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/received", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
if(pub_bytes_sent != g_pub_bytes_sent){ if(pub_bytes_sent != g_pub_bytes_sent){
pub_bytes_sent = g_pub_bytes_sent; pub_bytes_sent = g_pub_bytes_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_sent); len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_sent);
db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
} }
last_update = db.now_s; last_update = db.now_s;