Remove unused messages from store immediately.

This removes the need for *store_clean*.
This commit is contained in:
Roger A. Light 2014-11-17 23:46:02 +00:00
parent 4374170c40
commit ea8537c048
13 changed files with 50 additions and 74 deletions

View File

@ -23,7 +23,11 @@ int _mosquitto_packet_handle(struct mosquitto *mosq);
int _mosquitto_handle_connack(struct mosquitto *mosq);
int _mosquitto_handle_pingreq(struct mosquitto *mosq);
int _mosquitto_handle_pingresp(struct mosquitto *mosq);
#ifdef WITH_BROKER
int _mosquitto_handle_pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type);
#else
int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type);
#endif
int _mosquitto_handle_publish(struct mosquitto *mosq);
int _mosquitto_handle_pubrec(struct mosquitto *mosq);
int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq);

View File

@ -54,7 +54,11 @@ int _mosquitto_handle_pingresp(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
}
#ifdef WITH_BROKER
int _mosquitto_handle_pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type)
#else
int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type)
#endif
{
uint16_t mid;
int rc;
@ -66,7 +70,7 @@ int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type)
_mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
if(mid){
rc = mqtt3_db_message_delete(mosq, mid, mosq_md_out);
rc = mqtt3_db_message_delete(db, mosq, mid, mosq_md_out);
if(rc) return rc;
}
#else

View File

