Merge branch 'fixes' into develop

This commit is contained in:
Roger A. Light 2018-02-13 14:16:47 +00:00
commit 81cb7ab547
25 changed files with 583 additions and 142 deletions

View File

@ -1,4 +1,4 @@
1.5 - 2017xxxx
1.5 - 2018xxxx
==============
Broker:
@ -45,10 +45,25 @@ Broker:
- Remove all build timestamp information including $SYS/broker/timestamp.
Close #651.
- Correctly handle incoming strings that contain a NULL byte. Closes #693.
- Use constant time memcmp for password comparisons.
- Fix incorrect PSK key being used if it had leading zeroes.
- Fix memory leak if a client provided a username/password for a listener with
use_identity_as_username configured.
- Fix use_identity_as_username not working on websockets clients.
- Don't crash if an auth plugin returns MOSQ_ERR_AUTH for a username check on
a websockets client. Closes #490.
- Fix 08-ssl-bridge.py test when using async dns lookups. Closes #507.
- Lines in the config file are no longer limited to 1024 characters long.
Closes #652.
- Fix $SYS counters of messages and bytes sent when message is sent over
a Websockets. Closes #250.
- Fix upgrade_outgoing_qos for retained message. Closes #534.
- Fix CONNACK message not being sent for unauthorised connect on websockets.
Closes #8.
Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
Messages will be retried when a client reconnects.
Messages will be retried when a client reconnects.
- DNS-SRV support is now disabled by default.
- Add mosquitto_subscribe_simple() This is a helper function to make
retrieving messages from a broker very straightforward. Examples of its use
@ -66,8 +81,14 @@ Client library:
- Add mosquitto_pub_topic_check2(), mosquitto_sub_topic_check2(), and
mosquitto_topic_matches_sub2() which are identical to the similarly named
functions but also take length arguments.
- Fix incorrect PSK key being used if it had leading zeroes.
- Initialise "result" variable as soon as possible in
mosquitto_topic_matches_sub. Closes #654.
- No need to close socket again if setting non-blocking failed. Closes #649.
- Fix mosquitto_topic_matches_sub() not correctly matching foo/bar against
foo/+/#. Closes #670.
Client:
Clients:
- Add -F to mosquitto_sub to allow the user to choose the output format.
- Add -U to mosquitto_sub for unsubscribing from topics.
- Add -c (clean session) to mosquitto_pub.
@ -75,12 +96,14 @@ Client:
messages.
- Connections now default to using MQTT v3.1.1.
- Default to using port 8883 when using TLS.
- Correctly handle empty files with "mosquitto_pub -l". Closes #676.
Build:
- Add WITH_STRIP option (defaulting to "no") that when set to "yes" will strip
executables and shared libraries when installing.
- Add WITH_STATIC_LIBRARIES (defaulting to "no") that when set to "yes" will
build and install static versions of the client libraries.
- Don't run TLS-PSK tests if TLS-PSK disabled at compile time. Closes #636.
1.4.14 - 20170710
@ -621,7 +644,7 @@ Clients:
Broker:
- Don't always attempt to call read() for SSL clients, irrespective of whether
they were ready to read or not. Reduces syscalls significantly.
- Possible memory leak fixes.
- Possible memory leak fixes.
- Further fix for bug #1226040: multiple retained messages being delivered for
subscriptions ending in #.
- Fix bridge reconnections when using multiple bridge addresses.
@ -1087,7 +1110,7 @@ Client library:
- C++ lib_init(), lib_version() and lib_cleanup() are now in the mosqpp
namespace directly, not mosquittopp class members.
- The Python library is now written in pure Python and so no longer depends on
libmosquitto.
libmosquitto.
- The Python library includes SSL/TLS support.
- The Python library should now be compatible with Python 3.
@ -1223,7 +1246,7 @@ Other:
use a username/password.
- Add mosquitto_reconnect() to the client library.
- Add option for compiling with liberal protocol compliance support (enabled
by default).
by default).
- Fix problems with clients reconnecting and old messages remaining in the
message store.
- Display both ip and client id in the log message when a client connects.
@ -1309,7 +1332,7 @@ Other:
- Implement support for the password_file option and accompanying
authentication requirements in the broker.
- Implement topic Access Control Lists.
- Implement topic Access Control Lists.
- mosquitto_will_set() and mosquitto_publish() now return
MOSQ_ERR_PAYLOAD_SIZE if the payload is too large (>268,435,455 bytes).
- Bridge support can now be disabled at compile time.
@ -1384,7 +1407,7 @@ Other:
- Don't send client will if it is disconnected for exceeding its keepalive
timer.
- Fix client library unsubscribe function incorrectly sending a SUBSCRIBE
command when it should be UNSUBSCRIBE.
command when it should be UNSUBSCRIBE.
- Fix max_inflight_messages and max_queued_messages operation. These
parameters now apply only to QoS 1 and 2 messages and are used regardless of
the client connection state.
@ -1527,7 +1550,7 @@ Other:
Fixes bug #529990.
- Treat subscriptions with a trailing slash correctly. This should fix bugs
#530369 and #530099.
0.5.1 - 20100227
================
@ -1602,7 +1625,7 @@ Other:
- Set SO_REUSEADDR on the listening socket so restart is much quicker.
- Added support for tracking current heap memory usage - this is published on
the topic "$SYS/broker/heap/current size"
- Added code for logging to stderr, stdout, syslog and topics.
- Added code for logging to stderr, stdout, syslog and topics.
- Added logging to numerous places - still plenty of scope for more.
0.2 - 20091204

