Outgoing messages with QoS>0 are no longer retried after a timeout.

This change in behaviour can be justified by considering when the
timeout may have occurred.

* If a connection is unreliable and has dropped, but without one end
  noticing, the messages will be retried on reconnection. Sending
  additional PUBLISH or PUBREL would not have changed anything.

* If a client is overloaded/unable to respond/has a slow connection then
  sending additional PUBLISH or PUBREL would not help the client catch
  up. Once the backlog has cleared the client will respond. If it is not
  able to catch up, sending additional duplicates would not help either.
This commit is contained in:
Roger A. Light 2015-05-24 11:52:38 +01:00
parent 4195fde70b
commit cdbe62c2bb
32 changed files with 49 additions and 797 deletions

View File

@ -3,6 +3,20 @@
Broker: Broker:
- Reduce calls to malloc through the use of UHPA. - Reduce calls to malloc through the use of UHPA.
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
Messages will be retried when a client reconnects. This change in behaviour
can be justified by considering when the timeout may have occurred.
* If a connection is unreliable and has dropped, but without one end
noticing, the messages will be retried on reconnection. Sending
additional PUBLISH or PUBREL would not have changed anything.
* If a client is overloaded/unable to respond/has a slow connection then
sending additional PUBLISH or PUBREL would not help the client catch
up. Once the backlog has cleared the client will respond. If it is not
able to catch up, sending additional duplicates would not help either.
Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
Messages will be retried when a client reconnects.
Client: Client:
- Add -x to mosquitto_sub for printing the payload in hexadecimal format. - Add -x to mosquitto_sub for printing the payload in hexadecimal format.

View File

@ -182,8 +182,13 @@ void message__reconnect_reset(struct mosquitto *mosq)
} }
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
if(message->msg.qos == 1){ if(message->msg.qos == 1){
message->state = mosq_ms_wait_for_puback; message->state = mosq_ms_publish_qos1;
}else if(message->msg.qos == 2){ }else if(message->msg.qos == 2){
if(message->state == mosq_ms_wait_for_pubrec){
message->state = mosq_ms_publish_qos2;
}else if(message->state == mosq_ms_wait_for_pubcomp){
message->state = mosq_ms_resend_pubrel;
}
/* Should be able to preserve state. */ /* Should be able to preserve state. */
} }
}else{ }else{
@ -307,10 +312,9 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
#endif #endif
while(messages){ while(messages){
if(messages->timestamp + mosq->message_retry < now){
switch(messages->state){ switch(messages->state){
case mosq_ms_wait_for_puback: case mosq_ms_publish_qos1:
case mosq_ms_wait_for_pubrec: case mosq_ms_publish_qos2:
messages->timestamp = now; messages->timestamp = now;
messages->dup = true; messages->dup = true;
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup); send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup);
@ -320,6 +324,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
messages->dup = true; messages->dup = true;
send__pubrec(mosq, messages->msg.mid); send__pubrec(mosq, messages->msg.mid);
break; break;
case mosq_ms_resend_pubrel:
case mosq_ms_wait_for_pubcomp: case mosq_ms_wait_for_pubcomp:
messages->timestamp = now; messages->timestamp = now;
messages->dup = true; messages->dup = true;
@ -328,7 +333,6 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
default: default:
break; break;
} }
}
messages = messages->next; messages = messages->next;
} }
#ifdef WITH_THREADING #ifdef WITH_THREADING
@ -340,17 +344,13 @@ void message__retry_check(struct mosquitto *mosq)
{ {
#ifdef WITH_THREADING #ifdef WITH_THREADING
message__retry_check_actual(mosq, mosq->out_messages, &mosq->out_message_mutex); message__retry_check_actual(mosq, mosq->out_messages, &mosq->out_message_mutex);
message__retry_check_actual(mosq, mosq->in_messages, &mosq->in_message_mutex);
#else #else
message__retry_check_actual(mosq, mosq->out_messages); message__retry_check_actual(mosq, mosq->out_messages);
message__retry_check_actual(mosq, mosq->in_messages);
#endif #endif
} }
void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry) void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry)
{ {
assert(mosq);
if(mosq) mosq->message_retry = message_retry;
} }
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state) int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)

View File

