2014-05-07 22:27:00 +00:00
|
|
|
/*
|
|
|
|
Copyright (c) 2010-2014 Roger Light <roger@atchoo.org>
|
|
|
|
|
|
|
|
All rights reserved. This program and the accompanying materials
|
|
|
|
are made available under the terms of the Eclipse Public License v1.0
|
|
|
|
and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
|
|
|
|
|
|
The Eclipse Public License is available at
|
|
|
|
http://www.eclipse.org/legal/epl-v10.html
|
|
|
|
and the Eclipse Distribution License is available at
|
|
|
|
http://www.eclipse.org/org/documents/edl-v10.php.
|
|
|
|
|
|
|
|
Contributors:
|
|
|
|
Roger Light - initial implementation and documentation.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <string.h>
|
|
|
|
|
|
|
|
#include <mosquitto_internal.h>
|
|
|
|
#include <mosquitto.h>
|
|
|
|
#include <memory_mosq.h>
|
|
|
|
#include <messages_mosq.h>
|
|
|
|
#include <send_mosq.h>
|
|
|
|
#include <time_mosq.h>
|
|
|
|
|
|
|
|
void _mosquitto_message_cleanup(struct mosquitto_message_all **message)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *msg;
|
|
|
|
|
|
|
|
if(!message || !*message) return;
|
|
|
|
|
|
|
|
msg = *message;
|
|
|
|
|
|
|
|
if(msg->msg.topic) _mosquitto_free(msg->msg.topic);
|
|
|
|
if(msg->msg.payload) _mosquitto_free(msg->msg.payload);
|
|
|
|
_mosquitto_free(msg);
|
|
|
|
}
|
|
|
|
|
|
|
|
void _mosquitto_message_cleanup_all(struct mosquitto *mosq)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *tmp;
|
|
|
|
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
while(mosq->in_messages){
|
|
|
|
tmp = mosq->in_messages->next;
|
|
|
|
_mosquitto_message_cleanup(&mosq->in_messages);
|
|
|
|
mosq->in_messages = tmp;
|
|
|
|
}
|
|
|
|
while(mosq->out_messages){
|
|
|
|
tmp = mosq->out_messages->next;
|
|
|
|
_mosquitto_message_cleanup(&mosq->out_messages);
|
|
|
|
mosq->out_messages = tmp;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto_message *src)
|
|
|
|
{
|
|
|
|
if(!dst || !src) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
dst->mid = src->mid;
|
|
|
|
dst->topic = _mosquitto_strdup(src->topic);
|
|
|
|
if(!dst->topic) return MOSQ_ERR_NOMEM;
|
|
|
|
dst->qos = src->qos;
|
|
|
|
dst->retain = src->retain;
|
|
|
|
if(src->payloadlen){
|
|
|
|
dst->payload = _mosquitto_malloc(src->payloadlen);
|
|
|
|
if(!dst->payload){
|
|
|
|
_mosquitto_free(dst->topic);
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
}
|
|
|
|
memcpy(dst->payload, src->payload, src->payloadlen);
|
|
|
|
dst->payloadlen = src->payloadlen;
|
|
|
|
}else{
|
|
|
|
dst->payloadlen = 0;
|
|
|
|
dst->payload = NULL;
|
|
|
|
}
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
int rc;
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
rc = _mosquitto_message_remove(mosq, mid, dir, &message);
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS){
|
|
|
|
_mosquitto_message_cleanup(&message);
|
|
|
|
}
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
void mosquitto_message_free(struct mosquitto_message **message)
|
|
|
|
{
|
|
|
|
struct mosquitto_message *msg;
|
|
|
|
|
|
|
|
if(!message || !*message) return;
|
|
|
|
|
|
|
|
msg = *message;
|
|
|
|
|
|
|
|
if(msg->topic) _mosquitto_free(msg->topic);
|
|
|
|
if(msg->payload) _mosquitto_free(msg->payload);
|
|
|
|
_mosquitto_free(msg);
|
|
|
|
}
|
|
|
|
|
|
|
|
void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
|
|
|
|
{
|
|
|
|
/* mosq->*_message_mutex should be locked before entering this function */
|
|
|
|
assert(mosq);
|
|
|
|
assert(message);
|
|
|
|
|
|
|
|
if(dir == mosq_md_out){
|
|
|
|
mosq->out_queue_len++;
|
|
|
|
message->next = NULL;
|
|
|
|
if(mosq->out_messages_last){
|
|
|
|
mosq->out_messages_last->next = message;
|
|
|
|
}else{
|
|
|
|
mosq->out_messages = message;
|
|
|
|
}
|
|
|
|
mosq->out_messages_last = message;
|
|
|
|
if(message->msg.qos > 0 && (mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages)){
|
|
|
|
mosq->inflight_messages++;
|
|
|
|
}
|
2015-03-08 22:06:20 +00:00
|
|
|
}else{
|
|
|
|
mosq->in_queue_len++;
|
2014-05-07 22:27:00 +00:00
|
|
|
message->next = NULL;
|
|
|
|
if(mosq->in_messages_last){
|
|
|
|
mosq->in_messages_last->next = message;
|
|
|
|
}else{
|
|
|
|
mosq->in_messages = message;
|
|
|
|
}
|
|
|
|
mosq->in_messages_last = message;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
struct mosquitto_message_all *prev = NULL;
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
pthread_mutex_lock(&mosq->in_message_mutex);
|
|
|
|
message = mosq->in_messages;
|
|
|
|
mosq->in_queue_len = 0;
|
|
|
|
while(message){
|
|
|
|
mosq->in_queue_len++;
|
|
|
|
message->timestamp = 0;
|
|
|
|
if(message->msg.qos != 2){
|
|
|
|
if(prev){
|
|
|
|
prev->next = message->next;
|
|
|
|
_mosquitto_message_cleanup(&message);
|
|
|
|
message = prev;
|
|
|
|
}else{
|
|
|
|
mosq->in_messages = message->next;
|
|
|
|
_mosquitto_message_cleanup(&message);
|
|
|
|
message = mosq->in_messages;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
/* Message state can be preserved here because it should match
|
|
|
|
* whatever the client has got. */
|
|
|
|
}
|
|
|
|
prev = message;
|
|
|
|
message = message->next;
|
|
|
|
}
|
|
|
|
mosq->in_messages_last = prev;
|
|
|
|
pthread_mutex_unlock(&mosq->in_message_mutex);
|
|
|
|
|
|
|
|
|
|
|
|
pthread_mutex_lock(&mosq->out_message_mutex);
|
|
|
|
mosq->inflight_messages = 0;
|
|
|
|
message = mosq->out_messages;
|
|
|
|
mosq->out_queue_len = 0;
|
|
|
|
while(message){
|
|
|
|
mosq->out_queue_len++;
|
|
|
|
message->timestamp = 0;
|
|
|
|
|
|
|
|
if(message->msg.qos > 0){
|
|
|
|
mosq->inflight_messages++;
|
|
|
|
}
|
|
|
|
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
|
|
|
|
if(message->msg.qos == 1){
|
|
|
|
message->state = mosq_ms_wait_for_puback;
|
|
|
|
}else if(message->msg.qos == 2){
|
|
|
|
/* Should be able to preserve state. */
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
message->state = mosq_ms_invalid;
|
|
|
|
}
|
|
|
|
prev = message;
|
|
|
|
message = message->next;
|
|
|
|
}
|
|
|
|
mosq->out_messages_last = prev;
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
}
|
|
|
|
|
|
|
|
int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *cur, *prev = NULL;
|
|
|
|
bool found = false;
|
|
|
|
int rc;
|
|
|
|
assert(mosq);
|
|
|
|
assert(message);
|
|
|
|
|
|
|
|
if(dir == mosq_md_out){
|
|
|
|
pthread_mutex_lock(&mosq->out_message_mutex);
|
|
|
|
cur = mosq->out_messages;
|
|
|
|
while(cur){
|
|
|
|
if(cur->msg.mid == mid){
|
|
|
|
if(prev){
|
|
|
|
prev->next = cur->next;
|
|
|
|
}else{
|
|
|
|
mosq->out_messages = cur->next;
|
|
|
|
}
|
|
|
|
*message = cur;
|
|
|
|
mosq->out_queue_len--;
|
|
|
|
if(cur->next == NULL){
|
|
|
|
mosq->out_messages_last = prev;
|
|
|
|
}else if(!mosq->out_messages){
|
|
|
|
mosq->out_messages_last = NULL;
|
|
|
|
}
|
|
|
|
if(cur->msg.qos > 0){
|
|
|
|
mosq->inflight_messages--;
|
|
|
|
}
|
|
|
|
found = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
prev = cur;
|
|
|
|
cur = cur->next;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(found){
|
|
|
|
cur = mosq->out_messages;
|
|
|
|
while(cur){
|
|
|
|
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
|
|
|
|
if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){
|
|
|
|
mosq->inflight_messages++;
|
|
|
|
if(cur->msg.qos == 1){
|
|
|
|
cur->state = mosq_ms_wait_for_puback;
|
|
|
|
}else if(cur->msg.qos == 2){
|
|
|
|
cur->state = mosq_ms_wait_for_pubrec;
|
|
|
|
}
|
|
|
|
rc = _mosquitto_send_publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup);
|
|
|
|
if(rc){
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
cur = cur->next;
|
|
|
|
}
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}else{
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return MOSQ_ERR_NOT_FOUND;
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
pthread_mutex_lock(&mosq->in_message_mutex);
|
|
|
|
cur = mosq->in_messages;
|
|
|
|
while(cur){
|
|
|
|
if(cur->msg.mid == mid){
|
|
|
|
if(prev){
|
|
|
|
prev->next = cur->next;
|
|
|
|
}else{
|
|
|
|
mosq->in_messages = cur->next;
|
|
|
|
}
|
|
|
|
*message = cur;
|
|
|
|
mosq->in_queue_len--;
|
|
|
|
if(cur->next == NULL){
|
|
|
|
mosq->in_messages_last = prev;
|
|
|
|
}else if(!mosq->in_messages){
|
|
|
|
mosq->in_messages_last = NULL;
|
|
|
|
}
|
|
|
|
found = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
prev = cur;
|
|
|
|
cur = cur->next;
|
|
|
|
}
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&mosq->in_message_mutex);
|
|
|
|
if(found){
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}else{
|
|
|
|
return MOSQ_ERR_NOT_FOUND;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifdef WITH_THREADING
|
2014-07-20 20:17:29 +00:00
|
|
|
void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages, pthread_mutex_t *mutex)
|
2014-05-07 22:27:00 +00:00
|
|
|
#else
|
|
|
|
void _mosquitto_message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages)
|
|
|
|
#endif
|
|
|
|
{
|
|
|
|
time_t now = mosquitto_time();
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
#ifdef WITH_THREADING
|
2014-07-20 20:17:29 +00:00
|
|
|
pthread_mutex_lock(mutex);
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
|
|
|
|
|
|
|
while(messages){
|
|
|
|
if(messages->timestamp + mosq->message_retry < now){
|
|
|
|
switch(messages->state){
|
|
|
|
case mosq_ms_wait_for_puback:
|
|
|
|
case mosq_ms_wait_for_pubrec:
|
|
|
|
messages->timestamp = now;
|
|
|
|
messages->dup = true;
|
|
|
|
_mosquitto_send_publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup);
|
|
|
|
break;
|
|
|
|
case mosq_ms_wait_for_pubrel:
|
|
|
|
messages->timestamp = now;
|
|
|
|
messages->dup = true;
|
|
|
|
_mosquitto_send_pubrec(mosq, messages->msg.mid);
|
|
|
|
break;
|
|
|
|
case mosq_ms_wait_for_pubcomp:
|
|
|
|
messages->timestamp = now;
|
|
|
|
messages->dup = true;
|
2014-11-19 21:28:52 +00:00
|
|
|
_mosquitto_send_pubrel(mosq, messages->msg.mid);
|
2014-05-07 22:27:00 +00:00
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
messages = messages->next;
|
|
|
|
}
|
|
|
|
#ifdef WITH_THREADING
|
2014-07-20 20:17:29 +00:00
|
|
|
pthread_mutex_unlock(mutex);
|
2014-05-07 22:27:00 +00:00
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
|
|
|
void _mosquitto_message_retry_check(struct mosquitto *mosq)
|
|
|
|
{
|
|
|
|
#ifdef WITH_THREADING
|
2014-07-20 20:17:29 +00:00
|
|
|
_mosquitto_message_retry_check_actual(mosq, mosq->out_messages, &mosq->out_message_mutex);
|
|
|
|
_mosquitto_message_retry_check_actual(mosq, mosq->in_messages, &mosq->in_message_mutex);
|
2014-05-07 22:27:00 +00:00
|
|
|
#else
|
|
|
|
_mosquitto_message_retry_check_actual(mosq, mosq->out_messages);
|
|
|
|
_mosquitto_message_retry_check_actual(mosq, mosq->in_messages);
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
|
|
|
void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry)
|
|
|
|
{
|
|
|
|
assert(mosq);
|
|
|
|
if(mosq) mosq->message_retry = message_retry;
|
|
|
|
}
|
|
|
|
|
|
|
|
int _mosquitto_message_out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
|
|
|
|
{
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
pthread_mutex_lock(&mosq->out_message_mutex);
|
|
|
|
message = mosq->out_messages;
|
|
|
|
while(message){
|
|
|
|
if(message->msg.mid == mid){
|
|
|
|
message->state = state;
|
|
|
|
message->timestamp = mosquitto_time();
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
message = message->next;
|
|
|
|
}
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
return MOSQ_ERR_NOT_FOUND;
|
|
|
|
}
|
|
|
|
|
|
|
|
int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages)
|
|
|
|
{
|
|
|
|
if(!mosq) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
mosq->max_inflight_messages = max_inflight_messages;
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
}
|
|
|
|
|