View File

@ -34,6 +34,7 @@ Contributors:
#define STATUS_CONNECTING 0
#define STATUS_CONNACK_RECVD 1
#define STATUS_WAITING 2
#define STATUS_DISCONNECTING 3
/* Global variables for use in callbacks. See sub_client.c for an example of
* using a struct to hold variables for use in callbacks. */
@ -414,8 +415,15 @@ int main(int argc, char *argv[])
}
}
if(feof(stdin)){
last_mid = mid_sent;
status = STATUS_WAITING;
if(last_mid == -1){
/* Empty file */
mosquitto_disconnect(mosq);
disconnect_sent = true;
status = STATUS_DISCONNECTING;
}else{
last_mid = mid_sent;
status = STATUS_WAITING;
}
}
}else if(status == STATUS_WAITING){
if(last_mid_sent == last_mid && disconnect_sent == false){

View File

@ -43,8 +43,13 @@ void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_
bind[0].buffer_type = MYSQL_TYPE_STRING;
bind[0].buffer = message->topic;
bind[0].buffer_length = strlen(message->topic);
// Note: payload is normally a binary blob and could contains
// NULL byte. This sample does not handle it and assume payload is a
// string.
bind[1].buffer_type = MYSQL_TYPE_STRING;
bind[1].buffer = message->payload;
bind[1].buffer_length = message->payloadlen;
mysql_stmt_bind_param(stmt, bind);
mysql_stmt_execute(stmt);

View File

@ -1177,15 +1177,20 @@ int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
bool mosquitto_want_write(struct mosquitto *mosq)
{
bool result = false;
if(mosq->out_packet || mosq->current_out_packet){
return true;
#ifdef WITH_TLS
}else if(mosq->ssl && mosq->want_write){
return true;
#endif
}else{
return false;
result = true;
}
#ifdef WITH_TLS
if(mosq->ssl){
if (mosq->want_write) {
result = true;
}else if(mosq->want_connect){
result = false;
}
}
#endif
return result;
}
int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t option, void *value)

View File

@ -4,12 +4,12 @@ Copyright (c) 2009-2016 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
@ -143,33 +143,39 @@ int net__socket_close(struct mosquitto *mosq)
assert(mosq);
#ifdef WITH_TLS
if(mosq->ssl){
SSL_shutdown(mosq->ssl);
SSL_free(mosq->ssl);
mosq->ssl = NULL;
}
if(mosq->ssl_ctx){
SSL_CTX_free(mosq->ssl_ctx);
mosq->ssl_ctx = NULL;
#ifdef WITH_WEBSOCKETS
if(!mosq->wsi)
#endif
{
if(mosq->ssl){
SSL_shutdown(mosq->ssl);
SSL_free(mosq->ssl);
mosq->ssl = NULL;
}
if(mosq->ssl_ctx){
SSL_CTX_free(mosq->ssl_ctx);
mosq->ssl_ctx = NULL;
}
}
#endif
if((int)mosq->sock >= 0){
#ifdef WITH_BROKER
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
#endif
rc = COMPAT_CLOSE(mosq->sock);
mosq->sock = INVALID_SOCKET;
#ifdef WITH_WEBSOCKETS
}else if(mosq->sock == WEBSOCKET_CLIENT){
if(mosq->wsi)
{
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_disconnect_ws;
}
if(mosq->wsi){
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
}
mosq->sock = INVALID_SOCKET;
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
}else
#endif
{
if((int)mosq->sock >= 0){
#ifdef WITH_BROKER
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
#endif
rc = COMPAT_CLOSE(mosq->sock);
mosq->sock = INVALID_SOCKET;
}
}
#ifdef WITH_BROKER
@ -253,7 +259,6 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s
/* Set non-blocking */
if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
@ -268,7 +273,6 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s
/* Set non-blocking */
if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
break;
@ -353,7 +357,6 @@ int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mo
if(!blocking){
/* Set non-blocking */
if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
}
@ -370,7 +373,6 @@ int net__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mo
if(blocking){
/* Set non-blocking */
if(net__socket_nonblock(*sock)){
COMPAT_CLOSE(*sock);
continue;
}
}
@ -801,7 +803,6 @@ int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW)
continue;
}
if(net__socket_nonblock(spR)){
COMPAT_CLOSE(spR);
COMPAT_CLOSE(listensock);
continue;
}
@ -829,7 +830,6 @@ int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW)
if(net__socket_nonblock(spW)){
COMPAT_CLOSE(spR);
COMPAT_CLOSE(spW);
COMPAT_CLOSE(listensock);
continue;
}
@ -847,13 +847,11 @@ int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW)
return MOSQ_ERR_ERRNO;
}
if(net__socket_nonblock(sv[0])){
COMPAT_CLOSE(sv[0]);
COMPAT_CLOSE(sv[1]);
return MOSQ_ERR_ERRNO;
}
if(net__socket_nonblock(sv[1])){
COMPAT_CLOSE(sv[0]);
COMPAT_CLOSE(sv[1]);
return MOSQ_ERR_ERRNO;
}
*pairR = sv[0];