@ -146,10 +146,10 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
context->ping_t = 0;
context->bridge->lazy_reconnect = false;
mqtt3_bridge_packet_cleanup(context);
mqtt3_db_message_reconnect_reset(context);
mqtt3_db_message_reconnect_reset(db, context);
if(context->clean_session){
mqtt3_db_messages_delete(context);
mqtt3_db_messages_delete(db, context);
}
/* Delete all local subscriptions even for clean_session==false. We don't

View File

@ -155,7 +155,6 @@ static void _config_init_reload(struct mqtt3_config *config)
config->psk_file = NULL;
config->queue_qos0_messages = false;
config->retry_interval = 20;
config->store_clean_interval = 10;
config->sys_interval = 10;
config->upgrade_outgoing_qos = false;
if(config->auth_options){
@ -1618,11 +1617,7 @@ int _config_read_file_core(struct mqtt3_config *config, bool reload, const char
_mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "store_clean_interval")){
if(_conf_parse_int(&token, "store_clean_interval", &config->store_clean_interval, saveptr)) return MOSQ_ERR_INVAL;
if(config->store_clean_interval < 0 || config->store_clean_interval > 65535){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Invalid store_clean_interval value (%d).", config->store_clean_interval);
return MOSQ_ERR_INVAL;
}
_mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: store_clean_interval is no longer needed.");
}else if(!strcmp(token, "sys_interval")){
if(_conf_parse_int(&token, "sys_interval", &config->sys_interval, saveptr)) return MOSQ_ERR_INVAL;
if(config->sys_interval < 0 || config->sys_interval > 65535){

View File

@ -135,7 +135,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
_mosquitto_socket_close(db, context);
if((do_free || context->clean_session) && db){
mqtt3_subs_clean_session(db, context);
mqtt3_db_messages_delete(context);
mqtt3_db_messages_delete(db, context);
}
if(context->address){
_mosquitto_free(context->address);

View File

@ -121,19 +121,21 @@ static void subhier_clean(struct _mosquitto_subhier *subhier)
int mqtt3_db_close(struct mosquitto_db *db)
{
subhier_clean(db->subs.children);
mqtt3_db_store_clean(db);
return MOSQ_ERR_SUCCESS;
}
static void _message_remove(struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
{
if(!context || !msg || !(*msg)){
return;
}
/* FIXME - it would be nice to be able to remove the stored message here if ref_count==0 */
(*msg)->store->ref_count--;
if((*msg)->store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, (*msg)->store);
db->msg_store_count--;
}
if(last){
last->next = (*msg)->next;
if(!last->next){
@ -157,7 +159,7 @@ static void _message_remove(struct mosquitto *context, struct mosquitto_client_m
}
}
int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
int mqtt3_db_message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
{
struct mosquitto_client_msg *tail, *last = NULL;
int msg_index = 0;
@ -190,7 +192,7 @@ int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosqui
}
if(tail->mid == mid && tail->direction == dir){
msg_index--;
_message_remove(context, &tail, last);
_message_remove(db, context, &tail, last);
deleted = true;
}else{
last = tail;
@ -362,7 +364,7 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context,
#ifdef WITH_WEBSOCKETS
if(context->wsi){
return mqtt3_db_message_write(context);
return mqtt3_db_message_write(db, context);
}else{
return rc;
}
@ -387,7 +389,7 @@ int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosqui
return 1;
}
int mqtt3_db_messages_delete(struct mosquitto *context)
int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *next;
@ -395,8 +397,11 @@ int mqtt3_db_messages_delete(struct mosquitto *context)
tail = context->msgs;
while(tail){
/* FIXME - it would be nice to be able to remove the stored message here if rec_count==0 */
tail->store->ref_count--;
if(tail->store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, tail->store);
db->msg_store_count--;
}
next = tail->next;
_mosquitto_free(tail);
tail = next;
@ -523,7 +528,7 @@ int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct
/* Called on reconnect to set outgoing messages to a sensible state and force a
* retry, and to set incoming messages to expect an appropriate retry. */
int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *msg;
struct mosquitto_client_msg *prev = NULL;
@ -562,7 +567,7 @@ int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
if(msg->qos != 2){
/* Anything <QoS 2 can be completely retried by the client at
* no harm. */
_message_remove(context, &msg, prev);
_message_remove(db, context, &msg, prev);
}else{
/* Message state can be preserved here because it should match
* whatever the client has got. */
@ -692,7 +697,7 @@ int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context,
* keep resending it. That means we don't send it to other
* clients. */
if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){
_message_remove(context, &tail, last);
_message_remove(db, context, &tail, last);
deleted = true;
}else{
return 1;
@ -712,7 +717,7 @@ int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context,
}
}
int mqtt3_db_message_write(struct mosquitto *context)
int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
struct mosquitto_client_msg *tail, *last = NULL;
@ -752,7 +757,7 @@ int mqtt3_db_message_write(struct mosquitto *context)
case mosq_ms_publish_qos0:
rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
if(!rc){
_message_remove(context, &tail, last);
_message_remove(db, context, &tail, last);
}else{
return rc;
}
@ -838,32 +843,6 @@ int mqtt3_db_message_write(struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}
void mqtt3_db_store_clean(struct mosquitto_db *db)
{
/* FIXME - this may not be necessary if checks are made when messages are removed. */
struct mosquitto_msg_store *msg_store, *msg_tmp;
int i;
assert(db);
HASH_ITER(hh, db->msg_store, msg_store, msg_tmp){
if(msg_store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, msg_store);
if(msg_store->source_id) _mosquitto_free(msg_store->source_id);
if(msg_store->dest_ids){
for(i=0; i<msg_store->dest_id_count; i++){
if(msg_store->dest_ids[i]) _mosquitto_free(msg_store->dest_ids[i]);
}
_mosquitto_free(msg_store->dest_ids);
}
if(msg_store->msg.topic) _mosquitto_free(msg_store->msg.topic);
if(msg_store->msg.payload) _mosquitto_free(msg_store->msg.payload);
_mosquitto_free(msg_store);
db->msg_store_count--;
}
}
}
void mqtt3_db_limits_set(int inflight, int queued)
{
max_inflight = inflight;

View File

@ -67,7 +67,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
#ifdef WITH_PERSISTENCE
time_t last_backup = mosquitto_time();
#endif
time_t last_store_clean = mosquitto_time();
time_t now = 0;
time_t now_time;
int time_count;
@ -162,7 +161,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
|| context->bridge
|| now - context->last_msg_in < (time_t)(context->keepalive)*3/2){
if(mqtt3_db_message_write(context) == MOSQ_ERR_SUCCESS){
if(mqtt3_db_message_write(db, context) == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
@ -287,24 +286,21 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
if(db->persistence_changes > db->config->autosave_interval){
mqtt3_db_backup(db, false, false);
mqtt3_db_backup(db, false);
db->persistence_changes = 0;
}
}else{
if(last_backup + db->config->autosave_interval < mosquitto_time()){
mqtt3_db_backup(db, false, false);
mqtt3_db_backup(db, false);
last_backup = mosquitto_time();
}
}
}
#endif
if(!db->config->store_clean_interval || last_store_clean + db->config->store_clean_interval < mosquitto_time()){
mqtt3_db_store_clean(db);
last_store_clean = mosquitto_time();
}
#ifdef WITH_PERSISTENCE
if(flag_db_backup){
mqtt3_db_backup(db, false, false);
mqtt3_db_backup(db, false);
flag_db_backup = false;
}
#endif

View File

@ -335,7 +335,7 @@ int main(int argc, char *argv[])
#ifdef WITH_PERSISTENCE
if(config.persistence){
mqtt3_db_backup(&int_db, true, true);
mqtt3_db_backup(&int_db, true);
}
#endif

View File

@ -113,7 +113,6 @@ struct mqtt3_config {
char *psk_file;
bool queue_qos0_messages;
int retry_interval;
int store_clean_interval;
int sys_interval;
bool upgrade_outgoing_qos;
char *user;
@ -360,27 +359,26 @@ int mqtt3_handle_unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db);
int mqtt3_db_close(struct mosquitto_db *db);
#ifdef WITH_PERSISTENCE
int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown);
int mqtt3_db_backup(struct mosquitto_db *db, bool shutdown);
int mqtt3_db_restore(struct mosquitto_db *db);
#endif
void mqtt3_db_limits_set(int inflight, int queued);
/* Return the number of in-flight messages in count. */
int mqtt3_db_message_count(int *count);
int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int mqtt3_db_message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state);
int mqtt3_db_message_write(struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto *context);
int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout);
int mqtt3_db_message_reconnect_reset(struct mosquitto *context);
int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos);
void mqtt3_db_store_clean(struct mosquitto_db *db);
void mqtt3_db_sys_update(struct mosquitto_db *db, int interval, time_t start_time);
void mqtt3_db_vacuum(void);

