2014-05-07 22:27:00 +00:00
|
|
|
/*
|
|
|
|
Copyright (c) 2009-2014 Roger Light <roger@atchoo.org>
|
|
|
|
|
|
|
|
All rights reserved. This program and the accompanying materials
|
|
|
|
are made available under the terms of the Eclipse Public License v1.0
|
|
|
|
and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
|
|
|
|
|
|
The Eclipse Public License is available at
|
|
|
|
http://www.eclipse.org/legal/epl-v10.html
|
|
|
|
and the Eclipse Distribution License is available at
|
|
|
|
http://www.eclipse.org/org/documents/edl-v10.php.
|
|
|
|
|
|
|
|
Contributors:
|
|
|
|
Roger Light - initial implementation and documentation.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
#include <stdio.h>
|
|
|
|
|
|
|
|
#include <config.h>
|
|
|
|
|
|
|
|
#include <mosquitto_broker.h>
|
|
|
|
#include <memory_mosq.h>
|
|
|
|
#include <send_mosq.h>
|
|
|
|
#include <time_mosq.h>
|
|
|
|
|
|
|
|
static int max_inflight = 20;
|
|
|
|
static int max_queued = 100;
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
extern unsigned long g_msgs_dropped;
|
|
|
|
#endif
|
|
|
|
|
|
|
|
int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
|
|
|
|
{
|
|
|
|
int rc = 0;
|
|
|
|
struct _mosquitto_subhier *child;
|
|
|
|
|
|
|
|
if(!config || !db) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
db->last_db_id = 0;
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
db->contexts_by_id = NULL;
|
|
|
|
db->contexts_by_sock = NULL;
|
|
|
|
db->contexts_for_free = NULL;
|
|
|
|
db->contexts_bridge = NULL;
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
// Initialize the hashtable
|
|
|
|
db->clientid_index_hash = NULL;
|
|
|
|
|
|
|
|
db->subs.next = NULL;
|
|
|
|
db->subs.subs = NULL;
|
|
|
|
db->subs.topic = "";
|
|
|
|
|
|
|
|
child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
|
|
|
|
if(!child){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
child->next = NULL;
|
|
|
|
child->topic = _mosquitto_strdup("");
|
|
|
|
if(!child->topic){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
child->subs = NULL;
|
|
|
|
child->children = NULL;
|
|
|
|
child->retained = NULL;
|
|
|
|
db->subs.children = child;
|
|
|
|
|
|
|
|
child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
|
|
|
|
if(!child){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
child->next = NULL;
|
|
|
|
child->topic = _mosquitto_strdup("$SYS");
|
|
|
|
if(!child->topic){
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
child->subs = NULL;
|
|
|
|
child->children = NULL;
|
|
|
|
child->retained = NULL;
|
|
|
|
db->subs.children->next = child;
|
|
|
|
|
|
|
|
db->unpwd = NULL;
|
|
|
|
|
|
|
|
#ifdef WITH_PERSISTENCE
|
|
|
|
if(config->persistence && config->persistence_filepath){
|
|
|
|
if(mqtt3_db_restore(db)) return 1;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void subhier_clean(struct _mosquitto_subhier *subhier)
|
|
|
|
{
|
|
|
|
struct _mosquitto_subhier *next;
|
|
|
|
struct _mosquitto_subleaf *leaf, *nextleaf;
|
|
|
|
|
|
|
|
while(subhier){
|
|
|
|
next = subhier->next;
|
|
|
|
leaf = subhier->subs;
|
|
|
|
while(leaf){
|
|
|
|
nextleaf = leaf->next;
|
|
|
|
_mosquitto_free(leaf);
|
|
|
|
leaf = nextleaf;
|
|
|
|
}
|
|
|
|
if(subhier->retained){
|
|
|
|
subhier->retained->ref_count--;
|
|
|
|
}
|
|
|
|
subhier_clean(subhier->children);
|
|
|
|
if(subhier->topic) _mosquitto_free(subhier->topic);
|
|
|
|
|
|
|
|
_mosquitto_free(subhier);
|
|
|
|
subhier = next;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_close(struct mosquitto_db *db)
|
|
|
|
{
|
|
|
|
subhier_clean(db->subs.children);
|
|
|
|
mqtt3_db_store_clean(db);
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void _message_remove(struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
|
|
|
|
{
|
|
|
|
if(!context || !msg || !(*msg)){
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* FIXME - it would be nice to be able to remove the stored message here if ref_count==0 */
|
|
|
|
(*msg)->store->ref_count--;
|
|
|
|
if(last){
|
|
|
|
last->next = (*msg)->next;
|
|
|
|
if(!last->next){
|
|
|
|
context->last_msg = last;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
context->msgs = (*msg)->next;
|
|
|
|
if(!context->msgs){
|
|
|
|
context->last_msg = NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
context->msg_count--;
|
|
|
|
if((*msg)->qos > 0){
|
|
|
|
context->msg_count12--;
|
|
|
|
}
|
|
|
|
_mosquitto_free(*msg);
|
|
|
|
if(last){
|
|
|
|
*msg = last->next;
|
|
|
|
}else{
|
|
|
|
*msg = context->msgs;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
int msg_index = 0;
|
|
|
|
bool deleted = false;
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
msg_index++;
|
|
|
|
if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
switch(tail->qos){
|
|
|
|
case 0:
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
break;
|
|
|
|
case 1:
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(tail->qos == 2){
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
msg_index--;
|
|
|
|
_message_remove(context, &tail, last);
|
|
|
|
deleted = true;
|
|
|
|
}else{
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
if(msg_index > max_inflight && deleted){
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
enum mosquitto_msg_state state = mosq_ms_invalid;
|
|
|
|
int rc = 0;
|
|
|
|
int i;
|
|
|
|
char **dest_ids;
|
|
|
|
|
|
|
|
assert(stored);
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
2014-10-16 23:08:10 +00:00
|
|
|
if(!context->id) return MOSQ_ERR_SUCCESS; /* Protect against unlikely "client is disconnected but not entirely freed" scenario */
|
2014-05-07 22:27:00 +00:00
|
|
|
|
|
|
|
/* Check whether we've already sent this message to this client
|
|
|
|
* for outgoing messages only.
|
|
|
|
* If retain==true then this is a stale retained message and so should be
|
|
|
|
* sent regardless. FIXME - this does mean retained messages will received
|
|
|
|
* multiple times for overlapping subscriptions, although this is only the
|
|
|
|
* case for SUBSCRIPTION with multiple subs in so is a minor concern.
|
|
|
|
*/
|
|
|
|
if(db->config->allow_duplicate_messages == false
|
|
|
|
&& dir == mosq_md_out && retain == false && stored->dest_ids){
|
|
|
|
|
|
|
|
for(i=0; i<stored->dest_id_count; i++){
|
|
|
|
if(!strcmp(stored->dest_ids[i], context->id)){
|
|
|
|
/* We have already sent this message to this client. */
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if(context->sock == INVALID_SOCKET){
|
|
|
|
/* Client is not connected only queue messages with QoS>0. */
|
|
|
|
if(qos == 0 && !db->config->queue_qos0_messages){
|
|
|
|
if(!context->bridge){
|
|
|
|
return 2;
|
|
|
|
}else{
|
|
|
|
if(context->bridge->start_type != bst_lazy){
|
|
|
|
return 2;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if(context->sock != INVALID_SOCKET){
|
|
|
|
if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
|
|
|
|
if(dir == mosq_md_out){
|
|
|
|
switch(qos){
|
|
|
|
case 0:
|
|
|
|
state = mosq_ms_publish_qos0;
|
|
|
|
break;
|
|
|
|
case 1:
|
|
|
|
state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
state = mosq_ms_publish_qos2;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(qos == 2){
|
|
|
|
state = mosq_ms_wait_for_pubrel;
|
|
|
|
}else{
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
|
|
|
|
state = mosq_ms_queued;
|
|
|
|
rc = 2;
|
|
|
|
}else{
|
|
|
|
/* Dropping message due to full queue. */
|
|
|
|
if(context->is_dropping == false){
|
|
|
|
context->is_dropping = true;
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
|
|
|
|
"Outgoing messages are being dropped for client %s.",
|
|
|
|
context->id);
|
|
|
|
}
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
g_msgs_dropped++;
|
|
|
|
#endif
|
|
|
|
return 2;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(max_queued > 0 && context->msg_count12 >= max_queued){
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
g_msgs_dropped++;
|
|
|
|
#endif
|
|
|
|
if(context->is_dropping == false){
|
|
|
|
context->is_dropping = true;
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
|
|
|
|
"Outgoing messages are being dropped for client %s.",
|
|
|
|
context->id);
|
|
|
|
}
|
|
|
|
return 2;
|
|
|
|
}else{
|
|
|
|
state = mosq_ms_queued;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert(state != mosq_ms_invalid);
|
|
|
|
|
|
|
|
#ifdef WITH_PERSISTENCE
|
|
|
|
if(state == mosq_ms_queued){
|
|
|
|
db->persistence_changes++;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
msg = _mosquitto_malloc(sizeof(struct mosquitto_client_msg));
|
|
|
|
if(!msg) return MOSQ_ERR_NOMEM;
|
|
|
|
msg->next = NULL;
|
|
|
|
msg->store = stored;
|
|
|
|
msg->store->ref_count++;
|
|
|
|
msg->mid = mid;
|
|
|
|
msg->timestamp = mosquitto_time();
|
|
|
|
msg->direction = dir;
|
|
|
|
msg->state = state;
|
|
|
|
msg->dup = false;
|
|
|
|
msg->qos = qos;
|
|
|
|
msg->retain = retain;
|
|
|
|
if(context->last_msg){
|
|
|
|
context->last_msg->next = msg;
|
|
|
|
context->last_msg = msg;
|
|
|
|
}else{
|
|
|
|
context->msgs = msg;
|
|
|
|
context->last_msg = msg;
|
|
|
|
}
|
|
|
|
context->msg_count++;
|
|
|
|
if(qos > 0){
|
|
|
|
context->msg_count12++;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
|
|
|
|
/* Record which client ids this message has been sent to so we can avoid duplicates.
|
|
|
|
* Outgoing messages only.
|
|
|
|
* If retain==true then this is a stale retained message and so should be
|
|
|
|
* sent regardless. FIXME - this does mean retained messages will received
|
|
|
|
* multiple times for overlapping subscriptions, although this is only the
|
|
|
|
* case for SUBSCRIPTION with multiple subs in so is a minor concern.
|
|
|
|
*/
|
|
|
|
dest_ids = _mosquitto_realloc(stored->dest_ids, sizeof(char *)*(stored->dest_id_count+1));
|
|
|
|
if(dest_ids){
|
|
|
|
stored->dest_ids = dest_ids;
|
|
|
|
stored->dest_id_count++;
|
|
|
|
stored->dest_ids[stored->dest_id_count-1] = _mosquitto_strdup(context->id);
|
|
|
|
if(!stored->dest_ids[stored->dest_id_count-1]){
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
if(context->bridge && context->bridge->start_type == bst_lazy
|
|
|
|
&& context->sock == INVALID_SOCKET
|
|
|
|
&& context->msg_count >= context->bridge->threshold){
|
|
|
|
|
|
|
|
context->bridge->lazy_reconnect = true;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
2014-05-06 09:47:00 +00:00
|
|
|
#ifdef WITH_WEBSOCKETS
|
|
|
|
if(context->wsi){
|
|
|
|
return mqtt3_db_message_write(context);
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
#else
|
2014-05-07 22:27:00 +00:00
|
|
|
return rc;
|
2014-05-06 09:47:00 +00:00
|
|
|
#endif
|
2014-05-07 22:27:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
tail->state = state;
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_messages_delete(struct mosquitto *context)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *tail, *next;
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
/* FIXME - it would be nice to be able to remove the stored message here if rec_count==0 */
|
|
|
|
tail->store->ref_count--;
|
|
|
|
next = tail->next;
|
|
|
|
_mosquitto_free(tail);
|
|
|
|
tail = next;
|
|
|
|
}
|
|
|
|
context->msgs = NULL;
|
|
|
|
context->last_msg = NULL;
|
|
|
|
context->msg_count = 0;
|
|
|
|
context->msg_count12 = 0;
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain)
|
|
|
|
{
|
|
|
|
struct mosquitto_msg_store *stored;
|
|
|
|
char *source_id;
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
|
|
|
|
if(!topic) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
if(context){
|
|
|
|
source_id = context->id;
|
|
|
|
}else{
|
|
|
|
source_id = "";
|
|
|
|
}
|
|
|
|
if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
|
|
|
|
|
|
|
|
return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, stored);
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
|
|
|
|
{
|
|
|
|
struct mosquitto_msg_store *temp;
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
assert(stored);
|
|
|
|
|
|
|
|
temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store));
|
|
|
|
if(!temp) return MOSQ_ERR_NOMEM;
|
|
|
|
|
|
|
|
temp->next = db->msg_store;
|
|
|
|
temp->ref_count = 0;
|
|
|
|
if(source){
|
|
|
|
temp->source_id = _mosquitto_strdup(source);
|
|
|
|
}else{
|
|
|
|
temp->source_id = _mosquitto_strdup("");
|
|
|
|
}
|
|
|
|
if(!temp->source_id){
|
|
|
|
_mosquitto_free(temp);
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
temp->source_mid = source_mid;
|
|
|
|
temp->msg.mid = 0;
|
|
|
|
temp->msg.qos = qos;
|
|
|
|
temp->msg.retain = retain;
|
|
|
|
if(topic){
|
|
|
|
temp->msg.topic = _mosquitto_strdup(topic);
|
|
|
|
if(!temp->msg.topic){
|
|
|
|
_mosquitto_free(temp->source_id);
|
|
|
|
_mosquitto_free(temp);
|
|
|
|
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
temp->msg.topic = NULL;
|
|
|
|
}
|
|
|
|
temp->msg.payloadlen = payloadlen;
|
|
|
|
if(payloadlen){
|
|
|
|
temp->msg.payload = _mosquitto_malloc(sizeof(char)*payloadlen);
|
|
|
|
if(!temp->msg.payload){
|
|
|
|
if(temp->source_id) _mosquitto_free(temp->source_id);
|
|
|
|
if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
|
|
|
|
if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
|
|
|
|
_mosquitto_free(temp);
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
memcpy(temp->msg.payload, payload, sizeof(char)*payloadlen);
|
|
|
|
}else{
|
|
|
|
temp->msg.payload = NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(!temp->source_id || (payloadlen && !temp->msg.payload)){
|
|
|
|
if(temp->source_id) _mosquitto_free(temp->source_id);
|
|
|
|
if(temp->msg.topic) _mosquitto_free(temp->msg.topic);
|
|
|
|
if(temp->msg.payload) _mosquitto_free(temp->msg.payload);
|
|
|
|
_mosquitto_free(temp);
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
temp->dest_ids = NULL;
|
|
|
|
temp->dest_id_count = 0;
|
|
|
|
db->msg_store_count++;
|
|
|
|
db->msg_store = temp;
|
|
|
|
(*stored) = temp;
|
|
|
|
|
|
|
|
if(!store_id){
|
|
|
|
temp->db_id = ++db->last_db_id;
|
|
|
|
}else{
|
|
|
|
temp->db_id = store_id;
|
|
|
|
}
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
*stored = NULL;
|
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
|
|
|
|
*stored = tail->store;
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Called on reconnect to set outgoing messages to a sensible state and force a
|
|
|
|
* retry, and to set incoming messages to expect an appropriate retry. */
|
|
|
|
int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
struct mosquitto_client_msg *prev = NULL;
|
|
|
|
int count;
|
|
|
|
|
|
|
|
msg = context->msgs;
|
|
|
|
context->msg_count = 0;
|
|
|
|
context->msg_count12 = 0;
|
|
|
|
while(msg){
|
|
|
|
context->last_msg = msg;
|
|
|
|
|
|
|
|
context->msg_count++;
|
|
|
|
if(msg->qos > 0){
|
|
|
|
context->msg_count12++;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(msg->direction == mosq_md_out){
|
|
|
|
if(msg->state != mosq_ms_queued){
|
|
|
|
switch(msg->qos){
|
|
|
|
case 0:
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
break;
|
|
|
|
case 1:
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
if(msg->state == mosq_ms_wait_for_pubcomp){
|
|
|
|
msg->state = mosq_ms_resend_pubrel;
|
|
|
|
}else{
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(msg->qos != 2){
|
|
|
|
/* Anything <QoS 2 can be completely retried by the client at
|
|
|
|
* no harm. */
|
|
|
|
_message_remove(context, &msg, prev);
|
|
|
|
}else{
|
|
|
|
/* Message state can be preserved here because it should match
|
|
|
|
* whatever the client has got. */
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prev = msg;
|
|
|
|
if(msg) msg = msg->next;
|
|
|
|
}
|
|
|
|
/* Messages received when the client was disconnected are put
|
|
|
|
* in the mosq_ms_queued state. If we don't change them to the
|
|
|
|
* appropriate "publish" state, then the queued messages won't
|
|
|
|
* get sent until the client next receives a message - and they
|
|
|
|
* will be sent out of order.
|
|
|
|
*/
|
|
|
|
if(context->msgs){
|
|
|
|
count = 0;
|
|
|
|
msg = context->msgs;
|
|
|
|
while(msg && (max_inflight == 0 || count < max_inflight)){
|
|
|
|
if(msg->state == mosq_ms_queued){
|
|
|
|
switch(msg->qos){
|
|
|
|
case 0:
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
break;
|
|
|
|
case 1:
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
msg = msg->next;
|
|
|
|
count++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
|
|
|
|
{
|
|
|
|
time_t threshold;
|
2014-05-29 14:22:48 +00:00
|
|
|
enum mosquitto_msg_state new_state;
|
2014-06-23 16:57:35 +00:00
|
|
|
struct mosquitto *context, *ctxt_tmp;
|
2014-05-07 22:27:00 +00:00
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
|
|
|
threshold = mosquitto_time() - timeout;
|
|
|
|
|
2014-06-23 16:57:35 +00:00
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
2014-05-07 22:27:00 +00:00
|
|
|
msg = context->msgs;
|
|
|
|
while(msg){
|
2014-05-29 14:22:48 +00:00
|
|
|
new_state = mosq_ms_invalid;
|
2014-05-07 22:27:00 +00:00
|
|
|
if(msg->timestamp < threshold && msg->state != mosq_ms_queued){
|
|
|
|
switch(msg->state){
|
|
|
|
case mosq_ms_wait_for_puback:
|
|
|
|
new_state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case mosq_ms_wait_for_pubrec:
|
|
|
|
new_state = mosq_ms_publish_qos2;
|
|
|
|
break;
|
|
|
|
case mosq_ms_wait_for_pubrel:
|
|
|
|
new_state = mosq_ms_send_pubrec;
|
|
|
|
break;
|
|
|
|
case mosq_ms_wait_for_pubcomp:
|
|
|
|
new_state = mosq_ms_resend_pubrel;
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if(new_state != mosq_ms_invalid){
|
|
|
|
msg->timestamp = mosquitto_time();
|
|
|
|
msg->state = new_state;
|
|
|
|
msg->dup = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
msg = msg->next;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
{
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
int qos;
|
|
|
|
int retain;
|
|
|
|
char *topic;
|
|
|
|
char *source_id;
|
|
|
|
int msg_index = 0;
|
|
|
|
bool deleted = false;
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
msg_index++;
|
|
|
|
if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
switch(tail->qos){
|
|
|
|
case 0:
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
break;
|
|
|
|
case 1:
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
if(tail->qos == 2){
|
|
|
|
_mosquitto_send_pubrec(context, tail->mid);
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
qos = tail->store->msg.qos;
|
|
|
|
topic = tail->store->msg.topic;
|
|
|
|
retain = tail->retain;
|
|
|
|
source_id = tail->store->source_id;
|
|
|
|
|
|
|
|
/* topic==NULL should be a QoS 2 message that was
|
|
|
|
* denied/dropped and is being processed so the client doesn't
|
|
|
|
* keep resending it. That means we don't send it to other
|
|
|
|
* clients. */
|
|
|
|
if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){
|
|
|
|
_message_remove(context, &tail, last);
|
|
|
|
deleted = true;
|
|
|
|
}else{
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
if(msg_index > max_inflight && deleted){
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if(deleted){
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}else{
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
int mqtt3_db_message_write(struct mosquitto *context)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
uint16_t mid;
|
|
|
|
int retries;
|
|
|
|
int retain;
|
|
|
|
const char *topic;
|
|
|
|
int qos;
|
|
|
|
uint32_t payloadlen;
|
|
|
|
const void *payload;
|
|
|
|
int msg_count = 0;
|
|
|
|
|
2014-06-29 22:16:10 +00:00
|
|
|
if(!context || context->sock == INVALID_SOCKET
|
2014-05-06 09:47:00 +00:00
|
|
|
|| (context->state == mosq_cs_connected && !context->id)){
|
2014-05-07 22:27:00 +00:00
|
|
|
return MOSQ_ERR_INVAL;
|
|
|
|
}
|
|
|
|
|
2014-10-24 20:28:24 +00:00
|
|
|
if(context->state != mosq_cs_connected){
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
2014-05-07 22:27:00 +00:00
|
|
|
tail = context->msgs;
|
|
|
|
while(tail){
|
|
|
|
if(tail->direction == mosq_md_in){
|
|
|
|
msg_count++;
|
|
|
|
}
|
|
|
|
if(tail->state != mosq_ms_queued){
|
|
|
|
mid = tail->mid;
|
|
|
|
retries = tail->dup;
|
|
|
|
retain = tail->retain;
|
|
|
|
topic = tail->store->msg.topic;
|
|
|
|
qos = tail->qos;
|
|
|
|
payloadlen = tail->store->msg.payloadlen;
|
|
|
|
payload = tail->store->msg.payload;
|
|
|
|
|
|
|
|
switch(tail->state){
|
|
|
|
case mosq_ms_publish_qos0:
|
|
|
|
rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
if(!rc){
|
|
|
|
_message_remove(context, &tail, last);
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos1:
|
|
|
|
rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
if(!rc){
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
tail->state = mosq_ms_wait_for_puback;
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos2:
|
|
|
|
rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
if(!rc){
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
tail->state = mosq_ms_wait_for_pubrec;
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
|
|
|
|
case mosq_ms_send_pubrec:
|
|
|
|
rc = _mosquitto_send_pubrec(context, mid);
|
|
|
|
if(!rc){
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubrel:
|
|
|
|
rc = _mosquitto_send_pubrel(context, mid, true);
|
|
|
|
if(!rc){
|
|
|
|
tail->state = mosq_ms_wait_for_pubcomp;
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubcomp:
|
|
|
|
rc = _mosquitto_send_pubcomp(context, mid);
|
|
|
|
if(!rc){
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
}else{
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
|
|
|
|
default:
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
/* state == mosq_ms_queued */
|
|
|
|
if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
|
|
|
|
if(tail->qos == 2){
|
|
|
|
tail->state = mosq_ms_send_pubrec;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
void mqtt3_db_store_clean(struct mosquitto_db *db)
|
|
|
|
{
|
|
|
|
/* FIXME - this may not be necessary if checks are made when messages are removed. */
|
|
|
|
struct mosquitto_msg_store *tail, *last = NULL;
|
|
|
|
int i;
|
|
|
|
assert(db);
|
|
|
|
|
|
|
|
tail = db->msg_store;
|
|
|
|
while(tail){
|
|
|
|
if(tail->ref_count == 0){
|
|
|
|
if(tail->source_id) _mosquitto_free(tail->source_id);
|
|
|
|
if(tail->dest_ids){
|
|
|
|
for(i=0; i<tail->dest_id_count; i++){
|
|
|
|
if(tail->dest_ids[i]) _mosquitto_free(tail->dest_ids[i]);
|
|
|
|
}
|
|
|
|
_mosquitto_free(tail->dest_ids);
|
|
|
|
}
|
|
|
|
if(tail->msg.topic) _mosquitto_free(tail->msg.topic);
|
|
|
|
if(tail->msg.payload) _mosquitto_free(tail->msg.payload);
|
|
|
|
if(last){
|
|
|
|
last->next = tail->next;
|
|
|
|
_mosquitto_free(tail);
|
|
|
|
tail = last->next;
|
|
|
|
}else{
|
|
|
|
db->msg_store = tail->next;
|
|
|
|
_mosquitto_free(tail);
|
|
|
|
tail = db->msg_store;
|
|
|
|
}
|
|
|
|
db->msg_store_count--;
|
|
|
|
}else{
|
|
|
|
last = tail;
|
|
|
|
tail = tail->next;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void mqtt3_db_limits_set(int inflight, int queued)
|
|
|
|
{
|
|
|
|
max_inflight = inflight;
|
|
|
|
max_queued = queued;
|
|
|
|
}
|
|
|
|
|
|
|
|
void mqtt3_db_vacuum(void)
|
|
|
|
{
|
|
|
|
/* FIXME - reimplement? */
|
|
|
|
}
|
|
|
|
|