View File

@ -4,12 +4,12 @@ Copyright (c) 2009-2016 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
@ -269,7 +269,11 @@ int packet__write(struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
#if defined(WITH_TLS) && !defined(WITH_BROKER)
if((mosq->state == mosq_cs_connect_pending) || mosq->want_connect){
#else
if(mosq->state == mosq_cs_connect_pending){
#endif
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}
@ -315,7 +319,7 @@ int packet__write(struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->callback_mutex);
}else if(((packet->command)&0xF0) == DISCONNECT){
/* FIXME what cleanup needs doing here?
/* FIXME what cleanup needs doing here?
* incoming/outgoing messages? */
net__socket_close(mosq);
@ -533,4 +537,3 @@ int packet__read(struct mosquitto *mosq)
pthread_mutex_unlock(&mosq->msgtime_mutex);
return rc;
}

View File

@ -4,12 +4,12 @@ Copyright (c) 2009-2016 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
@ -128,7 +128,7 @@ uint16_t mosquitto__mid_generate(struct mosquitto *mosq)
if(mosq->last_mid == 0) mosq->last_mid++;
mid = mosq->last_mid;
pthread_mutex_unlock(&mosq->mid_mutex);
return mid;
}
@ -223,7 +223,13 @@ int mosquitto_sub_topic_check2(const char *str, size_t len)
int mosquitto_topic_matches_sub(const char *sub, const char *topic, bool *result)
{
int slen, tlen;
if(!sub || !topic || !result) return MOSQ_ERR_INVAL;
if(!result) return MOSQ_ERR_INVAL;
*result = false;
if(!sub || !topic){
return MOSQ_ERR_INVAL;
}
slen = strlen(sub);
tlen = strlen(topic);
@ -237,7 +243,12 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
int spos, tpos;
bool multilevel_wildcard = false;
if(!sub || !topic || !result) return MOSQ_ERR_INVAL;
if(!result) return MOSQ_ERR_INVAL;
*result = false;
if(!sub || !topic){
return MOSQ_ERR_INVAL;
}
if(!sublen || !topiclen){
*result = false;
@ -248,7 +259,6 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
if((sub[0] == '$' && topic[0] != '$')
|| (topic[0] == '$' && sub[0] != '$')){
*result = false;
return MOSQ_ERR_SUCCESS;
}
}
@ -260,7 +270,7 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
if(sub[spos] == topic[tpos]){
if(tpos == topiclen-1){
/* Check for e.g. foo matching foo/# */
if(spos == sublen-3
if(spos == sublen-3
&& sub[spos+1] == '/'
&& sub[spos+2] == '#'){
*result = true;
@ -275,7 +285,6 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
return MOSQ_ERR_SUCCESS;
}else if(tpos == topiclen && spos == sublen-1 && sub[spos] == '+'){
if(spos > 0 && sub[spos-1] != '/'){
*result = false;
return MOSQ_ERR_INVAL;
}
spos++;
@ -286,12 +295,10 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
if(sub[spos] == '+'){
/* Check for bad "+foo" or "a/+foo" subscription */
if(spos > 0 && sub[spos-1] != '/'){
*result = false;
return MOSQ_ERR_INVAL;
}
/* Check for bad "foo+" or "foo+/a" subscription */
if(spos < sublen-1 && sub[spos+1] != '/'){
*result = false;
return MOSQ_ERR_INVAL;
}
spos++;
@ -304,19 +311,28 @@ int mosquitto_topic_matches_sub2(const char *sub, size_t sublen, const char *top
}
}else if(sub[spos] == '#'){
if(spos > 0 && sub[spos-1] != '/'){
*result = false;
return MOSQ_ERR_INVAL;
}
multilevel_wildcard = true;
if(spos+1 != sublen){
*result = false;
return MOSQ_ERR_INVAL;
}else{
*result = true;
return MOSQ_ERR_SUCCESS;
}
}else{
*result = false;
/* Check for e.g. foo/bar matching foo/+/# */
if(spos > 0
&& spos+2 == sublen
&& tpos == topiclen
&& sub[spos-1] == '+'
&& sub[spos] == '/'
&& sub[spos+1] == '#')
{
*result = true;
multilevel_wildcard = true;
return MOSQ_ERR_SUCCESS;
}
return MOSQ_ERR_SUCCESS;
}
}
@ -437,4 +453,3 @@ FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read)
}
#endif
}

View File

@ -50,6 +50,7 @@
<itemizedlist mark="circle">
<listitem><para>openssl req -out server.csr -key server.key -new</para></listitem>
</itemizedlist>
<note><para>When prompted for the CN (Common Name), please enter either your server (or broker) hostname or domain name.</para></note>
<para>Send the CSR to the CA, or sign it with your CA key:</para>
<itemizedlist mark="circle">

View File

@ -119,7 +119,7 @@
<para><code>user &lt;username&gt;</code></para>
<para>The username referred to here is the same as in
<option>password_fil</option>e. It is not the
<option>password_file</option>. It is not the
clientid.</para>
<para>It is also possible to define ACLs based on pattern
@ -1078,9 +1078,9 @@
<listitem>
<para>Set the clientid to use on the local broker. If not
defined, this defaults to
<option>local.&lt;clientid&gt;</option>. If you are
<option>local.&lt;remote_clientid&gt;</option>. If you are
bridging a broker to itself, it is important that
local_clientid and clientid do not match.</para>
local_clientid and remote_clientid do not match.</para>
</listitem>
</varlistentry>
<varlistentry>
@ -1106,7 +1106,7 @@
notification messages to the local and remote brokers
giving information about the state of the bridge
connection. Retained messages are published to the
topic $SYS/broker/connection/&lt;clientid&gt;/state
topic $SYS/broker/connection/&lt;remote_clientid&gt;/state
unless otherwise set with
<option>notification_topic</option>s. If the message
is 1 then the connection is active, or 0 if the
@ -1129,7 +1129,7 @@
<para>Choose the topic on which notifications will be
published for this bridge. If not set the messages will
be sent on the topic
$SYS/broker/connection/&lt;clientid&gt;/state.</para>
$SYS/broker/connection/&lt;remote_clientid&gt;/state.</para>
</listitem>
</varlistentry>
<varlistentry>
@ -1351,21 +1351,6 @@ topic clients/total in 0 test/mosquitto/org $SYS/broker/
<para>The following options are available for all bridges to
configure SSL/TLS support.</para>
<variablelist>
<varlistentry>
<term><option>bridge_attempt_unsubscribe</option> [ true | false ]</term>
<listitem>
<para>If a bridge has topics that have "out" direction,
the default behaviour is to send an unsubscribe
request to the remote broker on that topic. This
means that changing a topic direction from "in" to
"out" will not keep receiving incoming messages.
Sending these unsubscribe requests is not always
desirable, setting
<option>bridge_attempt_unsubscribe</option> to
<replaceable>false</replaceable> will disable
sending the unsubscribe request.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_cafile</option> <replaceable>file path</replaceable></term>
<listitem>

View File

@ -41,7 +41,7 @@
<refsect1>
<title>Description</title>
<para><command>mosquitto_passwd</command> is a tool for managing
password files the mosquitto MQTT broker.</para>
password files for the mosquitto MQTT broker.</para>
<para>Usernames must not contain ":". Passwords are stored in a similar
format to
<citerefentry><refentrytitle>crypt</refentrytitle><manvolnum>3</manvolnum></citerefentry>.</para>

View File

@ -237,7 +237,7 @@
#crlfile
# If you wish to control which encryption ciphers are used, use the ciphers
# option. The list of available ciphers can be optained using the "openssl
# option. The list of available ciphers can be obtained using the "openssl
# ciphers" command and should be provided in the same format as the output of
# that command.
# If unset defaults to DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH
@ -270,7 +270,7 @@
# When using PSK, the encryption ciphers used will be chosen from the list of
# available PSK ciphers. If you want to control which ciphers are available,
# use the "ciphers" option. The list of available ciphers can be optained
# use the "ciphers" option. The list of available ciphers can be obtained
# using the "openssl ciphers" command and should be provided in the same format
# as the output of that command.
#ciphers

View File

@ -69,6 +69,7 @@ already be built. Use `make binary` to skip building the man pages, or install
* libuuid (uuid-dev) - disable with `make WITH_UUID=no`
* libwebsockets (libwebsockets-dev) - enable with `make WITH_WEBSOCKETS=yes`
* openssl (libssl-dev on Debian based systems) - disable with `make WITH_TLS=no`
* xsltproc (xsltproc and docbook-xsl on Debian based systems) - only needed when building from git sources - disable with `make WITH_DOCS=no`
## Credits

View File

@ -4,12 +4,12 @@ Copyright (c) 2009-2016 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
@ -67,6 +67,35 @@ static int conf__parse_string(char **token, const char *name, char **value, char
static int config__read_file(struct mosquitto__config *config, bool reload, const char *file, struct config_recurse *config_tmp, int level, int *lineno);
static int config__check(struct mosquitto__config *config);
static char *fgets_extending(char **buf, int *buflen, FILE *stream)
{
char *rc;
char endchar;
int offset = 0;
char *newbuf;
do{
rc = fgets(&((*buf)[offset]), *buflen-offset, stream);
if(feof(stream)){
return rc;
}
endchar = (*buf)[strlen(*buf)-1];
if(endchar == '\n'){
return rc;
}
/* No EOL char found, so extend buffer */
offset = *buflen-1;
*buflen += 1000;
newbuf = realloc(*buf, *buflen);
if(!newbuf){
return NULL;
}
*buf = newbuf;
}while(1);
}
static int conf__attempt_resolve(const char *host, const char *text, int log, const char *msg)
{
struct addrinfo gai_hints;
@ -253,7 +282,12 @@ void config__cleanup(struct mosquitto__config *config)
mosquitto__free(config->listeners[i].psk_hint);
mosquitto__free(config->listeners[i].crlfile);
mosquitto__free(config->listeners[i].tls_version);
SSL_CTX_free(config->listeners[i].ssl_ctx);
#ifdef WITH_WEBSOCKETS
if(!config->listeners[i].ws_context) /* libwebsockets frees its own SSL_CTX */
#endif
{
SSL_CTX_free(config->listeners[i].ssl_ctx);
}
#endif
#ifdef WITH_WEBSOCKETS
mosquitto__free(config->listeners[i].http_dir);
@ -561,10 +595,9 @@ int config__read(struct mosquitto__config *config, bool reload)
return MOSQ_ERR_SUCCESS;
}
int config__read_file_core(struct mosquitto__config *config, bool reload, const char *file, struct config_recurse *cr, int level, int *lineno, FILE *fptr)
int config__read_file_core(struct mosquitto__config *config, bool reload, const char *file, struct config_recurse *cr, int level, int *lineno, FILE *fptr, char **buf, int *buflen)
{
int rc;
char buf[1024];
char *token;
int tmp_int;
char *saveptr = NULL;
@ -595,13 +628,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
*lineno = 0;
while(fgets(buf, 1024, fptr)){
while(fgets_extending(buf, buflen, fptr)){
(*lineno)++;
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
buf[strlen(buf)-1] = 0;
if((*buf)[0] != '#' && (*buf)[0] != 10 && (*buf)[0] != 13){
while((*buf)[strlen((*buf))-1] == 10 || (*buf)[strlen((*buf))-1] == 13){
(*buf)[strlen((*buf))-1] = 0;
}
token = strtok_r(buf, " ", &saveptr);
token = strtok_r((*buf), " ", &saveptr);
if(token){
if(!strcmp(token, "acl_file")){
if(reload){
@ -1055,8 +1088,8 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}
snprintf(conf_file, len, "%s\\%s", token, find_data.cFileName);
conf_file[len] = '\0';
rc = config__read_file(config, reload, conf_file, cr, level+1, &lineno_ext);
rc = config__read_file(config, reload, conf_file, cr, level+1, &lineno_ext, buf, buflen);
if(rc){
FindClose(fh);
log__printf(NULL, MOSQ_LOG_ERR, "Error found at %s:%d.", conf_file, lineno_ext);
@ -1084,7 +1117,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}
snprintf(conf_file, len, "%s/%s", token, de->d_name);
conf_file[len] = '\0';
rc = config__read_file(config, reload, conf_file, cr, level+1, &lineno_ext);
if(rc){
closedir(dh);
@ -1381,7 +1414,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
return MOSQ_ERR_INVAL;
}
if(conf__parse_bool(&token, "notifications_local_only", &cur_bridge->notifications_local_only, saveptr)) return MOSQ_ERR_INVAL;
#else
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "notification_topic")){
@ -1611,7 +1644,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->topic_count++;
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
sizeof(struct mosquitto__bridge_topic)*cur_bridge->topic_count);
if(!cur_bridge->topics){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
@ -1691,7 +1724,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}
}
}
if(cur_topic->topic == NULL &&
if(cur_topic->topic == NULL &&
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
@ -1832,6 +1865,8 @@ int config__read_file(struct mosquitto__config *config, bool reload, const char
{
int rc;
FILE *fptr = NULL;
char *buf;
int buflen;
fptr = mosquitto__fopen(file, "rt", false);
if(!fptr){
@ -1839,7 +1874,15 @@ int config__read_file(struct mosquitto__config *config, bool reload, const char
return 1;
}
rc = config__read_file_core(config, reload, file, cr, level, lineno, fptr);
buflen = 1000;
buf = mosquitto__malloc(buflen);
if(!buf){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
rc = config__read_file_core(config, reload, file, cr, level, lineno, fptr, &buf, &buflen);
mosquitto__free(buf);
fclose(fptr);
return rc;
@ -1918,7 +1961,7 @@ static int conf__parse_bool(char **token, const char *name, bool *value, char *s
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty %s value in configuration.", name);
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}

View File

@ -371,6 +371,12 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
#ifdef WITH_TLS
if(context->listener && context->listener->ssl_ctx && (context->listener->use_identity_as_username || context->listener->use_subject_as_username)){
/* Don't need the username or password if provided */
mosquitto__free(username);
username = NULL;
mosquitto__free(password);
password = NULL;
if(!context->ssl){
send__connack(context, 0, CONNACK_REFUSED_BAD_USERNAME_PASSWORD);
rc = 1;
@ -645,5 +651,3 @@ int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context)
do_disconnect(db, context);
return MOSQ_ERR_SUCCESS;
}

View File

@ -658,9 +658,12 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto_msg_store *
return rc;
}
qos = retained->qos;
if(qos > sub_qos) qos = sub_qos;
if (db->config->upgrade_outgoing_qos){
qos = sub_qos;
} else {
qos = retained->qos;
if(qos > sub_qos) qos = sub_qos;
}
if(qos > 0){
mid = mosquitto__mid_generate(context);
}else{

View File

@ -183,6 +183,12 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->ws_context = context;
#endif
mosq->wsi = wsi;
if(in){
mosq->ssl = (SSL *)in;
if(!mosq->listener->ssl_ctx){
mosq->listener->ssl_ctx = SSL_get_SSL_CTX(mosq->ssl);
}
}
u->mosq = mosq;
}else{
return -1;
@ -216,6 +222,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->pollfd_index = -1;
}
mosq->wsi = NULL;
mosq->ssl = NULL;
do_disconnect(db, mosq);
}
break;
@ -225,7 +232,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
return -1;
}
mosq = u->mosq;
if(!mosq || mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
if(!mosq){
return -1;
}
@ -254,14 +261,30 @@ static int callback_mqtt(struct libwebsocket_context *context,
}
count = libwebsocket_write(wsi, &packet->payload[packet->pos], packet->to_process, LWS_WRITE_BINARY);
if(count < 0){
if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1;
}
return 0;
}
#ifdef WITH_SYS_TREE
g_bytes_sent += count;
#endif
packet->to_process -= count;
packet->pos += count;
if(packet->to_process > 0){
if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1;
}
break;
}
#ifdef WITH_SYS_TREE
g_msgs_sent++;
if(((packet->command)&0xF6) == PUBLISH){
g_pub_msgs_sent++;
}
#endif
/* Free data and reset values */
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
@ -276,6 +299,9 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
}
if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1;
}
if(mosq->current_out_packet){
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
}
@ -356,7 +382,12 @@ static int callback_mqtt(struct libwebsocket_context *context,
mosq->last_msg_in = mosquitto_time();
if(rc){
if(rc && (mosq->out_packet || mosq->current_out_packet)) {
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_disconnect_ws;
}
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
} else if (rc) {
do_disconnect(db, mosq);
return -1;
}

View File

@ -0,0 +1,2 @@
port 1888
upgrade_outgoing_qos true

View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
# Test whether a retained PUBLISH to a topic with QoS 0 is sent with subscriber QoS
# when upgrade_outgoing_qos is true
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
mid = 16
connect_packet = mosq_test.gen_connect("retain-qos0-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
publish_packet = mosq_test.gen_publish("retain/qos0/test", qos=0, payload="retained message", retain=True)
subscribe_packet = mosq_test.gen_subscribe(mid, "retain/qos0/test", 1)
suback_packet = mosq_test.gen_suback(mid, 1)
publish_packet2 = mosq_test.gen_publish("retain/qos0/test", mid=1, qos=1, payload="retained message", retain=True)
broker = mosq_test.start_broker(filename=os.path.basename(__file__))
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet)
sock.send(publish_packet)
#sock.close()
#sock = mosq_test.do_client_connect(connect_packet, connack_packet)
sock.send(subscribe_packet)
if mosq_test.expect_packet(sock, "suback", suback_packet):
if mosq_test.expect_packet(sock, "publish", publish_packet2):
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
exit(rc)

View File

@ -0,0 +1,14 @@
port 1889
retry_interval 10
connection bridge_sample
address 127.0.0.1:1888
bridge_attempt_unsubscribe false
topic # in 0 local/topic/ remote/topic/
topic prefix/# in 0 local2/topic/ remote2/topic/
topic +/value in 0 local3/topic/ remote3/topic/
topic ic/+ in 0 local4/top remote4/tip
topic clients/total in 0 test/mosquitto/org $SYS/broker/
notifications false
restart_timeout 5

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
# Test remapping of topic name for incoming message
import socket
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
client_id = socket.gethostname()+".bridge_sample"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=128+4)
connack_packet = mosq_test.gen_connack(rc=0)
client_connect_packet = mosq_test.gen_connect("pub-test", keepalive=keepalive)
client_connack_packet = mosq_test.gen_connack(rc=0)
ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock.settimeout(4)
ssock.bind(('', 1888))
ssock.listen(5)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=1889)
def test(bridge, sock):
if not mosq_test.expect_packet(bridge, "connect", connect_packet):
return 1
bridge.send(connack_packet)
mid = 0
patterns = [
"remote/topic/#",
"remote2/topic/prefix/#",
"remote3/topic/+/value",
"remote4/tipic/+",
"$SYS/broker/clients/total",
]
for pattern in ("remote/topic/#", "remote2/topic/prefix/#", "remote3/topic/+/value"):
mid += 1
subscribe_packet = mosq_test.gen_subscribe(mid, pattern, 0)
suback_packet = mosq_test.gen_suback(mid, 0)
if not mosq_test.expect_packet(bridge, "subscribe", subscribe_packet):
return 1
bridge.send(suback_packet)
mid += 1
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0)
suback_packet = mosq_test.gen_suback(mid, 0)
sock.send(subscribe_packet)
if not mosq_test.expect_packet(sock, "suback", suback_packet):
return 1
cases = [
('local/topic/something', 'remote/topic/something'),
('local/topic/some/t/h/i/n/g', 'remote/topic/some/t/h/i/n/g'),
('local/topic/value', 'remote/topic/value'),
# Don't work, #40 must be fixed before
# ('local/topic', 'remote/topic'),
('local2/topic/prefix/something', 'remote2/topic/prefix/something'),
('local3/topic/something/value', 'remote3/topic/something/value'),
('local4/topic/something', 'remote4/tipic/something'),
('test/mosquitto/orgclients/total', '$SYS/broker/clients/total'),
]
for (local_topic, remote_topic) in cases:
mid += 1
remote_publish_packet = mosq_test.gen_publish(
remote_topic, qos=0, mid=mid, payload=''
)
local_publish_packet = mosq_test.gen_publish(
local_topic, qos=0, mid=mid, payload=''
)
bridge.send(remote_publish_packet)
match = mosq_test.expect_packet(sock, "publish", local_publish_packet)
if not match:
print("Fail on cases local_topic=%r, remote_topic=%r" % (
local_topic, remote_topic,
))
return 1
return 0
try:
(bridge, address) = ssock.accept()
bridge.settimeout(2)
sock = mosq_test.do_client_connect(
client_connect_packet, client_connack_packet,
port=1889,
)
rc = test(bridge, sock)
sock.close()
bridge.close()
finally:
try:
bridge.close()
except NameError:
pass
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
ssock.close()
exit(rc)

View File

@ -0,0 +1,15 @@
port 1889
retry_interval 10
connection bridge_sample
address 127.0.0.1:1888
bridge_attempt_unsubscribe false
topic # out 0 local/topic/ remote/topic/
topic prefix/# out 0 local2/topic/ remote2/topic/
topic +/value out 0 local3/topic/ remote3/topic/
topic ic/+ out 0 local4/top remote4/tip
# this one is invalid
topic +/value out 0 local5/top remote5/tip
notifications false
restart_timeout 5

View File

@ -0,0 +1,109 @@
#!/usr/bin/env python
# Test remapping of topic name for outgoing message
import socket
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
client_id = socket.gethostname()+".bridge_sample"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=128+4)
connack_packet = mosq_test.gen_connack(rc=0)
client_connect_packet = mosq_test.gen_connect("pub-test", keepalive=keepalive)
client_connack_packet = mosq_test.gen_connack(rc=0)
ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock.settimeout(4)
ssock.bind(('', 1888))
ssock.listen(5)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=1889)
def test(bridge, sock):
if not mosq_test.expect_packet(bridge, "connect", connect_packet):
return 1
bridge.send(connack_packet)
cases = [
('local/topic/something', 'remote/topic/something'),
('local/topic/some/t/h/i/n/g', 'remote/topic/some/t/h/i/n/g'),
('local/topic/value', 'remote/topic/value'),
# Don't work, #40 must be fixed before
# ('local/topic', 'remote/topic'),
('local2/topic/something', None), # don't match topic pattern
('local2/topic/prefix/something', 'remote2/topic/prefix/something'),
('local3/topic/something/value', 'remote3/topic/something/value'),
('local4/topic/something', 'remote4/tipic/something'),
('local5/topic/something', None),
]
mid = 3
for (local_topic, remote_topic) in cases:
mid += 1
local_publish_packet = mosq_test.gen_publish(
local_topic, qos=0, mid=mid, payload=''
)
sock.send(local_publish_packet)
if remote_topic:
remote_publish_packet = mosq_test.gen_publish(
remote_topic, qos=0, mid=mid, payload=''
)
match = mosq_test.expect_packet(bridge, "publish", remote_publish_packet)
if not match:
print("Fail on cases local_topic=%r, remote_topic=%r" % (
local_topic, remote_topic,
))
return 1
else:
bridge.settimeout(3)
try:
bridge.recv(1)
print("FAIL: Received data when nothing is expected")
print("Fail on cases local_topic=%r, remote_topic=%r" % (
local_topic, remote_topic,
))
return 1
except socket.timeout:
pass
bridge.settimeout(20)
return 0
try:
(bridge, address) = ssock.accept()
bridge.settimeout(2)
sock = mosq_test.do_client_connect(
client_connect_packet, client_connack_packet,
port=1889,
)
rc = test(bridge, sock)
sock.close()
bridge.close()
finally:
try:
bridge.close()
except NameError:
pass
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
ssock.close()
exit(rc)