@ -145,8 +145,6 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se
mosq->sockpairR = INVALID_SOCKET; mosq->sockpairR = INVALID_SOCKET;
mosq->sockpairW = INVALID_SOCKET; mosq->sockpairW = INVALID_SOCKET;
mosq->keepalive = 60; mosq->keepalive = 60;
mosq->message_retry = 20;
mosq->last_retry_check = 0;
mosq->clean_session = clean_session; mosq->clean_session = clean_session;
if(id){ if(id){
if(strlen(id) == 0){ if(strlen(id) == 0){
@ -1060,10 +1058,6 @@ int mosquitto_loop_misc(struct mosquitto *mosq)
mosquitto__check_keepalive(mosq); mosquitto__check_keepalive(mosq);
now = mosquitto_time(); now = mosquitto_time();
if(mosq->last_retry_check+1 < now){
message__retry_check(mosq);
mosq->last_retry_check = now;
}
if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){ if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
/* mosq->ping_t != 0 means we are waiting for a pingresp. /* mosq->ping_t != 0 means we are waiting for a pingresp.
* This hasn't happened in the keepalive time so we should disconnect. * This hasn't happened in the keepalive time so we should disconnect.

View File

@ -1297,13 +1297,7 @@ libmosq_EXPORT int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, u
/* /*
* Function: mosquitto_message_retry_set * Function: mosquitto_message_retry_set
* *
* Set the number of seconds to wait before retrying messages. This applies to * This function now has no effect.
* publish messages with QoS>0. May be called at any time.
*
* Parameters:
* mosq - a valid mosquitto instance.
* message_retry - the number of seconds to wait for a response before
* retrying. Defaults to 20.
*/ */
libmosq_EXPORT void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry); libmosq_EXPORT void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_retry);

View File

@ -219,8 +219,6 @@ struct mosquitto {
# endif # endif
void *userdata; void *userdata;
bool in_callback; bool in_callback;
unsigned int message_retry;
time_t last_retry_check;
struct mosquitto_message_all *in_messages; struct mosquitto_message_all *in_messages;
struct mosquitto_message_all *in_messages_last; struct mosquitto_message_all *in_messages_last;
struct mosquitto_message_all *out_messages; struct mosquitto_message_all *out_messages;

View File

@ -19,6 +19,7 @@ Contributors:
#include "mosquitto.h" #include "mosquitto.h"
#include "logging_mosq.h" #include "logging_mosq.h"
#include "memory_mosq.h" #include "memory_mosq.h"
#include "messages_mosq.h"
#include "net_mosq.h" #include "net_mosq.h"
#include "packet_mosq.h" #include "packet_mosq.h"
#include "read_handle.h" #include "read_handle.h"
@ -47,6 +48,7 @@ int handle__connack(struct mosquitto *mosq)
if(mosq->state != mosq_cs_disconnecting){ if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_connected; mosq->state = mosq_cs_connected;
} }
message__retry_check(mosq);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
case 1: case 1:
case 2: case 2:

View File

@ -279,11 +279,6 @@
<paramdef>unsigned int <parameter>max_inflight_messages</parameter></paramdef> <paramdef>unsigned int <parameter>max_inflight_messages</parameter></paramdef>
</funcprototype></funcsynopsis> </funcprototype></funcsynopsis>
<funcsynopsis><funcprototype><funcdef>int <function>mosquitto_message_retry_set</function></funcdef>
<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
<paramdef>unsigned int <parameter>message_retry</parameter></paramdef>
</funcprototype></funcsynopsis>
<funcsynopsis><funcprototype><funcdef>int <function>mosquitto_reconnect_delay_set</function></funcdef> <funcsynopsis><funcprototype><funcdef>int <function>mosquitto_reconnect_delay_set</function></funcdef>
<paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef> <paramdef>struct mosquitto *<parameter>mosq</parameter></paramdef>
<paramdef>unsigned int <parameter>reconnect_delay</parameter></paramdef> <paramdef>unsigned int <parameter>reconnect_delay</parameter></paramdef>

View File

@ -525,16 +525,6 @@
<para>Reloaded on reload signal.</para> <para>Reloaded on reload signal.</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>retry_interval</option> <replaceable>seconds</replaceable></term>
<listitem>
<para>The integer number of seconds after a QoS=1 or QoS=2
message has been sent that mosquitto will wait before
retrying when no response is received. If unset,
defaults to 20 seconds.</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>store_clean_interval</option> <replaceable>seconds</replaceable></term> <term><option>store_clean_interval</option> <replaceable>seconds</replaceable></term>
<listitem> <listitem>

View File

@ -11,10 +11,6 @@
# General configuration # General configuration
# ================================================================= # =================================================================
# Time in seconds to wait before resending an outgoing QoS=1 or
# QoS=2 message.
#retry_interval 20
# Time in seconds between updates of the $SYS tree. # Time in seconds between updates of the $SYS tree.
# Set to 0 to disable the publishing of the $SYS tree. # Set to 0 to disable the publishing of the $SYS tree.
#sys_interval 10 #sys_interval 10

View File

@ -154,7 +154,6 @@ static void config__init_reload(struct mosquitto__config *config)
if(config->psk_file) mosquitto__free(config->psk_file); if(config->psk_file) mosquitto__free(config->psk_file);
config->psk_file = NULL; config->psk_file = NULL;
config->queue_qos0_messages = false; config->queue_qos0_messages = false;
config->retry_interval = 20;
config->sys_interval = 10; config->sys_interval = 10;
config->upgrade_outgoing_qos = false; config->upgrade_outgoing_qos = false;
if(config->auth_options){ if(config->auth_options){
@ -1613,11 +1612,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif #endif
}else if(!strcmp(token, "retry_interval")){ }else if(!strcmp(token, "retry_interval")){
if(conf__parse_int(&token, "retry_interval", &config->retry_interval, saveptr)) return MOSQ_ERR_INVAL; log__printf(NULL, MOSQ_LOG_WARNING, "Warning: The retry_interval option is no longer available.");
if(config->retry_interval < 1 || config->retry_interval > 3600){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid retry_interval value (%d).", config->retry_interval);
return MOSQ_ERR_INVAL;
}
}else if(!strcmp(token, "round_robin")){ }else if(!strcmp(token, "round_robin")){
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
if(reload) continue; // FIXME if(reload) continue; // FIXME

View File

@ -675,48 +675,6 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
int db__message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
{
time_t threshold;
enum mosquitto_msg_state new_state;
struct mosquitto *context, *ctxt_tmp;
struct mosquitto_client_msg *msg;
threshold = mosquitto_time() - timeout;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
msg = context->msgs;
while(msg){
new_state = mosq_ms_invalid;
if(msg->timestamp < threshold && msg->state != mosq_ms_queued){
switch(msg->state){
case mosq_ms_wait_for_puback:
new_state = mosq_ms_publish_qos1;
break;
case mosq_ms_wait_for_pubrec:
new_state = mosq_ms_publish_qos2;
break;
case mosq_ms_wait_for_pubrel:
new_state = mosq_ms_send_pubrec;
break;
case mosq_ms_wait_for_pubcomp:
new_state = mosq_ms_resend_pubrel;
break;
default:
break;
}
if(new_state != mosq_ms_invalid){
msg->timestamp = mosquitto_time();
msg->state = new_state;
msg->dup = true;
}
}
msg = msg->next;
}
}
return MOSQ_ERR_SUCCESS;
}
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
{ {

View File

@ -304,8 +304,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
expiration_check_time = time(NULL) + 3600; expiration_check_time = time(NULL) + 3600;
} }
db__message_timeout_check(db, db->config->retry_interval);
#ifndef WIN32 #ifndef WIN32
sigprocmask(SIG_SETMASK, &sigblock, &origsig); sigprocmask(SIG_SETMASK, &sigblock, &origsig);
fdcount = poll(pollfds, pollfd_index, 100); fdcount = poll(pollfds, pollfd_index, 100);

View File

@ -183,7 +183,6 @@ struct mosquitto__config {
char *pid_file; char *pid_file;
char *psk_file; char *psk_file;
bool queue_qos0_messages; bool queue_qos0_messages;
int retry_interval;
int sys_interval; int sys_interval;
bool upgrade_outgoing_qos; bool upgrade_outgoing_qos;
char *user; char *user;
@ -477,8 +476,6 @@ void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *stor
void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store); void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **store); void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **store);
void db__msg_store_clean(struct mosquitto_db *db); void db__msg_store_clean(struct mosquitto_db *db);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
int db__message_timeout_check(struct mosquitto_db *db, unsigned int timeout);
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
void db__vacuum(void); void db__vacuum(void);
void sys__update(struct mosquitto_db *db, int interval, time_t start_time); void sys__update(struct mosquitto_db *db, int interval, time_t start_time);

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python
# Test whether a PUBLISH to a topic with QoS 2 results in the correct packet flow.
import subprocess
import socket
import time
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("test-helper", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 128
publish_packet = mosq_test.gen_publish("qos1/timeout/test", qos=1, mid=mid, payload="timeout-message")
puback_packet = mosq_test.gen_puback(mid)
sock = mosq_test.do_client_connect(connect_packet, connack_packet, connack_error="helper connack")
sock.send(publish_packet)
if mosq_test.expect_packet(sock, "helper puback", puback_packet):
rc = 0
sock.close()
exit(rc)

View File

@ -1,3 +0,0 @@
retry_interval 10
port 1888
log_type debug

View File

@ -1,60 +0,0 @@
#!/usr/bin/env python
# Test whether a SUBSCRIBE to a topic with QoS 2 results in the correct SUBACK packet.
import subprocess
import socket
import time
from os import environ
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
mid = 3265
keepalive = 60
connect_packet = mosq_test.gen_connect("pub-qos1-timeout-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
subscribe_packet = mosq_test.gen_subscribe(mid, "qos1/timeout/test", 1)
suback_packet = mosq_test.gen_suback(mid, 1)
mid = 1
publish_packet = mosq_test.gen_publish("qos1/timeout/test", qos=1, mid=mid, payload="timeout-message")
publish_dup_packet = mosq_test.gen_publish("qos1/timeout/test", qos=1, mid=mid, payload="timeout-message", dup=True)
puback_packet = mosq_test.gen_puback(mid)
broker = mosq_test.start_broker(filename=os.path.basename(__file__))
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet)
sock.send(subscribe_packet)
if mosq_test.expect_packet(sock, "suback", suback_packet):
pub = subprocess.Popen(['./03-publish-b2c-timeout-qos1-helper.py'])
pub.wait()
# Should have now received a publish command
if mosq_test.expect_packet(sock, "publish", publish_packet):
# Wait for longer than 5 seconds to get republish with dup set
# This is covered by the 8 second timeout
if mosq_test.expect_packet(sock, "dup publish", publish_dup_packet):
sock.send(puback_packet)
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
exit(rc)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
# Test whether a PUBLISH to a topic with QoS 2 results in the correct packet flow.
import subprocess
import socket
import time
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("test-helper", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 312
publish_packet = mosq_test.gen_publish("qos2/timeout/test", qos=2, mid=mid, payload="timeout-message")
pubrec_packet = mosq_test.gen_pubrec(mid)
pubrel_packet = mosq_test.gen_pubrel(mid)
pubcomp_packet = mosq_test.gen_pubcomp(mid)
sock = mosq_test.do_client_connect(connect_packet, connack_packet, connack_error="helper connack")
sock.send(publish_packet)
if mosq_test.expect_packet(sock, "helper pubrec", pubrec_packet):
sock.send(pubrel_packet)
if mosq_test.expect_packet(sock, "helper pubcomp", pubcomp_packet):
rc = 0
sock.close()
exit(rc)

View File

@ -1,3 +0,0 @@
retry_interval 10
port 1888
log_type debug

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python
# Test whether a SUBSCRIBE to a topic with QoS 2 results in the correct SUBACK packet.
import subprocess
import socket
import time
from os import environ
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
mid = 3265
keepalive = 60
connect_packet = mosq_test.gen_connect("pub-qo2-timeout-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
subscribe_packet = mosq_test.gen_subscribe(mid, "qos2/timeout/test", 2)
suback_packet = mosq_test.gen_suback(mid, 2)
mid = 1
publish_packet = mosq_test.gen_publish("qos2/timeout/test", qos=2, mid=mid, payload="timeout-message")
publish_dup_packet = mosq_test.gen_publish("qos2/timeout/test", qos=2, mid=mid, payload="timeout-message", dup=True)
pubrec_packet = mosq_test.gen_pubrec(mid)
pubrel_packet = mosq_test.gen_pubrel(mid)
pubcomp_packet = mosq_test.gen_pubcomp(mid)
broker = mosq_test.start_broker(filename=os.path.basename(__file__))
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet)
sock.send(subscribe_packet)
if mosq_test.expect_packet(sock, "suback", suback_packet):
pub = subprocess.Popen(['./03-publish-b2c-timeout-qos2-helper.py'])
pub.wait()
# Should have now received a publish command
if mosq_test.expect_packet(sock, "publish", publish_packet):
# Wait for longer than 5 seconds to get republish with dup set
# This is covered by the 8 second timeout
if mosq_test.expect_packet(sock, "dup publish", publish_dup_packet):
sock.send(pubrec_packet)
if mosq_test.expect_packet(sock, "pubrel", pubrel_packet):
# Wait for longer than 5 seconds to get republish with dup set
# This is covered by the 8 second timeout
if mosq_test.expect_packet(sock, "dup pubrel", pubrel_packet):
sock.send(pubcomp_packet)
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
exit(rc)

View File

@ -1,2 +0,0 @@
retry_interval 10
port 1888

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
# Test whether a PUBLISH to a topic with QoS 2 results in the correct packet
# flow. This test introduces delays into the flow in order to force the broker
# to send duplicate PUBREC and PUBCOMP messages.
import subprocess
import socket
import time
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 600
connect_packet = mosq_test.gen_connect("pub-qos2-timeout-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1926
publish_packet = mosq_test.gen_publish("pub/qos2/test", qos=2, mid=mid, payload="timeout-message")
pubrec_packet = mosq_test.gen_pubrec(mid)
pubrel_packet = mosq_test.gen_pubrel(mid)
pubcomp_packet = mosq_test.gen_pubcomp(mid)
broker = mosq_test.start_broker(filename=os.path.basename(__file__))
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet)
sock.send(publish_packet)
if mosq_test.expect_packet(sock, "pubrec", pubrec_packet):
# Timeout is 8 seconds which means the broker should repeat the PUBREC.
if mosq_test.expect_packet(sock, "pubrec", pubrec_packet):
sock.send(pubrel_packet)
if mosq_test.expect_packet(sock, "pubcomp", pubcomp_packet):
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
if rc:
(stdo, stde) = broker.communicate()
print(stde)
exit(rc)

View File

@ -44,11 +44,8 @@ endif
03 : 03 :
./03-publish-qos1.py ./03-publish-qos1.py
./03-publish-qos2.py ./03-publish-qos2.py
./03-publish-b2c-timeout-qos1.py
./03-publish-b2c-disconnect-qos1.py ./03-publish-b2c-disconnect-qos1.py
./03-publish-c2b-timeout-qos2.py
./03-publish-c2b-disconnect-qos2.py ./03-publish-c2b-disconnect-qos2.py
./03-publish-b2c-timeout-qos2.py
./03-publish-b2c-disconnect-qos2.py ./03-publish-b2c-disconnect-qos2.py
./03-pattern-matching.py ./03-pattern-matching.py

View File

@ -67,8 +67,6 @@ try:
conn.send(connack_packet) conn.send(connack_packet)
conn.send(publish_packet) conn.send(publish_packet)
if mosq_test.expect_packet(conn, "pubrec", pubrec_packet):
# Should be repeated due to timeout
if mosq_test.expect_packet(conn, "pubrec", pubrec_packet): if mosq_test.expect_packet(conn, "pubrec", pubrec_packet):
conn.send(pubrel_packet) conn.send(pubrel_packet)

View File

@ -1,83 +0,0 @@
#!/usr/bin/env python
# Test whether a client sends a correct PUBLISH to a topic with QoS 1 and responds to a delay.
# The client should connect to port 1888 with keepalive=60, clean session set,
# and client id publish-qos1-test
# The test will send a CONNACK message to the client with rc=0. Upon receiving
# the CONNACK the client should verify that rc==0. If not, it should exit with
# return code=1.
# On a successful CONNACK, the client should send a PUBLISH message with topic
# "pub/qos1/test", payload "message" and QoS=1.
# The test will not respond to the first PUBLISH message, so the client must
# resend the PUBLISH message with dup=1. Note that to keep test durations low, a
# message retry timeout of less than 10 seconds is required for this test.
# On receiving the second PUBLISH message, the test will send the correct
# PUBACK response. On receiving the correct PUBACK response, the client should
# send a DISCONNECT message.
import inspect
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("publish-qos1-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
disconnect_packet = mosq_test.gen_disconnect()
mid = 1
publish_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message")
publish_packet_dup = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", dup=True)
puback_packet = mosq_test.gen_puback(mid)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp'
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../lib/python:'+pp
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env)
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if mosq_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if mosq_test.expect_packet(conn, "publish", publish_packet):
# Delay for > 3 seconds (message retry time)
if mosq_test.expect_packet(conn, "dup publish", publish_packet_dup):
conn.send(puback_packet)
if mosq_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
exit(rc)

View File

@ -1,93 +0,0 @@
#!/usr/bin/env python
# Test whether a client sends a correct PUBLISH to a topic with QoS 1 and responds to a delay.
# The client should connect to port 1888 with keepalive=60, clean session set,
# and client id publish-qos2-test
# The test will send a CONNACK message to the client with rc=0. Upon receiving
# the CONNACK the client should verify that rc==0. If not, it should exit with
# return code=1.
# On a successful CONNACK, the client should send a PUBLISH message with topic
# "pub/qos2/test", payload "message" and QoS=2.
# The test will not respond to the first PUBLISH message, so the client must
# resend the PUBLISH message with dup=1. Note that to keep test durations low, a
# message retry timeout of less than 10 seconds is required for this test.
# On receiving the second PUBLISH message, the test will send the correct
# PUBREC response. On receiving the correct PUBREC response, the client should
# send a PUBREL message.
# The test will not respond to the first PUBREL message, so the client must
# resend the PUBREL message with dup=1. On receiving the second PUBREL message,
# the test will send the correct PUBCOMP response. On receiving the correct
# PUBCOMP response, the client should send a DISCONNECT message.
import inspect
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("publish-qos2-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
disconnect_packet = mosq_test.gen_disconnect()
mid = 1
publish_packet = mosq_test.gen_publish("pub/qos2/test", qos=2, mid=mid, payload="message")
publish_dup_packet = mosq_test.gen_publish("pub/qos2/test", qos=2, mid=mid, payload="message", dup=True)
pubrec_packet = mosq_test.gen_pubrec(mid)
pubrel_packet = mosq_test.gen_pubrel(mid)
pubcomp_packet = mosq_test.gen_pubcomp(mid)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp'
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../lib/python:'+pp
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env)
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if mosq_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if mosq_test.expect_packet(conn, "publish", publish_packet):
# Delay for > 3 seconds (message retry time)
if mosq_test.expect_packet(conn, "dup publish", publish_dup_packet):
conn.send(pubrec_packet)
if mosq_test.expect_packet(conn, "pubrel", pubrel_packet):
if mosq_test.expect_packet(conn, "dup pubrel", pubrel_packet):
conn.send(pubcomp_packet)
if mosq_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
exit(rc)

View File

@ -30,10 +30,8 @@ c cpp : test-compile
./02-unsubscribe.py $@/02-unsubscribe.test ./02-unsubscribe.py $@/02-unsubscribe.test
./03-publish-qos0.py $@/03-publish-qos0.test ./03-publish-qos0.py $@/03-publish-qos0.test
./03-publish-qos0-no-payload.py $@/03-publish-qos0-no-payload.test ./03-publish-qos0-no-payload.py $@/03-publish-qos0-no-payload.test
./03-publish-c2b-qos1-timeout.py $@/03-publish-c2b-qos1-timeout.test
./03-publish-c2b-qos1-disconnect.py $@/03-publish-c2b-qos1-disconnect.test ./03-publish-c2b-qos1-disconnect.py $@/03-publish-c2b-qos1-disconnect.test
./03-publish-c2b-qos2.py $@/03-publish-c2b-qos2.test ./03-publish-c2b-qos2.py $@/03-publish-c2b-qos2.test
./03-publish-c2b-qos2-timeout.py $@/03-publish-c2b-qos2-timeout.test
./03-publish-c2b-qos2-disconnect.py $@/03-publish-c2b-qos2-disconnect.test ./03-publish-c2b-qos2-disconnect.py $@/03-publish-c2b-qos2-disconnect.test
./03-publish-b2c-qos1.py $@/03-publish-b2c-qos1.test ./03-publish-b2c-qos1.py $@/03-publish-b2c-qos1.test
./03-publish-b2c-qos2.py $@/03-publish-b2c-qos2.test ./03-publish-b2c-qos2.py $@/03-publish-b2c-qos2.test

View File

@ -1,49 +0,0 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
static int run = -1;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_publish(mosq, NULL, "pub/qos1/test", strlen("message"), "message", 1, false);
}
}
void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
mosquitto_disconnect(mosq);
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = 0;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
mosquitto_lib_init();
mosq = mosquitto_new("publish-qos1-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_publish_callback_set(mosq, on_publish);
mosquitto_message_retry_set(mosq, 3);
rc = mosquitto_connect(mosq, "localhost", 1888, 60);
while(run == -1){
mosquitto_loop(mosq, 300, 1);
}
mosquitto_lib_cleanup();
return run;
}

View File

@ -1,49 +0,0 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
static int run = -1;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
mosquitto_publish(mosq, NULL, "pub/qos2/test", strlen("message"), "message", 2, false);
}
}
void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
mosquitto_disconnect(mosq);
}
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = 0;
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
mosquitto_lib_init();
mosq = mosquitto_new("publish-qos2-test", true, NULL);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_publish_callback_set(mosq, on_publish);
mosquitto_message_retry_set(mosq, 3);
rc = mosquitto_connect(mosq, "localhost", 1888, 60);
while(run == -1){
mosquitto_loop(mosq, 300, 1);
}
mosquitto_lib_cleanup();
return run;
}

