Merge branch 'fixes'

This commit is contained in:
Roger A. Light 2019-09-25 12:17:33 +01:00
commit afb6a13674
53 changed files with 437 additions and 201 deletions

View File

@ -11,7 +11,7 @@ project(mosquitto)
cmake_minimum_required(VERSION 2.8)
# Only for version 3 and up. cmake_policy(SET CMP0042 NEW)
set (VERSION 1.6.6)
set (VERSION 1.6.7)
add_definitions (-DCMAKE -DVERSION=\"${VERSION}\")

View File

@ -1,3 +1,23 @@
1.6.7 - 20190925
================
Broker:
- Add workaround for working with libwebsockets 3.2.0.
- Fix potential crash when reloading config. Closes #1424, #1425.
Client library:
- Don't use `/` in autogenerated client ids, to avoid confusing with topics.
- Fix `mosquitto_max_inflight_messages_set()` and `mosquitto_int_option(...,
MOSQ_OPT_*_MAX, ...)` behaviour. Closes #1417.
- Fix regression on use of `mosquitto_connect_async()` not working.
Closes #1415 and #1422.
Clients:
- mosquitto_sub: Fix `-E` incorrectly not working unless `-d` was also
specified. Closes #1418.
- Updated documentation around automatic client ids.
1.6.6 - 20190917
================

View File

@ -145,11 +145,13 @@ void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_c
UNUSED(obj);
if(!cfg.quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
if(!cfg.quiet) printf(", %d", granted_qos[i]);
if(cfg.debug){
if(!cfg.quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
if(!cfg.quiet) printf(", %d", granted_qos[i]);
}
if(!cfg.quiet) printf("\n");
}
if(!cfg.quiet) printf("\n");
if(cfg.exit_after_sub){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
@ -322,8 +324,8 @@ int main(int argc, char *argv[])
}
if(cfg.debug){
mosquitto_log_callback_set(mosq, my_log_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_connect_v5_callback_set(mosq, my_connect_callback);
mosquitto_message_v5_callback_set(mosq, my_message_callback);

View File

@ -104,7 +104,7 @@ WITH_COVERAGE:=no
# Also bump lib/mosquitto.h, CMakeLists.txt,
# installer/mosquitto.nsi, installer/mosquitto64.nsi
VERSION=1.6.6
VERSION=1.6.7
# Client library SO version. Bump if incompatible API/ABI changes are made.
SOVERSION=1

View File

@ -9,7 +9,7 @@
!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
Name "Eclipse Mosquitto"
!define VERSION 1.6.6
!define VERSION 1.6.7
OutFile "mosquitto-${VERSION}-install-windows-x86.exe"
InstallDir "$PROGRAMFILES\mosquitto"

View File

@ -9,7 +9,7 @@
!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
Name "Eclipse Mosquitto"
!define VERSION 1.6.6
!define VERSION 1.6.7
OutFile "mosquitto-${VERSION}-install-windows-x64.exe"
!include "x64.nsh"

View File

@ -53,7 +53,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
mosq->id[1] = 'o';
mosq->id[2] = 's';
mosq->id[3] = 'q';
mosq->id[4] = '/';
mosq->id[4] = '-';
rc = util__random_bytes(&mosq->id[5], 18);
if(rc) return rc;
@ -119,9 +119,7 @@ int mosquitto_connect_bind_v5(struct mosquitto *mosq, const char *host, int port
rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address);
if(rc) return rc;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_new;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_new);
return mosquitto__reconnect(mosq, true, properties);
}
@ -138,10 +136,6 @@ int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int p
int rc = mosquitto__connect_init(mosq, host, port, keepalive, bind_address);
if(rc) return rc;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_connect_async;
pthread_mutex_unlock(&mosq->state_mutex);
return mosquitto__reconnect(mosq, false, NULL);
}
@ -163,6 +157,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;
int rc;
if(!mosq) return MOSQ_ERR_INVAL;
if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
@ -180,17 +175,6 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
if(rc) return rc;
}
pthread_mutex_lock(&mosq->state_mutex);
#ifdef WITH_SOCKS
if(mosq->socks5_host){
mosq->state = mosq_cs_socks5_new;
}else
#endif
{
mosq->state = mosq_cs_new;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
mosq->next_msg_out = mosq->last_msg_in + mosq->keepalive;
@ -217,19 +201,23 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
rc = net__socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
}
if(rc>0){
mosquitto__set_state(mosq, mosq_cs_connect_pending);
return rc;
}
#ifdef WITH_SOCKS
if(mosq->socks5_host){
mosquitto__set_state(mosq, mosq_cs_socks5_new);
return socks5__send(mosq);
}else
#endif
{
mosquitto__set_state(mosq, mosq_cs_connected);
rc = send__connect(mosq, mosq->keepalive, mosq->clean_start, outgoing_properties);
if(rc){
packet__cleanup_all(mosq);
net__socket_close(mosq);
mosquitto__set_state(mosq, mosq_cs_new);
}
return rc;
}
@ -262,21 +250,18 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu
if(rc) return rc;
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_disconnecting;
pthread_mutex_unlock(&mosq->state_mutex);
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
return send__disconnect(mosq, reason_code, outgoing_properties);
mosquitto__set_state(mosq, mosq_cs_disconnected);
if(mosq->sock == INVALID_SOCKET){
return MOSQ_ERR_NO_CONN;
}else{
return send__disconnect(mosq, reason_code, outgoing_properties);
}
}
void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties)
{
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_disconnecting;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_disconnected);
net__socket_close(mosq);
/* Free data and reset values */