View File

@ -1,9 +1,10 @@
port 1889
connection bridge_test
address localhost:1888
address 127.0.0.1:1888
topic bridge/# both 0
notifications false
restart_timeout 2
#bridge_cafile ../ssl/test-root-ca.crt
bridge_cafile ../ssl/all-ca.crt

View File

@ -58,6 +58,7 @@ endif
./04-retain-qos0-repeated.py
./04-retain-qos1-qos0.py
./04-retain-qos0-clear.py
./04-retain-upgrade-outgoing-qos.py
05 :
./05-clean-session-qos1.py
@ -70,6 +71,8 @@ endif
./06-bridge-b2br-disconnect-qos2.py
./06-bridge-fail-persist-resend-qos1.py
./06-bridge-fail-persist-resend-qos2.py
./06-bridge-b2br-remapping.py
./06-bridge-br2b-remapping.py
07 :
./07-will-qos0.py
@ -88,9 +91,11 @@ ifeq ($(WITH_TLS),yes)
./08-ssl-connect-identity.py
./08-ssl-connect-no-identity.py
./08-ssl-bridge.py
ifeq ($(WITH_TLS_PSK),yes)
./08-tls-psk-pub.py
./08-tls-psk-bridge.py
endif
endif
09 :
./09-plugin-auth-unpwd-success.py