View File

@ -41,18 +41,12 @@ all : 01 02 03 04 08 09
03-publish-qos0-no-payload.test : 03-publish-qos0-no-payload.c 03-publish-qos0-no-payload.test : 03-publish-qos0-no-payload.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS) $(CC) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos1-timeout.test : 03-publish-c2b-qos1-timeout.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos1-disconnect.test : 03-publish-c2b-qos1-disconnect.c 03-publish-c2b-qos1-disconnect.test : 03-publish-c2b-qos1-disconnect.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS) $(CC) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos2.test : 03-publish-c2b-qos2.c 03-publish-c2b-qos2.test : 03-publish-c2b-qos2.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS) $(CC) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos2-timeout.test : 03-publish-c2b-qos2-timeout.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos2-disconnect.test : 03-publish-c2b-qos2-disconnect.c 03-publish-c2b-qos2-disconnect.test : 03-publish-c2b-qos2-disconnect.c
$(CC) $< -o $@ $(CFLAGS) $(LIBS) $(CC) $< -o $@ $(CFLAGS) $(LIBS)
@ -90,7 +84,7 @@ all : 01 02 03 04 08 09
02 : 02-subscribe-qos0.test 02-subscribe-qos1.test 02-subscribe-qos2.test 02-unsubscribe.test 02 : 02-subscribe-qos0.test 02-subscribe-qos1.test 02-subscribe-qos2.test 02-unsubscribe.test
03 : 03-publish-qos0.test 03-publish-qos0-no-payload.test 03-publish-c2b-qos1-timeout.test 03-publish-c2b-qos1-disconnect.test 03-publish-c2b-qos2.test 03-publish-c2b-qos2-timeout.test 03-publish-c2b-qos2-disconnect.test 03-publish-b2c-qos1.test 03-publish-b2c-qos2.test 03 : 03-publish-qos0.test 03-publish-qos0-no-payload.test 03-publish-c2b-qos1-disconnect.test 03-publish-c2b-qos2.test 03-publish-c2b-qos2-disconnect.test 03-publish-b2c-qos1.test 03-publish-b2c-qos2.test
04 : 04-retain-qos0.test 04 : 04-retain-qos0.test

