mosquitto/lib/messages_mosq.c

350 lines
9.0 KiB
C
Raw Normal View History

2014-05-07 22:27:00 +00:00
/*
Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>

All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.

The Eclipse Public License is available at
   https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
  http://www.eclipse.org/org/documents/edl-v10.php.

SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause

Contributors:
   Roger Light - initial implementation and documentation.
*/
#include "config.h"
2014-05-07 22:27:00 +00:00
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <utlist.h>
2014-05-07 22:27:00 +00:00
2015-04-29 20:37:47 +00:00
#include "mosquitto_internal.h"
#include "mosquitto.h"
#include "memory_mosq.h"
#include "messages_mosq.h"
#include "send_mosq.h"
#include "time_mosq.h"
#include "util_mosq.h"
2014-05-07 22:27:00 +00:00
2015-05-16 13:29:54 +00:00
void message__cleanup(struct mosquitto_message_all **message)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *msg;
if(!message || !*message) return;
msg = *message;
mosquitto__free(msg->msg.topic);
mosquitto__free(msg->msg.payload);
mosquitto_property_free_all(&msg->properties);
mosquitto__free(msg);
2014-05-07 22:27:00 +00:00
}
2015-05-16 13:29:54 +00:00
void message__cleanup_all(struct mosquitto *mosq)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *tail, *tmp;
2014-05-07 22:27:00 +00:00
assert(mosq);
DL_FOREACH_SAFE(mosq->msgs_in.inflight, tail, tmp){
DL_DELETE(mosq->msgs_in.inflight, tail);
message__cleanup(&tail);
2014-05-07 22:27:00 +00:00
}
DL_FOREACH_SAFE(mosq->msgs_out.inflight, tail, tmp){
DL_DELETE(mosq->msgs_out.inflight, tail);
message__cleanup(&tail);
2014-05-07 22:27:00 +00:00
}
}
int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto_message *src)
{
if(!dst || !src) return MOSQ_ERR_INVAL;
dst->mid = src->mid;
dst->topic = mosquitto__strdup(src->topic);
2014-05-07 22:27:00 +00:00
if(!dst->topic) return MOSQ_ERR_NOMEM;
dst->qos = src->qos;
dst->retain = src->retain;
if(src->payloadlen){
2020-10-17 00:23:08 +00:00
dst->payload = mosquitto__calloc((unsigned int)src->payloadlen+1, sizeof(uint8_t));
2014-05-07 22:27:00 +00:00
if(!dst->payload){
mosquitto__free(dst->topic);
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOMEM;
}
2020-10-17 00:23:08 +00:00
memcpy(dst->payload, src->payload, (unsigned int)src->payloadlen);
2014-05-07 22:27:00 +00:00
dst->payloadlen = src->payloadlen;
}else{
dst->payloadlen = 0;
dst->payload = NULL;
}
return MOSQ_ERR_SUCCESS;
}
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *message;
int rc;
assert(mosq);
rc = message__remove(mosq, mid, dir, &message, qos);
2014-05-07 22:27:00 +00:00
if(rc == MOSQ_ERR_SUCCESS){
2015-05-16 13:29:54 +00:00
message__cleanup(&message);
2014-05-07 22:27:00 +00:00
}
return rc;
}
/* Completely free a message: its topic, its payload and the structure
 * itself.
 *
 * message - address of the pointer to the message to free. If `message` is
 *           NULL, or points at a NULL pointer, the call is a no-op.
 *
 * On return *message is NULL, so the caller is not left holding a pointer
 * to freed memory and a repeated call is harmless.
 */
void mosquitto_message_free(struct mosquitto_message **message)
{
	struct mosquitto_message *msg;

	if(!message || !*message) return;

	msg = *message;
	/* Detach the caller's pointer before releasing the memory behind it. */
	*message = NULL;

	mosquitto__free(msg->topic);
	mosquitto__free(msg->payload);
	mosquitto__free(msg);
}
/* Free the topic and payload owned by a message, leaving the structure
 * itself alone.
 *
 * The structure stays with the caller after this call, so the freed
 * members are reset (topic and payload to NULL, payloadlen to 0). That way
 * the structure does not carry dangling pointers, and calling this function
 * a second time on the same structure is harmless.
 *
 * A NULL `message` is a no-op.
 */
