2016-03-06 22:30:17 +00:00
/*
2019-02-28 16:56:15 +00:00
Copyright ( c ) 2009 - 2019 Roger Light < roger @ atchoo . org >
2016-03-06 22:30:17 +00:00
All rights reserved . This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1 .0
and Eclipse Distribution License v1 .0 which accompany this distribution .
The Eclipse Public License is available at
http : //www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http : //www.eclipse.org/org/documents/edl-v10.php.
Contributors :
Roger Light - initial implementation and documentation .
*/
2018-08-16 10:14:51 +00:00
# include "config.h"
2016-03-06 22:30:17 +00:00
# include <assert.h>
# include <stdio.h>
# include <string.h>
2016-07-08 09:10:04 +00:00
# include "mosquitto_broker_internal.h"
2019-01-18 21:30:34 +00:00
# include "alias_mosq.h"
2018-08-29 21:26:16 +00:00
# include "mqtt_protocol.h"
2016-03-06 22:30:17 +00:00
# include "memory_mosq.h"
# include "packet_mosq.h"
2018-10-02 15:27:40 +00:00
# include "property_mosq.h"
2016-03-06 22:30:17 +00:00
# include "read_handle.h"
# include "send_mosq.h"
# include "sys_tree.h"
# include "util_mosq.h"
int handle__publish ( struct mosquitto_db * db , struct mosquitto * context )
{
char * topic ;
mosquitto__payload_uhpa payload ;
uint32_t payloadlen ;
uint8_t dup , qos , retain ;
uint16_t mid = 0 ;
int rc = 0 ;
2019-03-04 07:36:35 +00:00
int rc2 ;
2016-03-06 22:30:17 +00:00
uint8_t header = context - > in_packet . command ;
int res = 0 ;
struct mosquitto_msg_store * stored = NULL ;
int len ;
2018-02-11 20:47:17 +00:00
int slen ;
2016-03-06 22:30:17 +00:00
char * topic_mount ;
2018-11-01 23:50:54 +00:00
mosquitto_property * properties = NULL ;
mosquitto_property * p , * p_prev ;
mosquitto_property * msg_properties = NULL , * msg_properties_last ;
2018-11-01 21:51:35 +00:00
uint32_t message_expiry_interval = 0 ;
2019-01-18 21:30:34 +00:00
uint16_t topic_alias = 0 ;
2019-03-03 22:00:30 +00:00
uint8_t reason_code = 0 ;
2018-10-04 16:18:57 +00:00
2016-03-06 22:30:17 +00:00
# ifdef WITH_BRIDGE
char * topic_temp ;
int i ;
struct mosquitto__bridge_topic * cur_topic ;
bool match ;
# endif
payload . ptr = NULL ;
dup = ( header & 0x08 ) > > 3 ;
qos = ( header & 0x06 ) > > 1 ;
if ( qos = = 3 ) {
log__printf ( NULL , MOSQ_LOG_INFO ,
" Invalid QoS in PUBLISH from %s, disconnecting. " , context - > id ) ;
2019-01-09 17:56:01 +00:00
return 1 ;
}
if ( qos > context - > maximum_qos ) {
log__printf ( NULL , MOSQ_LOG_INFO ,
" Too high QoS in PUBLISH from %s, disconnecting. " , context - > id ) ;
2016-03-06 22:30:17 +00:00
return 1 ;
}
retain = ( header & 0x01 ) ;
2018-11-22 18:15:02 +00:00
if ( retain & & db - > config - > retain_available = = false ) {
if ( context - > protocol = = mosq_p_mqtt5 ) {
send__disconnect ( context , MQTT_RC_RETAIN_NOT_SUPPORTED , NULL ) ;
}
return 1 ;
}
2018-02-11 20:47:17 +00:00
if ( packet__read_string ( & context - > in_packet , & topic , & slen ) ) return 1 ;
2019-01-18 21:30:34 +00:00
if ( ! slen & & context - > protocol ! = mosq_p_mqtt5 ) {
2016-03-06 22:30:17 +00:00
/* Invalid publish topic, disconnect client. */
mosquitto__free ( topic ) ;
2018-04-12 21:19:48 +00:00
return 1 ;
}
2016-03-06 22:30:17 +00:00
if ( qos > 0 ) {
if ( packet__read_uint16 ( & context - > in_packet , & mid ) ) {
mosquitto__free ( topic ) ;
return 1 ;
}
2018-10-25 11:12:57 +00:00
if ( mid = = 0 ) {
mosquitto__free ( topic ) ;
return MOSQ_ERR_PROTOCOL ;
}
2016-03-06 22:30:17 +00:00
}
2018-10-25 15:35:24 +00:00
/* Handle properties */
2018-10-02 15:27:40 +00:00
if ( context - > protocol = = mosq_p_mqtt5 ) {
2018-10-30 11:00:16 +00:00
rc = property__read_all ( CMD_PUBLISH , & context - > in_packet , & properties ) ;
2018-10-02 15:27:40 +00:00
if ( rc ) return rc ;
2018-10-25 15:35:24 +00:00
p = properties ;
p_prev = NULL ;
msg_properties = NULL ;
msg_properties_last = NULL ;
while ( p ) {
switch ( p - > identifier ) {
2018-12-06 09:29:25 +00:00
case MQTT_PROP_CONTENT_TYPE :
2018-10-30 11:00:16 +00:00
case MQTT_PROP_CORRELATION_DATA :
2018-12-06 09:29:25 +00:00
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR :
case MQTT_PROP_RESPONSE_TOPIC :
2018-10-30 11:00:16 +00:00
case MQTT_PROP_USER_PROPERTY :
2018-10-25 15:35:24 +00:00
if ( msg_properties ) {
msg_properties_last - > next = p ;
msg_properties_last = p ;
} else {
msg_properties = p ;
msg_properties_last = p ;
}
if ( p_prev ) {
p_prev - > next = p - > next ;
p = p_prev - > next ;
} else {
properties = p - > next ;
p = properties ;
}
2018-11-01 16:50:23 +00:00
msg_properties_last - > next = NULL ;
2018-10-25 15:35:24 +00:00
break ;
2018-10-30 11:00:16 +00:00
case MQTT_PROP_TOPIC_ALIAS :
2019-01-18 21:30:34 +00:00
topic_alias = p - > value . i16 ;
2018-11-01 16:50:23 +00:00
p_prev = p ;
2018-10-25 15:35:24 +00:00
p = p - > next ;
break ;
2018-10-30 11:00:16 +00:00
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL :
2018-11-01 21:51:35 +00:00
message_expiry_interval = p - > value . i32 ;
2018-11-01 16:50:23 +00:00
p_prev = p ;
2018-10-25 15:35:24 +00:00
p = p - > next ;
break ;
2018-10-30 11:00:16 +00:00
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER :
2018-11-01 16:50:23 +00:00
p_prev = p ;
2018-10-25 15:35:24 +00:00
p = p - > next ;
break ;
default :
p = p - > next ;
break ;
}
}
2018-10-02 15:27:40 +00:00
}
2018-10-30 10:11:20 +00:00
mosquitto_property_free_all ( & properties ) ;
2018-10-02 15:27:40 +00:00
2019-01-18 21:30:34 +00:00
if ( topic & & topic_alias ) {
rc = alias__add ( context , topic , topic_alias ) ;
if ( rc ) return rc ;
} else if ( topic = = NULL & & topic_alias ) {
rc = alias__find ( context , & topic , topic_alias ) ;
2019-01-25 22:53:31 +00:00
if ( rc ) {
if ( context - > protocol = = mosq_p_mqtt5 ) {
send__disconnect ( context , MQTT_RC_TOPIC_ALIAS_INVALID , NULL ) ;
}
return rc ;
}
2019-01-18 21:30:34 +00:00
} else if ( topic = = NULL & & topic_alias = = 0 ) {
return MOSQ_ERR_PROTOCOL ;
}
2018-04-12 21:19:48 +00:00
if ( mosquitto_validate_utf8 ( topic , slen ) ! = MOSQ_ERR_SUCCESS ) {
2019-02-06 09:10:05 +00:00
log__printf ( NULL , MOSQ_LOG_INFO , " Client %s sent topic with invalid UTF-8, disconnecting. " , context - > id ) ;
2018-04-12 21:19:48 +00:00
mosquitto__free ( topic ) ;
return 1 ;
}
2019-01-18 21:30:34 +00:00
# ifdef WITH_BRIDGE
if ( context - > bridge & & context - > bridge - > topics & & context - > bridge - > topic_remapping ) {
for ( i = 0 ; i < context - > bridge - > topic_count ; i + + ) {
cur_topic = & context - > bridge - > topics [ i ] ;
if ( ( cur_topic - > direction = = bd_both | | cur_topic - > direction = = bd_in )
& & ( cur_topic - > remote_prefix | | cur_topic - > local_prefix ) ) {
/* Topic mapping required on this topic if the message matches */
rc = mosquitto_topic_matches_sub ( cur_topic - > remote_topic , topic , & match ) ;
if ( rc ) {
mosquitto__free ( topic ) ;
return rc ;
}
if ( match ) {
if ( cur_topic - > remote_prefix ) {
/* This prefix needs removing. */
if ( ! strncmp ( cur_topic - > remote_prefix , topic , strlen ( cur_topic - > remote_prefix ) ) ) {
topic_temp = mosquitto__strdup ( topic + strlen ( cur_topic - > remote_prefix ) ) ;
if ( ! topic_temp ) {
mosquitto__free ( topic ) ;
return MOSQ_ERR_NOMEM ;
}
mosquitto__free ( topic ) ;
topic = topic_temp ;
}
}
if ( cur_topic - > local_prefix ) {
/* This prefix needs adding. */
len = strlen ( topic ) + strlen ( cur_topic - > local_prefix ) + 1 ;
topic_temp = mosquitto__malloc ( len + 1 ) ;
if ( ! topic_temp ) {
mosquitto__free ( topic ) ;
return MOSQ_ERR_NOMEM ;
}
snprintf ( topic_temp , len , " %s%s " , cur_topic - > local_prefix , topic ) ;
topic_temp [ len ] = ' \0 ' ;
mosquitto__free ( topic ) ;
topic = topic_temp ;
}
break ;
}
}
}
}
# endif
if ( mosquitto_pub_topic_check ( topic ) ! = MOSQ_ERR_SUCCESS ) {
/* Invalid publish topic, just swallow it. */
mosquitto__free ( topic ) ;
return 1 ;
}
2016-03-06 22:30:17 +00:00
payloadlen = context - > in_packet . remaining_length - context - > in_packet . pos ;
G_PUB_BYTES_RECEIVED_INC ( payloadlen ) ;
if ( context - > listener & & context - > listener - > mount_point ) {
len = strlen ( context - > listener - > mount_point ) + strlen ( topic ) + 1 ;
topic_mount = mosquitto__malloc ( len + 1 ) ;
if ( ! topic_mount ) {
mosquitto__free ( topic ) ;
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2016-03-06 22:30:17 +00:00
return MOSQ_ERR_NOMEM ;
}
snprintf ( topic_mount , len , " %s%s " , context - > listener - > mount_point , topic ) ;
topic_mount [ len ] = ' \0 ' ;
mosquitto__free ( topic ) ;
topic = topic_mount ;
}
if ( payloadlen ) {
if ( db - > config - > message_size_limit & & payloadlen > db - > config - > message_size_limit ) {
log__printf ( NULL , MOSQ_LOG_DEBUG , " Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
2019-03-03 22:00:30 +00:00
reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC ;
2016-03-06 22:30:17 +00:00
goto process_bad_message ;
}
2019-02-26 12:39:33 +00:00
if ( UHPA_ALLOC ( payload , payloadlen ) = = 0 ) {
2016-03-06 22:30:17 +00:00
mosquitto__free ( topic ) ;
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2016-03-06 22:30:17 +00:00
return MOSQ_ERR_NOMEM ;
}
2019-02-26 12:39:33 +00:00
2016-03-06 22:30:17 +00:00
if ( packet__read_bytes ( & context - > in_packet , UHPA_ACCESS ( payload , payloadlen ) , payloadlen ) ) {
mosquitto__free ( topic ) ;
UHPA_FREE ( payload , payloadlen ) ;
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2016-03-06 22:30:17 +00:00
return 1 ;
}
}
/* Check for topic access */
2018-05-22 14:51:26 +00:00
rc = mosquitto_acl_check ( db , context , topic , payloadlen , UHPA_ACCESS ( payload , payloadlen ) , qos , retain , MOSQ_ACL_WRITE ) ;
2016-03-06 22:30:17 +00:00
if ( rc = = MOSQ_ERR_ACL_DENIED ) {
log__printf ( NULL , MOSQ_LOG_DEBUG , " Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
2019-03-03 22:00:30 +00:00
reason_code = MQTT_RC_NOT_AUTHORIZED ;
2016-03-06 22:30:17 +00:00
goto process_bad_message ;
} else if ( rc ! = MOSQ_ERR_SUCCESS ) {
mosquitto__free ( topic ) ;
UHPA_FREE ( payload , payloadlen ) ;
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2016-03-06 22:30:17 +00:00
return rc ;
}
log__printf ( NULL , MOSQ_LOG_DEBUG , " Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
if ( qos > 0 ) {
db__message_store_find ( context , mid , & stored ) ;
}
if ( ! stored ) {
dup = 0 ;
2019-02-12 17:05:42 +00:00
if ( db__message_store ( db , context , mid , topic , qos , payloadlen , & payload , retain , & stored , message_expiry_interval , msg_properties , 0 ) ) {
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2016-03-06 22:30:17 +00:00
return 1 ;
}
2018-10-25 15:35:24 +00:00
msg_properties = NULL ; /* Now belongs to db__message_store() */
2016-03-06 22:30:17 +00:00
} else {
mosquitto__free ( topic ) ;
topic = stored - > topic ;
dup = 1 ;
2018-11-01 21:51:35 +00:00
mosquitto_property_free_all ( & msg_properties ) ;
2019-02-25 22:28:51 +00:00
UHPA_FREE ( payload , payloadlen ) ;
2016-03-06 22:30:17 +00:00
}
switch ( qos ) {
case 0 :
2019-03-04 07:36:35 +00:00
rc2 = sub__messages_queue ( db , context - > id , topic , qos , retain , & stored ) ;
if ( rc2 > 0 ) rc = 1 ;
2016-03-06 22:30:17 +00:00
break ;
case 1 :
2019-03-04 07:36:35 +00:00
rc2 = sub__messages_queue ( db , context - > id , topic , qos , retain , & stored ) ;
if ( rc2 = = MOSQ_ERR_SUCCESS | | context - > protocol ! = mosq_p_mqtt5 ) {
if ( send__puback ( context , mid , 0 ) ) rc = 1 ;
} else if ( rc2 = = MOSQ_ERR_NO_SUBSCRIBERS ) {
if ( send__puback ( context , mid , MQTT_RC_NO_MATCHING_SUBSCRIBERS ) ) rc = 1 ;
} else {
rc = rc2 ;
}
2016-03-06 22:30:17 +00:00
break ;
case 2 :
if ( ! dup ) {
2018-12-20 15:32:43 +00:00
res = db__message_insert ( db , context , mid , mosq_md_in , qos , retain , stored , NULL ) ;
2016-03-06 22:30:17 +00:00
} else {
res = 0 ;
}
/* db__message_insert() returns 2 to indicate dropped message
* due to queue . This isn ' t an error so don ' t disconnect them . */
if ( ! res ) {
2019-03-03 22:00:30 +00:00
if ( send__pubrec ( context , mid , 0 ) ) rc = 1 ;
2016-03-06 22:30:17 +00:00
} else if ( res = = 1 ) {
rc = 1 ;
}
break ;
}
return rc ;
process_bad_message :
mosquitto__free ( topic ) ;
UHPA_FREE ( payload , payloadlen ) ;
switch ( qos ) {
case 0 :
return MOSQ_ERR_SUCCESS ;
case 1 :
2019-03-03 22:00:30 +00:00
return send__puback ( context , mid , reason_code ) ;
2016-03-06 22:30:17 +00:00
case 2 :
2019-03-03 22:00:30 +00:00
if ( context - > protocol = = mosq_p_mqtt5 ) {
return send__pubrec ( context , mid , reason_code ) ;
2016-03-06 22:30:17 +00:00
} else {
2019-03-03 22:00:30 +00:00
db__message_store_find ( context , mid , & stored ) ;
if ( ! stored ) {
if ( db__message_store ( db , context , mid , NULL , qos , 0 , NULL , false , & stored , 0 , NULL , 0 ) ) {
return 1 ;
}
res = db__message_insert ( db , context , mid , mosq_md_in , qos , false , stored , NULL ) ;
} else {
res = 0 ;
}
if ( ! res ) {
res = send__pubrec ( context , mid , 0 ) ;
}
2016-03-06 22:30:17 +00:00
}
return res ;
}
return 1 ;
}