View File

@ -1,60 +0,0 @@
#include <cstdlib>
#include <cstring>
#include <mosquittopp.h>
static int run = -1;
class mosquittopp_test : public mosqpp::mosquittopp
{
public:
mosquittopp_test(const char *id);
void on_connect(int rc);
void on_disconnect(int rc);
void on_publish(int mid);
};
mosquittopp_test::mosquittopp_test(const char *id) : mosqpp::mosquittopp(id)
{
}
void mosquittopp_test::on_connect(int rc)
{
if(rc){
exit(1);
}else{
publish(NULL, "pub/qos1/test", strlen("message"), "message", 1, false);
}
}
void mosquittopp_test::on_disconnect(int rc)
{
run = 0;
}
void mosquittopp_test::on_publish(int mid)
{
disconnect();
}
int main(int argc, char *argv[])
{
struct mosquittopp_test *mosq;
mosqpp::lib_init();
mosq = new mosquittopp_test("publish-qos1-test");
mosq->message_retry_set(3);
mosq->connect("localhost", 1888, 60);
while(run == -1){
mosq->loop();
}
mosqpp::lib_cleanup();
return run;
}

View File

@ -1,60 +0,0 @@
#include <cstdlib>
#include <cstring>
#include <mosquittopp.h>
static int run = -1;
class mosquittopp_test : public mosqpp::mosquittopp
{
public:
mosquittopp_test(const char *id);
void on_connect(int rc);
void on_disconnect(int rc);
void on_publish(int mid);
};
mosquittopp_test::mosquittopp_test(const char *id) : mosqpp::mosquittopp(id)
{
}
void mosquittopp_test::on_connect(int rc)
{
if(rc){
exit(1);
}else{
publish(NULL, "pub/qos2/test", strlen("message"), "message", 2, false);
}
}
void mosquittopp_test::on_disconnect(int rc)
{
run = 0;
}
void mosquittopp_test::on_publish(int mid)
{
disconnect();
}
int main(int argc, char *argv[])
{
struct mosquittopp_test *mosq;
mosqpp::lib_init();
mosq = new mosquittopp_test("publish-qos2-test");
mosq->message_retry_set(3);
mosq->connect("localhost", 1888, 60);
while(run == -1){
mosq->loop();
}
mosqpp::lib_cleanup();
return run;
}