View File

@ -111,7 +111,7 @@ int handle__connack(struct mosquitto *mosq)
case 0:
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_connected;
mosq->state = mosq_cs_active;
}
pthread_mutex_unlock(&mosq->state_mutex);
message__retry_check(mosq);

View File

@ -37,9 +37,12 @@ Contributors:
int handle__pingreq(struct mosquitto *mosq)
{
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}
@ -57,11 +60,8 @@ int handle__pingresp(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -51,11 +51,8 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -44,11 +44,8 @@ int handle__publish(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -41,10 +41,12 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
uint16_t mid;
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -45,10 +45,12 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
#endif
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -29,6 +29,7 @@ Contributors:
#include "mqtt_protocol.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "util_mosq.h"
int handle__suback(struct mosquitto *mosq)
@ -44,11 +45,8 @@ int handle__suback(struct mosquitto *mosq)
assert(mosq);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -42,10 +42,12 @@ int handle__unsuback(struct mosquitto *mosq)
uint16_t mid;
int rc;
mosquitto_property *properties = NULL;
int state;
assert(mosq);
if(mosq->state != mosq_cs_connected){
state = mosquitto__get_state(mosq);
if(state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -47,6 +47,9 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
char pairbuf;
int maxfd = 0;
time_t now;
#ifdef WITH_SRV
int state;
#endif
if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
#ifndef WIN32
@ -83,17 +86,15 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
}else{
#ifdef WITH_SRV
if(mosq->achan){
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_connect_srv){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_connect_srv){
rc = ares_fds(mosq->achan, &readfds, &writefds);
if(rc > maxfd){
maxfd = rc;
}
}else{
pthread_mutex_unlock(&mosq->state_mutex);
return MOSQ_ERR_NO_CONN;
}
pthread_mutex_unlock(&mosq->state_mutex);
}
#else
return MOSQ_ERR_NO_CONN;
@ -198,15 +199,12 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
#ifndef WIN32
struct timespec req, rem;
#endif
int state;
if(!mosq) return MOSQ_ERR_INVAL;
mosq->reconnects = 0;
if(mosq->state == mosq_cs_connect_async){
mosquitto_reconnect(mosq);
}
while(run){
do{
rc = mosquitto_loop(mosq, timeout, max_packets);
@ -234,13 +232,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
}
do{
rc = MOSQ_ERR_SUCCESS;
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
run = 0;
pthread_mutex_unlock(&mosq->state_mutex);
}else{
pthread_mutex_unlock(&mosq->state_mutex);
if(mosq->reconnect_delay_max > mosq->reconnect_delay){
if(mosq->reconnect_exponential_backoff){
reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1);
@ -267,12 +262,10 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
}
#endif
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
run = 0;
pthread_mutex_unlock(&mosq->state_mutex);
}else{
pthread_mutex_unlock(&mosq->state_mutex);
rc = mosquitto_reconnect(mosq);
}
}
@ -293,13 +286,14 @@ int mosquitto_loop_misc(struct mosquitto *mosq)
static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
{
int state;
if(rc){
net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
rc = MOSQ_ERR_SUCCESS;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
mosq->in_callback = true;

View File

@ -340,10 +340,6 @@ int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg
int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages)
{
if(!mosq) return MOSQ_ERR_INVAL;
mosq->send_maximum = max_inflight_messages;
return MOSQ_ERR_SUCCESS;
return mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, max_inflight_messages);
}

View File

@ -48,7 +48,7 @@ extern "C" {
#define LIBMOSQUITTO_MAJOR 1
#define LIBMOSQUITTO_MINOR 6
#define LIBMOSQUITTO_REVISION 6
#define LIBMOSQUITTO_REVISION 7
/* LIBMOSQUITTO_VERSION_NUMBER looks like 1002001 for e.g. version 1.2.1. */
#define LIBMOSQUITTO_VERSION_NUMBER (LIBMOSQUITTO_MAJOR*1000000+LIBMOSQUITTO_MINOR*1000+LIBMOSQUITTO_REVISION)

View File

@ -94,7 +94,7 @@ enum mosquitto_client_state {
mosq_cs_new = 0,
mosq_cs_connected = 1,
mosq_cs_disconnecting = 2,
mosq_cs_connect_async = 3,
mosq_cs_active = 3,
mosq_cs_connect_pending = 4,
mosq_cs_connect_srv = 5,
mosq_cs_disconnect_ws = 6,
@ -107,7 +107,6 @@ enum mosquitto_client_state {
mosq_cs_socks5_userpass_reply = 13,
mosq_cs_socks5_send_userpass = 14,
mosq_cs_expiring = 15,
mosq_cs_connecting = 16,
mosq_cs_duplicate = 17, /* client that has been taken over by another with the same id */
mosq_cs_disconnect_with_will = 18,
mosq_cs_disused = 19, /* client that has been added to the disused list to be freed */
@ -335,8 +334,6 @@ struct mosquitto {
# ifdef WITH_SRV
ares_channel achan;
# endif
uint16_t send_maximum;
uint16_t receive_maximum;
#endif
uint8_t maximum_qos;

View File

@ -210,7 +210,7 @@ int net__socket_close(struct mosquitto *mosq)
if(mosq->wsi)
{
if(mosq->state != mosq_cs_disconnecting){
context__set_state(mosq, mosq_cs_disconnect_ws);
mosquitto__set_state(mosq, mosq_cs_disconnect_ws);
}
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
}else

View File

@ -409,14 +409,22 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val
if(value < 0 || value > 65535){
return MOSQ_ERR_INVAL;
}
mosq->receive_maximum = value;
if(value == 0){
mosq->msgs_in.inflight_maximum = 65535;
}else{
mosq->msgs_in.inflight_maximum = value;
}
break;
case MOSQ_OPT_SEND_MAXIMUM:
if(value < 0 || value > 65535){
return MOSQ_ERR_INVAL;
}
mosq->send_maximum = value;
if(value == 0){
mosq->msgs_out.inflight_maximum = 65535;
}else{
mosq->msgs_out.inflight_maximum = value;
}
break;
case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS:

View File

@ -34,6 +34,7 @@ Contributors:
#include "net_mosq.h"
#include "packet_mosq.h"
#include "read_handle.h"
#include "util_mosq.h"
#ifdef WITH_BROKER
# include "sys_tree.h"
# include "send_mosq.h"
@ -218,10 +219,7 @@ int packet__write(struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
#if defined(WITH_TLS) && !defined(WITH_BROKER)
if((state == mosq_cs_connect_pending) || mosq->want_connect){
#else
@ -329,10 +327,8 @@ int packet__read(struct mosquitto *mosq)
if(mosq->sock == INVALID_SOCKET){
return MOSQ_ERR_NO_CONN;
}
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
if(state == mosq_cs_connect_pending){
return MOSQ_ERR_SUCCESS;
}
@ -358,7 +354,7 @@ int packet__read(struct mosquitto *mosq)
#ifdef WITH_BROKER
G_BYTES_RECEIVED_INC(1);
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CMD_CONNECT){
if(!(mosq->bridge) && mosq->state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
return MOSQ_ERR_PROTOCOL;
}
#endif

View File

@ -37,6 +37,7 @@ Contributors:
#include "net_mosq.h"
#include "packet_mosq.h"
#include "send_mosq.h"
#include "util_mosq.h"
#define SOCKS_AUTH_NONE 0x00
#define SOCKS_AUTH_GSS 0x01
@ -112,8 +113,11 @@ int socks5__send(struct mosquitto *mosq)
struct in6_addr addr_ipv6;
int ipv4_pton_result;
int ipv6_pton_result;
int state;
if(mosq->state == mosq_cs_socks5_new){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_socks5_new){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@ -134,9 +138,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[2] = SOCKS_AUTH_NONE;
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_start;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_start);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 2;
@ -149,7 +151,7 @@ int socks5__send(struct mosquitto *mosq)
}
return packet__queue(mosq, packet);
}else if(mosq->state == mosq_cs_socks5_auth_ok){
}else if(state == mosq_cs_socks5_auth_ok){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@ -201,9 +203,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[1] = 0x01;
packet->payload[2] = 0x00;
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_request;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_request);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 5;
@ -216,7 +216,7 @@ int socks5__send(struct mosquitto *mosq)
}
return packet__queue(mosq, packet);
}else if(mosq->state == mosq_cs_socks5_send_userpass){
}else if(state == mosq_cs_socks5_send_userpass){
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
@ -232,9 +232,7 @@ int socks5__send(struct mosquitto *mosq)
packet->payload[2+ulen] = plen;
memcpy(&(packet->payload[3+ulen]), mosq->socks5_password, plen);
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_socks5_userpass_reply;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_socks5_userpass_reply);
mosq->in_packet.pos = 0;
mosq->in_packet.packet_length = 2;
@ -256,8 +254,10 @@ int socks5__read(struct mosquitto *mosq)
ssize_t len;
uint8_t *payload;
uint8_t i;
int state;
if(mosq->state == mosq_cs_socks5_start){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_socks5_start){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@ -289,17 +289,17 @@ int socks5__read(struct mosquitto *mosq)
switch(mosq->in_packet.payload[1]){
case SOCKS_AUTH_NONE:
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_auth_ok;
mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok);
return socks5__send(mosq);
case SOCKS_AUTH_USERPASS:
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_send_userpass;
mosquitto__set_state(mosq, mosq_cs_socks5_send_userpass);
return socks5__send(mosq);
default:
packet__cleanup(&mosq->in_packet);
return MOSQ_ERR_AUTH;
}
}else if(mosq->state == mosq_cs_socks5_userpass_reply){
}else if(state == mosq_cs_socks5_userpass_reply){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@ -330,7 +330,7 @@ int socks5__read(struct mosquitto *mosq)
}
if(mosq->in_packet.payload[1] == 0){
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_auth_ok;
mosquitto__set_state(mosq, mosq_cs_socks5_auth_ok);
return socks5__send(mosq);
}else{
i = mosq->in_packet.payload[1];
@ -355,7 +355,7 @@ int socks5__read(struct mosquitto *mosq)
}
return MOSQ_ERR_PROXY;
}
}else if(mosq->state == mosq_cs_socks5_request){
}else if(state == mosq_cs_socks5_request){
while(mosq->in_packet.to_process > 0){
len = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(len > 0){
@ -423,7 +423,7 @@ int socks5__read(struct mosquitto *mosq)
if(mosq->in_packet.payload[1] == 0){
/* Auth passed */
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_new;
mosquitto__set_state(mosq, mosq_cs_new);
if(mosq->socks5_host){
int rc = net__socket_connect_step3(mosq, mosq->host);
if(rc) return rc;
@ -432,7 +432,7 @@ int socks5__read(struct mosquitto *mosq)
}else{
i = mosq->in_packet.payload[1];
packet__cleanup(&mosq->in_packet);
mosq->state = mosq_cs_socks5_new;
mosquitto__set_state(mosq, mosq_cs_socks5_new);
switch(i){
case SOCKS_REPLY_CONNECTION_NOT_ALLOWED:
return MOSQ_ERR_AUTH;

View File

@ -28,6 +28,7 @@ Contributors:
#include "memory_mosq.h"
#include "mosquitto_internal.h"
#include "mosquitto.h"
#include "util_mosq.h"
#ifdef WITH_SRV
static void srv_callback(void *arg, int status, int timeouts, unsigned char *abuf, int alen)
@ -91,9 +92,7 @@ int mosquitto_connect_srv(struct mosquitto *mosq, const char *host, int keepaliv
mosquitto__free(h);
}
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_connect_srv;
pthread_mutex_unlock(&mosq->state_mutex);
mosquitto__set_state(mosq, mosq_cs_connect_srv);
mosq->keepalive = keepalive;

View File

@ -22,6 +22,7 @@ Contributors:
#include "mosquitto_internal.h"
#include "net_mosq.h"
#include "util_mosq.h"
void *mosquitto__thread_main(void *obj);
@ -89,9 +90,7 @@ void *mosquitto__thread_main(void *obj)
if(!mosq) return NULL;
do{
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
state = mosquitto__get_state(mosq);
if(state == mosq_cs_new){
#ifdef WIN32
Sleep(10);
@ -103,10 +102,6 @@ void *mosquitto__thread_main(void *obj)
}
}while(1);
if(state == mosq_cs_connect_async){
mosquitto_reconnect(mosq);
}
if(!mosq->keepalive){
/* Sleep for a day if keepalive disabled. */
mosquitto_loop_forever(mosq, 1000*86400, 1);

View File

@ -89,11 +89,8 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
(now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
if(state == mosq_cs_connected && mosq->ping_t == 0){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_active && mosq->ping_t == 0){
send__pingreq(mosq);
/* Reset last msg times to give the server time to send a pingresp */
pthread_mutex_lock(&mosq->msgtime_mutex);
@ -105,13 +102,12 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
net__socket_close(db, mosq);
#else
net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
state = mosquitto__get_state(mosq);
if(state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS;
}else{
rc = MOSQ_ERR_KEEPALIVE;
}
pthread_mutex_unlock(&mosq->state_mutex);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){
mosq->in_callback = true;
@ -357,3 +353,30 @@ int util__random_bytes(void *bytes, int count)
#endif
return rc;
}
int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state)
{
pthread_mutex_lock(&mosq->state_mutex);
#ifdef WITH_BROKER
if(mosq->state != mosq_cs_disused)
#endif
{
mosq->state = state;
}
pthread_mutex_unlock(&mosq->state_mutex);
return MOSQ_ERR_SUCCESS;
}
enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq)
{
enum mosquitto_client_state state;
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);
return state;
}

View File

@ -33,6 +33,9 @@ int mosquitto__check_keepalive(struct mosquitto *mosq);
uint16_t mosquitto__mid_generate(struct mosquitto *mosq);
FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read);
int mosquitto__set_state(struct mosquitto *mosq, enum mosquitto_client_state state);
enum mosquitto_client_state mosquitto__get_state(struct mosquitto *mosq);
#ifdef WITH_TLS
int mosquitto__hex2bin_sha1(const char *hex, unsigned char **bin);
int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len);

View File

@ -254,9 +254,15 @@
<term><option>-i</option></term>
<term><option>--id</option></term>
<listitem>
<para>The id to use for this client. If not given, defaults
to mosquitto_pub_ appended with the process id of the
client. Cannot be used at the same time as the
<para>The id to use for this client. If not given, a client id will
be generated depending on the MQTT version being used. For v3.1.1/v3.1,
the client generates a client id in the format
<option>mosq-XXXXXXXXXXXXXXXXXX</option>, where the
<option>X</option> are replaced with random alphanumeric
characters. For v5.0, the client sends a zero length client id, and the
server will generate a client id for the client.</para>
<para>This option cannot be used at the same time as the
<option>--id-prefix</option> argument.</para>
</listitem>
</varlistentry>

View File

@ -272,9 +272,15 @@
<term><option>-i</option></term>
<term><option>--id</option></term>
<listitem>
<para>The id to use for this client. If not given, defaults
to mosquitto_rr_ appended with the process id of the
client. Cannot be used at the same time as the
<para>The id to use for this client. If not given, a client id will
be generated depending on the MQTT version being used. For v3.1.1/v3.1,
the client generates a client id in the format
<option>mosq-XXXXXXXXXXXXXXXXXX</option>, where the
<option>X</option> are replaced with random alphanumeric
characters. For v5.0, the client sends a zero length client id, and the
server will generate a client id for the client.</para>
<para>This option cannot be used at the same time as the
<option>--id-prefix</option> argument.</para>
</listitem>
</varlistentry>

View File

@ -295,9 +295,15 @@
<term><option>-i</option></term>
<term><option>--id</option></term>
<listitem>
<para>The id to use for this client. If not given, defaults
to mosquitto_sub_ appended with the process id of the
client. Cannot be used at the same time as the
<para>The id to use for this client. If not given, a client id will
be generated depending on the MQTT version being used. For v3.1.1/v3.1,
the client generates a client id in the format
<option>mosq-XXXXXXXXXXXXXXXXXX</option>, where the
<option>X</option> are replaced with random alphanumeric
characters. For v5.0, the client sends a zero length client id, and the
server will generate a client id for the client.</para>
<para>This option cannot be used at the same time as the
<option>--id-prefix</option> argument.</para>
</listitem>
</varlistentry>

View File

@ -2,7 +2,7 @@
MAJOR=1
MINOR=6
REVISION=6
REVISION=7
sed -i "s/^VERSION=.*/VERSION=${MAJOR}.${MINOR}.${REVISION}/" config.mk

View File

@ -1,5 +1,5 @@
name: mosquitto
version: 1.6.6
version: 1.6.7
summary: Eclipse Mosquitto MQTT broker
description: This is a message broker that supports version 3.1 and 3.1.1 of the MQTT
protocol.

View File

@ -127,7 +127,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
@ -247,7 +247,7 @@ int bridge__connect_step2(struct mosquitto_db *db, struct mosquitto *context)
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
if(rc == MOSQ_ERR_CONN_PENDING){
context__set_state(context, mosq_cs_connect_pending);
mosquitto__set_state(context, mosq_cs_connect_pending);
}
return rc;
}
@ -306,7 +306,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
@ -400,7 +400,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
return rc;
}else if(rc == MOSQ_ERR_CONN_PENDING){
context__set_state(context, mosq_cs_connect_pending);
mosquitto__set_state(context, mosq_cs_connect_pending);
}
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);

View File

@ -25,6 +25,7 @@ Contributors:
#include "packet_mosq.h"
#include "property_mosq.h"
#include "time_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
#include "uthash.h"
@ -38,7 +39,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
if(!context) return NULL;
context->pollfd_index = -1;
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
context->sock = sock;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + 60;
@ -243,14 +244,14 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context)
}else{
session_expiry__add(db, context);
}
context__set_state(context, mosq_cs_disconnected);
mosquitto__set_state(context, mosq_cs_disconnected);
}
void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)
{
if(context->state == mosq_cs_disused) return;
context__set_state(context, mosq_cs_disused);
mosquitto__set_state(context, mosq_cs_disused);
if(context->id){
context__remove_from_by_id(db, context);
@ -308,10 +309,3 @@ void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *conte
}
}
void context__set_state(struct mosquitto *context, enum mosquitto_client_state state)
{
if(context->state != mosq_cs_disused){
context->state = state;
}
}