View File

@ -322,7 +322,7 @@ static int mqtt3_db_subs_retain_write(struct mosquitto_db *db, FILE *db_fptr)
return MOSQ_ERR_SUCCESS;
}
int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown)
int mqtt3_db_backup(struct mosquitto_db *db, bool shutdown)
{
int rc = 0;
FILE *db_fptr = NULL;
@ -338,9 +338,6 @@ int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown)
if(!db || !db->config || !db->config->persistence_filepath) return MOSQ_ERR_INVAL;
_mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Saving in-memory database to %s.", db->config->persistence_filepath);
if(cleanup){
mqtt3_db_store_clean(db);
}
len = strlen(db->config->persistence_filepath)+5;
outfile = _mosquitto_calloc(len+1, 1);

View File

@ -41,9 +41,9 @@ int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context)
case PINGRESP:
return _mosquitto_handle_pingresp(context);
case PUBACK:
return _mosquitto_handle_pubackcomp(context, "PUBACK");
return _mosquitto_handle_pubackcomp(db, context, "PUBACK");
case PUBCOMP:
return _mosquitto_handle_pubackcomp(context, "PUBCOMP");
return _mosquitto_handle_pubackcomp(db, context, "PUBCOMP");
case PUBLISH:
return mqtt3_handle_publish(db, context);
case PUBREC:

View File

@ -434,7 +434,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
if(found_context->msgs){
context->msgs = found_context->msgs;
found_context->msgs = NULL;
mqtt3_db_message_reconnect_reset(context);
mqtt3_db_message_reconnect_reset(db, context);
}
context->subs = found_context->subs;
found_context->subs = NULL;

View File

@ -81,7 +81,10 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
#endif
if(hier->retained){
hier->retained->ref_count--;
/* FIXME - it would be nice to be able to remove the message from the store at this point if ref_count == 0 */
if(hier->retained->ref_count == 0){
HASH_DELETE(hh, db->msg_store, hier->retained);
db->msg_store_count--;
}
#ifdef WITH_SYS_TREE
db->retained_count--;
#endif