View File

@ -41,9 +41,6 @@ all : 01 02 03 04 08 09
03-publish-qos0-no-payload.test : 03-publish-qos0-no-payload.cpp 03-publish-qos0-no-payload.test : 03-publish-qos0-no-payload.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS) $(CXX) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos1-timeout.test : 03-publish-c2b-qos1-timeout.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos1-disconnect.test : 03-publish-c2b-qos1-disconnect.cpp 03-publish-c2b-qos1-disconnect.test : 03-publish-c2b-qos1-disconnect.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS) $(CXX) $< -o $@ $(CFLAGS) $(LIBS)
@ -53,9 +50,6 @@ all : 01 02 03 04 08 09
03-publish-c2b-qos2-disconnect.test : 03-publish-c2b-qos2-disconnect.cpp 03-publish-c2b-qos2-disconnect.test : 03-publish-c2b-qos2-disconnect.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS) $(CXX) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-c2b-qos2-timeout.test : 03-publish-c2b-qos2-timeout.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS)
03-publish-b2c-qos1.test : 03-publish-b2c-qos1.cpp 03-publish-b2c-qos1.test : 03-publish-b2c-qos1.cpp
$(CXX) $< -o $@ $(CFLAGS) $(LIBS) $(CXX) $< -o $@ $(CFLAGS) $(LIBS)
@ -90,7 +84,7 @@ all : 01 02 03 04 08 09
02 : 02-subscribe-qos0.test 02-subscribe-qos1.test 02-subscribe-qos2.test 02-unsubscribe.test 02 : 02-subscribe-qos0.test 02-subscribe-qos1.test 02-subscribe-qos2.test 02-unsubscribe.test
03 : 03-publish-qos0.test 03-publish-qos0-no-payload.test 03-publish-c2b-qos1-timeout.test 03-publish-c2b-qos1-disconnect.test 03-publish-c2b-qos2.test 03-publish-c2b-qos2-timeout.test 03-publish-c2b-qos2-disconnect.test 03-publish-b2c-qos1.test 03-publish-b2c-qos2.test 03 : 03-publish-qos0.test 03-publish-qos0-no-payload.test 03-publish-c2b-qos1-disconnect.test 03-publish-c2b-qos2.test 03-publish-c2b-qos2-disconnect.test 03-publish-b2c-qos1.test 03-publish-b2c-qos2.test
04 : 04-retain-qos0.test 04 : 04-retain-qos0.test