View File

@ -964,11 +964,11 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
uint32_t expiry_interval;
if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
|| (context->state == mosq_cs_active && !context->id)){
return MOSQ_ERR_INVAL;
}
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

View File

@ -25,6 +25,7 @@ Contributors:
#include "packet_mosq.h"
#include "property_mosq.h"
#include "send_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
@ -54,7 +55,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_PROTOCOL;
}
if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_connected)
if((reason_code == MQTT_RC_REAUTHENTICATE && context->state != mosq_cs_active)
|| (reason_code == MQTT_RC_CONTINUE_AUTHENTICATION
&& context->state != mosq_cs_authenticating && context->state != mosq_cs_reauthenticating)){
@ -94,11 +95,11 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
if(reason_code == MQTT_RC_REAUTHENTICATE){
/* This is a re-authentication attempt */
context__set_state(context, mosq_cs_reauthenticating);
mosquitto__set_state(context, mosq_cs_reauthenticating);
rc = mosquitto_security_auth_start(db, context, true, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len);
}else{
if(context->state != mosq_cs_reauthenticating){
context__set_state(context, mosq_cs_authenticating);
mosquitto__set_state(context, mosq_cs_authenticating);
}
rc = mosquitto_security_auth_continue(db, context, auth_data, auth_data_len, &auth_data_out, &auth_data_out_len);
}
@ -107,7 +108,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
if(context->state == mosq_cs_authenticating){
return connect__on_authorised(db, context, auth_data_out, auth_data_out_len);
}else{
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
rc = send__auth(db, context, MQTT_RC_SUCCESS, auth_data_out, auth_data_out_len);
free(auth_data_out);
return rc;

View File

@ -108,7 +108,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
}
}
}
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
return MOSQ_ERR_SUCCESS;
case CONNACK_REFUSED_PROTOCOL_VERSION:
if(context->bridge){

View File

@ -166,7 +166,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
found_context->clean_start = true;
found_context->session_expiry_interval = 0;
context__set_state(found_context, mosq_cs_duplicate);
mosquitto__set_state(found_context, mosq_cs_duplicate);
do_disconnect(db, found_context, MOSQ_ERR_SUCCESS);
}
@ -266,7 +266,7 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
}
free(auth_data_out);
context__set_state(context, mosq_cs_connected);
mosquitto__set_state(context, mosq_cs_active);
rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props);
return rc;
@ -835,7 +835,7 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
if(rc == MOSQ_ERR_SUCCESS){
return connect__on_authorised(db, context, auth_data_out, auth_data_out_len);
}else if(rc == MOSQ_ERR_AUTH_CONTINUE){
context__set_state(context, mosq_cs_authenticating);
mosquitto__set_state(context, mosq_cs_authenticating);
rc = send__auth(db, context, MQTT_RC_CONTINUE_AUTHENTICATION, auth_data_out, auth_data_out_len);
free(auth_data_out);
return rc;

View File

@ -21,6 +21,7 @@ Contributors:
#include "packet_mosq.h"
#include "property_mosq.h"
#include "send_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
@ -65,10 +66,10 @@ int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context)
}
}
if(reason_code == MQTT_RC_DISCONNECT_WITH_WILL_MSG){
context__set_state(context, mosq_cs_disconnect_with_will);
mosquitto__set_state(context, mosq_cs_disconnect_with_will);
}else{
will__clear(context);
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
}
do_disconnect(db, context, MOSQ_ERR_SUCCESS);
return MOSQ_ERR_SUCCESS;

