2014-05-07 22:27:00 +00:00
/*
2015-04-19 21:10:59 +00:00
Copyright (c) 2009-2015 Roger Light <roger@atchoo.org>
2014-05-07 22:27:00 +00:00
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.

The Eclipse Public License is available at
   http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
  http://www.eclipse.org/org/documents/edl-v10.php.

Contributors:
   Roger Light - initial implementation and documentation.
*/
# include <assert.h>
# include <stdio.h>
# include <string.h>
2015-04-29 20:37:47 +00:00
# include "config.h"
2014-05-07 22:27:00 +00:00
2015-04-29 20:37:47 +00:00
# include "mosquitto_broker.h"
# include "mqtt3_protocol.h"
# include "memory_mosq.h"
# include "packet_mosq.h"
# include "read_handle.h"
# include "send_mosq.h"
2015-05-16 11:25:35 +00:00
# include "sys_tree.h"
2015-04-29 20:37:47 +00:00
# include "util_mosq.h"
2014-05-07 22:27:00 +00:00
int mqtt3_packet_handle ( struct mosquitto_db * db , struct mosquitto * context )
{
if ( ! context ) return MOSQ_ERR_INVAL ;
switch ( ( context - > in_packet . command ) & 0xF0 ) {
case PINGREQ :
2015-05-16 17:43:06 +00:00
return handle__pingreq ( context ) ;
2014-05-07 22:27:00 +00:00
case PINGRESP :
2015-05-16 17:43:06 +00:00
return handle__pingresp ( context ) ;
2014-05-07 22:27:00 +00:00
case PUBACK :
2015-05-16 17:43:06 +00:00
return handle__pubackcomp ( db , context , " PUBACK " ) ;
2014-05-07 22:27:00 +00:00
case PUBCOMP :
2015-05-16 17:43:06 +00:00
return handle__pubackcomp ( db , context , " PUBCOMP " ) ;
2014-05-07 22:27:00 +00:00
case PUBLISH :
2015-05-16 17:43:06 +00:00
return handle__publish ( db , context ) ;
2014-05-07 22:27:00 +00:00
case PUBREC :
2015-05-16 17:43:06 +00:00
return handle__pubrec ( context ) ;
2014-05-07 22:27:00 +00:00
case PUBREL :
2015-05-16 17:43:06 +00:00
return handle__pubrel ( db , context ) ;
2014-05-07 22:27:00 +00:00
case CONNECT :
2015-05-16 17:43:06 +00:00
return handle__connect ( db , context ) ;
2014-05-07 22:27:00 +00:00
case DISCONNECT :
2015-05-16 17:43:06 +00:00
return handle__disconnect ( db , context ) ;
2014-05-07 22:27:00 +00:00
case SUBSCRIBE :
2015-05-16 17:43:06 +00:00
return handle__subscribe ( db , context ) ;
2014-05-07 22:27:00 +00:00
case UNSUBSCRIBE :
2015-05-16 17:43:06 +00:00
return handle__unsubscribe ( db , context ) ;
2014-05-07 22:27:00 +00:00
# ifdef WITH_BRIDGE
case CONNACK :
2015-05-16 17:43:06 +00:00
return handle__connack ( db , context ) ;
2014-05-07 22:27:00 +00:00
case SUBACK :
2015-05-16 17:43:06 +00:00
return handle__suback ( context ) ;
2014-05-07 22:27:00 +00:00
case UNSUBACK :
2015-05-16 17:43:06 +00:00
return handle__unsuback ( context ) ;
2014-05-07 22:27:00 +00:00
# endif
default :
/* If we don't recognise the command, return an error straight away. */
return MOSQ_ERR_PROTOCOL ;
}
}
2015-05-16 17:43:06 +00:00
int handle__publish ( struct mosquitto_db * db , struct mosquitto * context )
2014-05-07 22:27:00 +00:00
{
char * topic ;
void * payload = NULL ;
uint32_t payloadlen ;
uint8_t dup , qos , retain ;
uint16_t mid = 0 ;
int rc = 0 ;
uint8_t header = context - > in_packet . command ;
int res = 0 ;
struct mosquitto_msg_store * stored = NULL ;
int len ;
char * topic_mount ;
# ifdef WITH_BRIDGE
char * topic_temp ;
int i ;
2015-05-16 18:03:12 +00:00
struct mosquitto__bridge_topic * cur_topic ;
2014-05-07 22:27:00 +00:00
bool match ;
# endif
dup = ( header & 0x08 ) > > 3 ;
qos = ( header & 0x06 ) > > 1 ;
if ( qos = = 3 ) {
2015-04-19 21:10:59 +00:00
mosquitto__log_printf ( NULL , MOSQ_LOG_INFO ,
2014-05-07 22:27:00 +00:00
" Invalid QoS in PUBLISH from %s, disconnecting. " , context - > id ) ;
return 1 ;
}
retain = ( header & 0x01 ) ;
2015-05-16 13:16:40 +00:00
if ( packet__read_string ( & context - > in_packet , & topic ) ) return 1 ;
2014-05-07 22:27:00 +00:00
if ( strlen ( topic ) = = 0 ) {
/* Invalid publish topic, disconnect client. */
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
# ifdef WITH_BRIDGE
if ( context - > bridge & & context - > bridge - > topics & & context - > bridge - > topic_remapping ) {
for ( i = 0 ; i < context - > bridge - > topic_count ; i + + ) {
cur_topic = & context - > bridge - > topics [ i ] ;
if ( ( cur_topic - > direction = = bd_both | | cur_topic - > direction = = bd_in )
& & ( cur_topic - > remote_prefix | | cur_topic - > local_prefix ) ) {
/* Topic mapping required on this topic if the message matches */
rc = mosquitto_topic_matches_sub ( cur_topic - > remote_topic , topic , & match ) ;
if ( rc ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return rc ;
}
if ( match ) {
if ( cur_topic - > remote_prefix ) {
/* This prefix needs removing. */
if ( ! strncmp ( cur_topic - > remote_prefix , topic , strlen ( cur_topic - > remote_prefix ) ) ) {
2015-04-19 21:10:59 +00:00
topic_temp = mosquitto__strdup ( topic + strlen ( cur_topic - > remote_prefix ) ) ;
2014-05-07 22:27:00 +00:00
if ( ! topic_temp ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOMEM ;
}
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
topic = topic_temp ;
}
}
if ( cur_topic - > local_prefix ) {
/* This prefix needs adding. */
len = strlen ( topic ) + strlen ( cur_topic - > local_prefix ) + 1 ;
2015-04-19 21:10:59 +00:00
topic_temp = mosquitto__malloc ( len + 1 ) ;
2014-05-07 22:27:00 +00:00
if ( ! topic_temp ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOMEM ;
}
snprintf ( topic_temp , len , " %s%s " , cur_topic - > local_prefix , topic ) ;
2014-11-17 21:58:53 +00:00
topic_temp [ len ] = ' \0 ' ;
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
topic = topic_temp ;
}
break ;
}
}
}
}
# endif
2014-09-10 14:57:20 +00:00
if ( mosquitto_pub_topic_check ( topic ) ! = MOSQ_ERR_SUCCESS ) {
2014-05-07 22:27:00 +00:00
/* Invalid publish topic, just swallow it. */
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
if ( qos > 0 ) {
2015-05-16 13:16:40 +00:00
if ( packet__read_uint16 ( & context - > in_packet , & mid ) ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
}
payloadlen = context - > in_packet . remaining_length - context - > in_packet . pos ;
2015-05-16 11:25:35 +00:00
G_PUB_BYTES_RECEIVED_INC ( payloadlen ) ;
2014-05-07 22:27:00 +00:00
if ( context - > listener & & context - > listener - > mount_point ) {
len = strlen ( context - > listener - > mount_point ) + strlen ( topic ) + 1 ;
2015-04-19 21:10:59 +00:00
topic_mount = mosquitto__malloc ( len + 1 ) ;
2014-05-07 22:27:00 +00:00
if ( ! topic_mount ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return MOSQ_ERR_NOMEM ;
}
snprintf ( topic_mount , len , " %s%s " , context - > listener - > mount_point , topic ) ;
2014-11-17 21:58:53 +00:00
topic_mount [ len ] = ' \0 ' ;
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
topic = topic_mount ;
}
if ( payloadlen ) {
if ( db - > config - > message_size_limit & & payloadlen > db - > config - > message_size_limit ) {
2015-04-19 21:10:59 +00:00
mosquitto__log_printf ( NULL , MOSQ_LOG_DEBUG , " Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
2014-05-07 22:27:00 +00:00
goto process_bad_message ;
}
2015-04-19 21:10:59 +00:00
payload = mosquitto__calloc ( payloadlen + 1 , 1 ) ;
2014-05-07 22:27:00 +00:00
if ( ! payload ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
2015-05-16 13:16:40 +00:00
if ( packet__read_bytes ( & context - > in_packet , payload , payloadlen ) ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
mosquitto__free ( payload ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
}
/* Check for topic access */
rc = mosquitto_acl_check ( db , context , topic , MOSQ_ACL_WRITE ) ;
if ( rc = = MOSQ_ERR_ACL_DENIED ) {
2015-04-19 21:10:59 +00:00
mosquitto__log_printf ( NULL , MOSQ_LOG_DEBUG , " Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
2014-05-07 22:27:00 +00:00
goto process_bad_message ;
} else if ( rc ! = MOSQ_ERR_SUCCESS ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
if ( payload ) mosquitto__free ( payload ) ;
2014-05-07 22:27:00 +00:00
return rc ;
}
2015-04-19 21:10:59 +00:00
mosquitto__log_printf ( NULL , MOSQ_LOG_DEBUG , " Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes)) " , context - > id , dup , qos , retain , mid , topic , ( long ) payloadlen ) ;
2014-05-07 22:27:00 +00:00
if ( qos > 0 ) {
2015-05-16 14:24:24 +00:00
db__message_store_find ( context , mid , & stored ) ;
2014-05-07 22:27:00 +00:00
}
if ( ! stored ) {
dup = 0 ;
2015-05-16 14:24:24 +00:00
if ( db__message_store ( db , context - > id , mid , topic , qos , payloadlen , payload , retain , & stored , 0 ) ) {
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
if ( payload ) mosquitto__free ( payload ) ;
2014-05-07 22:27:00 +00:00
return 1 ;
}
} else {
dup = 1 ;
}
switch ( qos ) {
case 0 :
2014-11-20 21:13:21 +00:00
if ( mqtt3_db_messages_queue ( db , context - > id , topic , qos , retain , & stored ) ) rc = 1 ;
2014-05-07 22:27:00 +00:00
break ;
case 1 :
2014-11-20 21:13:21 +00:00
if ( mqtt3_db_messages_queue ( db , context - > id , topic , qos , retain , & stored ) ) rc = 1 ;
2015-05-16 14:24:24 +00:00
if ( send__puback ( context , mid ) ) rc = 1 ;
2014-05-07 22:27:00 +00:00
break ;
case 2 :
if ( ! dup ) {
2015-05-16 14:24:24 +00:00
res = db__message_insert ( db , context , mid , mosq_md_in , qos , retain , stored ) ;
2014-05-07 22:27:00 +00:00
} else {
res = 0 ;
}
2015-05-16 14:24:24 +00:00
/* db__message_insert() returns 2 to indicate dropped message
2014-05-07 22:27:00 +00:00
* due to queue . This isn ' t an error so don ' t disconnect them . */
if ( ! res ) {
2015-05-16 14:24:24 +00:00
if ( send__pubrec ( context , mid ) ) rc = 1 ;
2014-05-07 22:27:00 +00:00
} else if ( res = = 1 ) {
rc = 1 ;
}
break ;
}
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
if ( payload ) mosquitto__free ( payload ) ;
2014-05-07 22:27:00 +00:00
return rc ;
process_bad_message :
2015-04-19 21:10:59 +00:00
mosquitto__free ( topic ) ;
if ( payload ) mosquitto__free ( payload ) ;
2014-05-07 22:27:00 +00:00
switch ( qos ) {
case 0 :
return MOSQ_ERR_SUCCESS ;
case 1 :
2015-05-16 14:24:24 +00:00
return send__puback ( context , mid ) ;
2014-05-07 22:27:00 +00:00
case 2 :
2015-05-16 14:24:24 +00:00
db__message_store_find ( context , mid , & stored ) ;
2014-05-07 22:27:00 +00:00
if ( ! stored ) {
2015-05-16 14:24:24 +00:00
if ( db__message_store ( db , context - > id , mid , NULL , qos , 0 , NULL , false , & stored , 0 ) ) {
2014-05-07 22:27:00 +00:00
return 1 ;
}
2015-05-16 14:24:24 +00:00
res = db__message_insert ( db , context , mid , mosq_md_in , qos , false , stored ) ;
2014-05-07 22:27:00 +00:00
} else {
res = 0 ;
}
if ( ! res ) {
2015-05-16 14:24:24 +00:00
res = send__pubrec ( context , mid ) ;
2014-05-07 22:27:00 +00:00
}
return res ;
}
return 1 ;
}