void mosquitto_message_free_contents(struct mosquitto_message *message)
{
	if(!message) return;

	mosquitto__free(message->topic);
	message->topic = NULL;

	mosquitto__free(message->payload);
	message->payload = NULL;
	message->payloadlen = 0;
}
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
2014-05-07 22:27:00 +00:00
{
/* mosq->*_message_mutex should be locked before entering this function */
assert(mosq);
assert(message);
assert(message->msg.qos != 0);
2014-05-07 22:27:00 +00:00
if(dir == mosq_md_out){
DL_APPEND(mosq->msgs_out.inflight, message);
mosq->msgs_out.queue_len++;
}else{
DL_APPEND(mosq->msgs_in.inflight, message);
mosq->msgs_in.queue_len++;
2014-05-07 22:27:00 +00:00
}
return message__release_to_inflight(mosq, dir);
2014-05-07 22:27:00 +00:00
}
void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *message, *tmp;
2014-05-07 22:27:00 +00:00
assert(mosq);
pthread_mutex_lock(&mosq->msgs_in.mutex);
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
mosq->msgs_in.queue_len = 0;
DL_FOREACH_SAFE(mosq->msgs_in.inflight, message, tmp){
mosq->msgs_in.queue_len++;
2014-05-07 22:27:00 +00:00
message->timestamp = 0;
if(message->msg.qos != 2){
DL_DELETE(mosq->msgs_in.inflight, message);
message__cleanup(&message);
2014-05-07 22:27:00 +00:00
}else{
/* Message state can be preserved here because it should match
* whatever the client has got. */
util__decrement_receive_quota(mosq);
2014-05-07 22:27:00 +00:00
}
}
pthread_mutex_unlock(&mosq->msgs_in.mutex);
2014-05-07 22:27:00 +00:00
pthread_mutex_lock(&mosq->msgs_out.mutex);
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
mosq->msgs_out.queue_len = 0;
DL_FOREACH_SAFE(mosq->msgs_out.inflight, message, tmp){
mosq->msgs_out.queue_len++;
2014-05-07 22:27:00 +00:00
message->timestamp = 0;
if(mosq->msgs_out.inflight_quota != 0){
util__decrement_send_quota(mosq);
if (update_quota_only == false){
if(message->msg.qos == 1){
message->state = mosq_ms_publish_qos1;
}else if(message->msg.qos == 2){
if(message->state == mosq_ms_wait_for_pubrec){
message->state = mosq_ms_publish_qos2;
}else if(message->state == mosq_ms_wait_for_pubcomp){
message->state = mosq_ms_resend_pubrel;
}
/* Should be able to preserve state. */
}
2014-05-07 22:27:00 +00:00
}
}else{
message->state = mosq_ms_invalid;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
}
/* Send queued outgoing messages for as long as send quota is available.
 *
 * Only the outgoing direction is handled; for any other `dir` the function
 * returns MOSQ_ERR_SUCCESS without doing anything.
 *
 * The outgoing list is walked in order. A message is sent when its QoS is
 * greater than 0 and its state is mosq_ms_invalid: its state is advanced to
 * mosq_ms_wait_for_puback (QoS 1) or mosq_ms_wait_for_pubrec (QoS 2), it is
 * passed to send__publish() and one unit of send quota is consumed.
 *
 * Returns the first non-zero result of send__publish(), otherwise
 * MOSQ_ERR_SUCCESS. The walk stops as soon as the quota is no longer
 * positive.
 */
int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir)
{
	/* mosq->*_message_mutex should be locked before entering this function */
	struct mosquitto_message_all *entry, *next;
	int rc = MOSQ_ERR_SUCCESS;

	if(dir != mosq_md_out){
		return rc;
	}

	DL_FOREACH_SAFE(mosq->msgs_out.inflight, entry, next){
		if(!(mosq->msgs_out.inflight_quota > 0)){
			/* Quota exhausted: leave the rest of the list for later. */
			return MOSQ_ERR_SUCCESS;
		}
		if(!(entry->msg.qos > 0) || entry->state != mosq_ms_invalid){
			/* Not a candidate for sending. */
			continue;
		}

		switch(entry->msg.qos){
			case 1:
				entry->state = mosq_ms_wait_for_puback;
				break;
			case 2:
				entry->state = mosq_ms_wait_for_pubrec;
				break;
			default:
				break;
		}

		rc = send__publish(mosq, (uint16_t)entry->msg.mid, entry->msg.topic, (uint32_t)entry->msg.payloadlen, entry->msg.payload, (uint8_t)entry->msg.qos, entry->msg.retain, entry->dup, entry->properties, NULL, 0);
		if(rc){
			return rc;
		}
		util__decrement_send_quota(mosq);
	}

	return rc;
}
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *cur, *tmp;
2014-05-07 22:27:00 +00:00
bool found = false;
assert(mosq);
assert(message);
if(dir == mosq_md_out){
pthread_mutex_lock(&mosq->msgs_out.mutex);
DL_FOREACH_SAFE(mosq->msgs_out.inflight, cur, tmp){
if(found == false && cur->msg.mid == mid){
if(cur->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_PROTOCOL;
}
DL_DELETE(mosq->msgs_out.inflight, cur);
2014-05-07 22:27:00 +00:00
*message = cur;
mosq->msgs_out.queue_len--;
2014-05-07 22:27:00 +00:00
found = true;
break;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
2014-05-07 22:27:00 +00:00
if(found){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_NOT_FOUND;
}
}else{
pthread_mutex_lock(&mosq->msgs_in.mutex);
DL_FOREACH_SAFE(mosq->msgs_in.inflight, cur, tmp){
2014-05-07 22:27:00 +00:00
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_in.mutex);
return MOSQ_ERR_PROTOCOL;
}
DL_DELETE(mosq->msgs_in.inflight, cur);
2014-05-07 22:27:00 +00:00
*message = cur;
mosq->msgs_in.queue_len--;
2014-05-07 22:27:00 +00:00
found = true;
break;
}
}
pthread_mutex_unlock(&mosq->msgs_in.mutex);
2014-05-07 22:27:00 +00:00
if(found){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_NOT_FOUND;
}
}
}
void message__retry_check(struct mosquitto *mosq)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *msg;
2014-05-07 22:27:00 +00:00
time_t now = mosquitto_time();
assert(mosq);
#ifdef WITH_THREADING
pthread_mutex_lock(&mosq->msgs_out.mutex);
2014-05-07 22:27:00 +00:00
#endif
DL_FOREACH(mosq->msgs_out.inflight, msg){
switch(msg->state){
case mosq_ms_publish_qos1:
case mosq_ms_publish_qos2:
msg->timestamp = now;
msg->dup = true;
2020-10-17 00:23:08 +00:00
send__publish(mosq, (uint16_t)msg->msg.mid, msg->msg.topic, (uint32_t)msg->msg.payloadlen, msg->msg.payload, (uint8_t)msg->msg.qos, msg->msg.retain, msg->dup, msg->properties, NULL, 0);
break;
case mosq_ms_wait_for_pubrel:
msg->timestamp = now;
msg->dup = true;
2020-10-17 00:23:08 +00:00
send__pubrec(mosq, (uint16_t)msg->msg.mid, 0, NULL);
break;
case mosq_ms_resend_pubrel:
case mosq_ms_wait_for_pubcomp:
msg->timestamp = now;
msg->dup = true;
2020-10-17 00:23:08 +00:00
send__pubrel(mosq, (uint16_t)msg->msg.mid, NULL);
break;
default:
break;
2014-05-07 22:27:00 +00:00
}
}
#ifdef WITH_THREADING
pthread_mutex_unlock(&mosq->msgs_out.mutex);
2014-05-07 22:27:00 +00:00
#endif
}
/* Accepts a client instance and a retry value and does nothing with either.
 *
 * Both parameters are only marked as unused; no state is read or written.
 */
void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry)
{
	/* Intentionally a no-op: silence unused-parameter warnings only. */
	UNUSED(message_retry);
	UNUSED(mosq);
}
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos)
2014-05-07 22:27:00 +00:00
{
struct mosquitto_message_all *message, *tmp;
2014-05-07 22:27:00 +00:00
assert(mosq);
pthread_mutex_lock(&mosq->msgs_out.mutex);
DL_FOREACH_SAFE(mosq->msgs_out.inflight, message, tmp){
2014-05-07 22:27:00 +00:00
if(message->msg.mid == mid){
if(message->msg.qos != qos){
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_PROTOCOL;
}
2014-05-07 22:27:00 +00:00
message->state = state;
message->timestamp = mosquitto_time();
pthread_mutex_unlock(&mosq->msgs_out.mutex);
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_SUCCESS;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOT_FOUND;
}
int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages)
{
2020-10-17 00:23:08 +00:00
return mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, (int)max_inflight_messages);
2014-05-07 22:27:00 +00:00
}