View File

@ -61,7 +61,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
bool match;
#endif
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -46,7 +46,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
if(!context) return MOSQ_ERR_INVAL;
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}

View File

@ -39,7 +39,7 @@ int handle__unsubscribe(struct mosquitto_db *db, struct mosquitto *context)
if(!context) return MOSQ_ERR_INVAL;
if(context->state != mosq_cs_connected){
if(context->state != mosq_cs_active){
return MOSQ_ERR_PROTOCOL;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBSCRIBE from %s", context->id);

View File

@ -101,6 +101,14 @@ static void temp__expire_websockets_clients(struct mosquitto_db *db)
}
#endif
#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
void lws__sul_callback(struct lws_sorted_usec_list *l)
{
}
static struct lws_sorted_usec_list sul;
#endif
int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
{
#ifdef WITH_SYS_TREE
@ -133,6 +141,11 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
time_t expiration_check_time = 0;
char *id;
#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
memset(&sul, 0, sizeof(struct lws_sorted_usec_list));
#endif
#ifndef WIN32
sigemptyset(&sigblock);
sigaddset(&sigblock, SIGINT);
@ -476,7 +489,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
G_CLIENTS_EXPIRED_INC();
context->session_expiry_interval = 0;
context__set_state(context, mosq_cs_expiring);
mosquitto__set_state(context, mosq_cs_expiring);
do_disconnect(db, context, MOSQ_ERR_SUCCESS);
}
}
@ -603,6 +616,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
if(db->config->listeners[i].ws_context){
#if LWS_LIBRARY_VERSION_NUMBER > 3002000
libwebsocket_service(db->config->listeners[i].ws_context, -1);
#elif LWS_LIBRARY_VERSION_NUMBER == 3002000
lws_sul_schedule(db->config->listeners[i].ws_context, 0, &sul, lws__sul_callback, 10);
libwebsocket_service(db->config->listeners[i].ws_context, 0);
#else
libwebsocket_service(db->config->listeners[i].ws_context, 0);
#endif
@ -644,7 +660,7 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
}
if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
context__set_state(context, mosq_cs_disconnect_ws);
mosquitto__set_state(context, mosq_cs_disconnect_ws);
}
if(context->wsi){
libwebsocket_callback_on_writable(context->ws_context, context->wsi);
@ -784,7 +800,7 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
len = sizeof(int);
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
context__set_state(context, mosq_cs_new);
mosquitto__set_state(context, mosq_cs_new);
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
if(context->bridge){
bridge__connect_step3(db, context);

View File

@ -37,9 +37,6 @@ Contributors:
# define libwebsocket_protocols lws_protocols
# define libwebsocket_callback_reasons lws_callback_reasons
# define libwebsocket lws
# if LWS_LIBRARY_VERSION_NUMBER == 3002000
# error "libwebsockets 3.2.0 is not compatible with Mosquitto. <3.1.0, or >=3.2.1 will work fine"
# endif
# else
# define lws_pollfd pollfd
# define lws_service_fd(A, B) libwebsocket_service_fd((A), (B))
@ -656,7 +653,6 @@ void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)
void context__free_disused(struct mosquitto_db *db);
void context__send_will(struct mosquitto_db *db, struct mosquitto *context);
void context__remove_from_by_id(struct mosquitto_db *db, struct mosquitto *context);
void context__set_state(struct mosquitto *context, enum mosquitto_client_state state);
int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, void *auth_data_out, uint16_t auth_data_out_len);