View File

@ -2,6 +2,9 @@
#include <stdlib.h>
#include <mosquitto.h>
#define EXPECT_MATCH(A, B) do_check((A), (B), false)
#define EXPECT_NOMATCH(A, B) do_check((A), (B), true)
void do_check(const char *sub, const char *topic, bool bad_res)
{
bool match;
@ -16,42 +19,44 @@ void do_check(const char *sub, const char *topic, bool bad_res)
int main(int argc, char *argv[])
{
do_check("foo/#", "foo/", false);
do_check("foo#", "foo", true);
do_check("fo#o/", "foo", true);
do_check("foo#", "fooa", true);
do_check("foo+", "foo", true);
do_check("foo+", "fooa", true);
EXPECT_MATCH("foo/#", "foo/");
EXPECT_NOMATCH("foo#", "foo");
EXPECT_NOMATCH("fo#o/", "foo");
EXPECT_NOMATCH("foo#", "fooa");
EXPECT_NOMATCH("foo+", "foo");
EXPECT_NOMATCH("foo+", "fooa");
do_check("test/6/#", "test/3", true);
do_check("foo/bar", "foo/bar", false);
do_check("foo/+", "foo/bar", false);
do_check("foo/+/baz", "foo/bar/baz", false);
EXPECT_NOMATCH("test/6/#", "test/3");
EXPECT_MATCH("foo/bar", "foo/bar");
EXPECT_MATCH("foo/+", "foo/bar");
EXPECT_MATCH("foo/+/baz", "foo/bar/baz");
do_check("A/B/+/#", "A/B/B/C", false);
EXPECT_MATCH("A/B/+/#", "A/B/B/C");
do_check("foo/+/#", "foo/bar/baz", false);
do_check("#", "foo/bar/baz", false);
EXPECT_MATCH("foo/+/#", "foo/bar/baz");
EXPECT_MATCH("foo/+/#", "foo/bar");
EXPECT_MATCH("#", "foo/bar/baz");
EXPECT_MATCH("#", "foo/bar/baz");
do_check("foo/bar", "foo", true);
do_check("foo/+", "foo/bar/baz", true);
do_check("foo/+/baz", "foo/bar/bar", true);
EXPECT_NOMATCH("foo/bar", "foo");
EXPECT_NOMATCH("foo/+", "foo/bar/baz");
EXPECT_NOMATCH("foo/+/baz", "foo/bar/bar");
do_check("foo/+/#", "fo2/bar/baz", true);
EXPECT_NOMATCH("foo/+/#", "fo2/bar/baz");
do_check("#", "/foo/bar", false);
do_check("/#", "/foo/bar", false);
do_check("/#", "foo/bar", true);
EXPECT_MATCH("#", "/foo/bar");
EXPECT_MATCH("/#", "/foo/bar");
EXPECT_NOMATCH("/#", "foo/bar");
do_check("foo//bar", "foo//bar", false);
do_check("foo//+", "foo//bar", false);
do_check("foo/+/+/baz", "foo///baz", false);
do_check("foo/bar/+", "foo/bar/", false);
EXPECT_MATCH("foo//bar", "foo//bar");
EXPECT_MATCH("foo//+", "foo//bar");
EXPECT_MATCH("foo/+/+/baz", "foo///baz");
EXPECT_MATCH("foo/bar/+", "foo/bar/");
do_check("$SYS/bar", "$SYS/bar", false);
do_check("#", "$SYS/bar", true);
do_check("$BOB/bar", "$SYS/bar", true);
EXPECT_MATCH("$SYS/bar", "$SYS/bar");
EXPECT_NOMATCH("#", "$SYS/bar");
EXPECT_NOMATCH("$BOB/bar", "$SYS/bar");
return 0;
}