View File

@ -948,7 +948,7 @@ static void security__disconnect_auth(struct mosquitto_db *db, struct mosquitto
if(context->protocol == mosq_p_mqtt5){
send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL);
}
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
}
#endif
@ -1005,20 +1005,20 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
}
if(!allow_anonymous && !context->username){
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
/* Check for connected clients that are no longer authorised */
#ifdef WITH_TLS
if(context->listener->ssl_ctx && (context->listener->use_identity_as_username || context->listener->use_subject_as_username)){
if(context->listener && context->listener->ssl_ctx && (context->listener->use_identity_as_username || context->listener->use_subject_as_username)){
/* Client must have either a valid certificate, or valid PSK used as a username. */
if(!context->ssl){
if(context->protocol == mosq_p_mqtt5){
send__disconnect(context, MQTT_RC_ADMINISTRATIVE_ACTION, NULL);
}
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
@ -1118,7 +1118,7 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
{
/* Username/password check only if the identity/subject check not used */
if(mosquitto_unpwd_check(db, context, context->username, context->password) != MOSQ_ERR_SUCCESS){
context__set_state(context, mosq_cs_disconnecting);
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}
@ -1130,8 +1130,8 @@ int mosquitto_security_apply_default(struct mosquitto_db *db)
if(context->listener){
security_opts = &context->listener->security_options;
}else{
if(context->state != mosq_cs_connected){
context__set_state(context, mosq_cs_disconnecting);
if(context->state != mosq_cs_active){
mosquitto__set_state(context, mosq_cs_disconnecting);
do_disconnect(db, context, MOSQ_ERR_AUTH);
continue;
}

View File

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"
#include <stdlib.h>
#include <errno.h>
@ -424,7 +425,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
if(rc && (mosq->out_packet || mosq->current_out_packet)) {
if(mosq->state != mosq_cs_disconnecting){
context__set_state(mosq, mosq_cs_disconnect_ws);
mosquitto__set_state(mosq, mosq_cs_disconnect_ws);
}
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
} else if (rc) {

View File

@ -32,6 +32,8 @@ c : test-compile
./01-will-unpwd-set.py $@/01-will-unpwd-set.test
./02-subscribe-qos0.py $@/02-subscribe-qos0.test
./02-subscribe-qos1.py $@/02-subscribe-qos1.test
./02-subscribe-qos1.py $@/02-subscribe-qos1-async1.test
./02-subscribe-qos1.py $@/02-subscribe-qos1-async2.test
./02-subscribe-qos2.py $@/02-subscribe-qos2.test
./02-unsubscribe-multiple-v5.py $@/02-unsubscribe-multiple-v5.test
./02-unsubscribe-v5.py $@/02-unsubscribe-v5.test

View File

@ -0,0 +1,78 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mosquitto.h>
/* mosquitto_connect_async() test, with mosquitto_loop_start() called before mosquitto_connect_async(). */
static int run = -1;
static bool should_run = true;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_subscribe(mosq, NULL, "qos1/test", 1);
}
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = rc;
}
void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
//mosquitto_disconnect(mosq);
should_run = false;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("subscribe-qos1-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
printf("ok, about to call connect_async\n");
// this only works if loop_start is first. with loop_start second,
// it fails on both 1.6.4 _and_ 1.6.5
// in this order, 1.6.4 works and 1.6.5 fails.
rc = mosquitto_loop_start(mosq);
printf("loop_start returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
// not sure which rc you want to be returned....
rc = mosquitto_connect_async(mosq, "localhost", port, 60);
printf("connect async returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
printf("ok, so we can start just waiting now, loop_start will run in it's thread\n");
/* 10 millis to be system polite */
//struct timespec tv = { 0, 10e6 };
struct timespec tv = { 1, 0 };
while(should_run){
nanosleep(&tv, NULL);
printf("...waiting...\n");
}
printf("Already exited should_run....\n");
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_lib_cleanup();
return run;
}

View File

@ -0,0 +1,74 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mosquitto.h>
/* mosquitto_connect_async() test, with mosquitto_loop_start() called after mosquitto_connect_async(). */
static int run = -1;
static bool should_run = true;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_subscribe(mosq, NULL, "qos1/test", 1);
}
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = rc;
}
void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
//mosquitto_disconnect(mosq);
should_run = false;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("subscribe-qos1-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
printf("ok, about to call connect_async\n");
rc = mosquitto_connect_async(mosq, "localhost", port, 60);
printf("connect async returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
rc = mosquitto_loop_start(mosq);
printf("loop_start returned rc: %d\n", rc);
if (rc) {
printf("which is: %s\n", mosquitto_strerror(rc));
}
printf("ok, so we can start just waiting now, loop_start will run in it's thread\n");
/* 10 millis to be system polite */
//struct timespec tv = { 0, 10e6 };
struct timespec tv = { 1, 0 };
while(should_run){
nanosleep(&tv, NULL);
printf("...waiting...\n");
}
printf("Already exited should_run....\n");
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq, false);
mosquitto_lib_cleanup();
return run;
}

View File

@ -13,6 +13,8 @@ SRC = \
01-server-keepalive-pingreq.c \
02-subscribe-qos0.c \
02-subscribe-qos1.c \
02-subscribe-qos1-async1.c \
02-subscribe-qos1-async2.c \
02-subscribe-qos2.c \
02-unsubscribe.c \
02-unsubscribe-v5.c \

View File

@ -14,6 +14,8 @@ tests = [
(1, ['./02-subscribe-qos0.py', 'c/02-subscribe-qos0.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async1.test']),
(1, ['./02-subscribe-qos1.py', 'c/02-subscribe-qos1-async2.test']),
(1, ['./02-subscribe-qos2.py', 'c/02-subscribe-qos2.test']),
(1, ['./02-unsubscribe-multiple-v5.py', 'c/02-unsubscribe-multiple-v5.test']),
(1, ['./02-unsubscribe-v5.py', 'c/02-unsubscribe-v5.test']),

View File

@ -1,7 +1,7 @@
<!--
.. title: Download
.. slug: download
.. date: 2019-09-17 16:12:00 UTC+1
.. date: 2019-09-25 11:28:00 UTC+1
.. tags: tag
.. category: category
.. link: link
@ -11,7 +11,7 @@
# Source
* [mosquitto-1.6.6.tar.gz](https://mosquitto.org/files/source/mosquitto-1.6.6.tar.gz) (319kB) ([GPG signature](https://mosquitto.org/files/source/mosquitto-1.6.6.tar.gz.asc))
* [mosquitto-1.6.7.tar.gz](https://mosquitto.org/files/source/mosquitto-1.6.7.tar.gz) (319kB) ([GPG signature](https://mosquitto.org/files/source/mosquitto-1.6.7.tar.gz.asc))
* [Git source code repository](https://github.com/eclipse/mosquitto) (github.com)
Older downloads are available at [https://mosquitto.org/files/](../files/)
@ -24,8 +24,8 @@ distributions.
## Windows
* [mosquitto-1.6.6-install-windows-x64.exe](https://mosquitto.org/files/binary/win64/mosquitto-1.6.6-install-windows-x64.exe) (~1.4 MB) (64-bit build, Windows Vista and up, built with Visual Studio Community 2017)
* [mosquitto-1.6.6-install-windows-x32.exe](https://mosquitto.org/files/binary/win32/mosquitto-1.6.6-install-windows-x86.exe) (~1.4 MB) (32-bit build, Windows Vista and up, built with Visual Studio Community 2017)
* [mosquitto-1.6.7-install-windows-x64.exe](https://mosquitto.org/files/binary/win64/mosquitto-1.6.7-install-windows-x64.exe) (~1.4 MB) (64-bit build, Windows Vista and up, built with Visual Studio Community 2017)
* [mosquitto-1.6.7-install-windows-x32.exe](https://mosquitto.org/files/binary/win32/mosquitto-1.6.7-install-windows-x86.exe) (~1.4 MB) (32-bit build, Windows Vista and up, built with Visual Studio Community 2017)
See also readme-windows.txt after installing.

View File

@ -0,0 +1,35 @@
<!--
.. title: Version 1.6.7 released.
.. slug: version-1-6-7-released
.. date: 2019-09-25 11:27:19 UTC+01:00
.. tags: Releases
.. category:
.. link:
.. description:
.. type: text
-->
Mosquitto 1.6.7 has been released, this is a bugfix release.
# Broker
- Add workaround for working with libwebsockets 3.2.0.
- Fix potential crash when reloading config. Closes [#1424], [#1425].
# Client library
- Don't use `/` in autogenerated client ids, to avoid confusing with topics.
- Fix `mosquitto_max_inflight_messages_set()` and `mosquitto_int_option(...,
MOSQ_OPT_*_MAX, ...)` behaviour. Closes [#1417].
- Fix regression on use of `mosquitto_connect_async()` not working.
Closes [#1415] and [#1422].
# Clients
- mosquitto_sub: Fix `-E` incorrectly not working unless `-d` was also
specified. [Closes #1418].
- Updated documentation around automatic client ids.
[#1415]: https://github.com/eclipse/mosquitto/issues/1415
[#1417]: https://github.com/eclipse/mosquitto/issues/1417
[#1418]: https://github.com/eclipse/mosquitto/issues/1418
[#1422]: https://github.com/eclipse/mosquitto/issues/1422
[#1424]: https://github.com/eclipse/mosquitto/issues/1424
[#1425]: https://github.com/eclipse/mosquitto/issues/1425