Add subscription-identifier and options to persistence.

This commit is contained in:
Roger A. Light 2019-03-21 11:53:46 +00:00
parent 5691456ac7
commit 2f15a7be2b
14 changed files with 223 additions and 8 deletions

View File

@ -104,9 +104,11 @@ struct P_msg_store{
struct PF_sub{
uint32_t identifier;
uint16_t id_len;
uint16_t topic_len;
uint8_t qos;
uint8_t options;
};
struct P_sub{
struct PF_sub F;

View File

@ -39,7 +39,7 @@ static uint32_t db_version;
const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos);
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos, uint32_t identifier, int options);
static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid)
{
@ -113,7 +113,7 @@ static int persist__client_msg_restore(struct mosquitto_db *db, const char *clie
struct mosquitto_msg_store_load *load;
struct mosquitto *context;
cmsg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
cmsg = mosquitto__calloc(1, sizeof(struct mosquitto_client_msg));
if(!cmsg){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
@ -249,7 +249,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
}
}
}
load = mosquitto__malloc(sizeof(struct mosquitto_msg_store_load));
load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load));
if(!load){
fclose(db_fptr);
mosquitto__free(chunk.source.id);
@ -327,7 +327,7 @@ static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
return rc;
}
rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos);
rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos, chunk.F.identifier, chunk.F.options);
mosquitto__free(chunk.client_id);
mosquitto__free(chunk.topic);
@ -466,7 +466,7 @@ error:
return 1;
}
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos)
static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos, uint32_t identifier, int options)
{
struct mosquitto *context;
@ -476,8 +476,7 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id,
context = persist__find_or_add_context(db, client_id, 0);
if(!context) return 1;
/* FIXME - identifer, options need saving */
return sub__add(db, context, sub, qos, 0, 0, &db->subs);
return sub__add(db, context, sub, qos, identifier, options, &db->subs);
}
#endif

View File

@ -170,6 +170,7 @@ int persist__chunk_sub_read_v5(FILE *db_fptr, struct P_sub *chunk)
int rc;
read_e(db_fptr, &chunk->F, sizeof(struct PF_sub));
chunk->F.identifier = ntohl(chunk->F.identifier);
chunk->F.id_len = ntohs(chunk->F.id_len);
chunk->F.topic_len = ntohs(chunk->F.topic_len);

View File

@ -213,9 +213,11 @@ static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, str
sub = node->subs;
while(sub){
if(sub->context->clean_start == false && sub->context->id){
sub_chunk.F.identifier = sub->identifier;
sub_chunk.F.id_len = strlen(sub->context->id);
sub_chunk.F.topic_len = strlen(thistopic);
sub_chunk.F.qos = (uint8_t)sub->qos;
sub_chunk.F.options = sub->no_local<<2 | sub->retain_as_published<<3;
sub_chunk.client_id = sub->context->id;
sub_chunk.topic = thistopic;

View File

@ -160,6 +160,7 @@ int persist__chunk_sub_write_v5(FILE *db_fptr, struct P_sub *chunk)
int id_len = chunk->F.id_len;
int topic_len = chunk->F.topic_len;
chunk->F.identifier = htonl(chunk->F.identifier);
chunk->F.id_len = htons(chunk->F.id_len);
chunk->F.topic_len = htons(chunk->F.topic_len);

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python
# Test whether a client subscribed to a topic receives its own message sent to that topic.
# And whether the no-local option is persisted.
from mosq_test_helper import *
def write_config(filename, port):
with open(filename, 'w') as f:
f.write("port %d\n" % (port))
f.write("persistence true\n")
f.write("persistence_file mosquitto-%d.db\n" % (port))
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)
rc = 1
keepalive = 60
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100)
connect_packet = mosq_test.gen_connect(
"persistent-subscription-test", keepalive=keepalive, clean_session=False, proto_ver=5, properties=props
)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=5) # session present
mid = 1
subscribe1_packet = mosq_test.gen_subscribe(mid, "subpub/nolocal", 5, proto_ver=5)
suback1_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)
mid = 2
subscribe2_packet = mosq_test.gen_subscribe(mid, "subpub/local", 1, proto_ver=5)
suback2_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)
mid = 1
publish1_packet = mosq_test.gen_publish("subpub/nolocal", qos=1, mid=mid, payload="message", proto_ver=5)
puback1_packet = mosq_test.gen_puback(mid, proto_ver=5)
mid = 2
publish2s_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5)
puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5)
mid = 1
publish2a_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5)
puback2a_packet = mosq_test.gen_puback(mid, proto_ver=5)
mid = 2
publish2b_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5)
puback2b_packet = mosq_test.gen_puback(mid, proto_ver=5)
if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
(stdo1, stde1) = ("", "")
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1")
mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2")
mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1a")
mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2a")
if mosq_test.expect_packet(sock, "publish2a", publish2a_packet):
sock.send(puback2a_packet)
broker.terminate()
broker.wait()
(stdo1, stde1) = broker.communicate()
sock.close()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port)
mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1b")
mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2b")
if mosq_test.expect_packet(sock, "publish2b", publish2b_packet):
rc = 0
sock.close()
finally:
os.remove(conf_file)
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde1 + stde)
if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))
exit(rc)

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python
# Test whether a client subscribed to a topic receives its own message sent to that topic.
from mosq_test_helper import *
def write_config(filename, port):
with open(filename, 'w') as f:
f.write("port %d\n" % (port))
f.write("persistence true\n")
f.write("persistence_file mosquitto-%d.db\n" % (port))
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)
rc = 1
mid = 530
keepalive = 60
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100)
connect_packet = mosq_test.gen_connect(
"persistent-subscription-test", keepalive=keepalive, clean_session=False, proto_ver=5, properties=props
)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=5) # session present
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)
mid = 300
publish_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message", proto_ver=5)
puback_packet = mosq_test.gen_puback(mid, proto_ver=5)
mid = 1
publish_packet2 = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message", proto_ver=5)
if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
(stdo1, stde1) = ("", "")
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
broker.terminate()
broker.wait()
(stdo1, stde1) = broker.communicate()
sock.close()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
if mosq_test.expect_packet(sock, "publish2", publish_packet2):
rc = 0
sock.close()
finally:
os.remove(conf_file)
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde1 + stde)
if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))
exit(rc)

View File

@ -18,7 +18,7 @@ rc = 1
mid = 530
keepalive = 60
connect_packet = mosq_test.gen_connect(
"persitent-subscription-test", keepalive=keepalive, clean_session=False,
"persistent-subscription-test", keepalive=keepalive, clean_session=False,
)
connack_packet = mosq_test.gen_connack(rc=0)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1) # session present

View File

@ -179,6 +179,8 @@ endif
11 :
./11-persistent-subscription.py
./11-persistent-subscription-v5.py
./11-persistent-subscription-no-local.py
12 :
./12-prop-assigned-client-identifier.py

View File

@ -147,6 +147,8 @@ tests = [
(2, './10-listener-mount-point.py'),
(1, './11-persistent-subscription.py'),
(1, './11-persistent-subscription-v5.py'),
(1, './11-persistent-subscription-no-local.py'),
(1, './12-prop-assigned-client-identifier.py'),
(1, './12-prop-maximum-packet-size-broker.py'),

View File

@ -12,6 +12,7 @@
extern uint64_t last_retained;
extern char *last_sub;
extern int last_qos;
extern uint32_t last_identifier;
struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
{
@ -119,6 +120,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
{
last_sub = strdup(sub);
last_qos = qos;
last_identifier = identifier;
return MOSQ_ERR_SUCCESS;
}

View File

@ -15,6 +15,7 @@
uint64_t last_retained;
char *last_sub = NULL;
int last_qos;
uint32_t last_identifier;
static void TEST_persistence_disabled(void)
{
@ -622,6 +623,7 @@ static void TEST_v5_sub(void)
free(last_sub);
}
CU_ASSERT_EQUAL(last_qos, 1);
CU_ASSERT_EQUAL(last_identifier, 0x7623);
}
}

View File

@ -211,6 +211,37 @@ static void TEST_v5_client_message(void)
}
static void TEST_v5_sub(void)
{
struct mosquitto_db db;
struct mosquitto__config config;
struct mosquitto__listener listener;
int rc;
memset(&db, 0, sizeof(struct mosquitto_db));
memset(&config, 0, sizeof(struct mosquitto__config));
memset(&listener, 0, sizeof(struct mosquitto__listener));
db.config = &config;
listener.port = 1883;
config.listeners = &listener;
config.listener_count = 1;
db__open(&config, &db);
config.persistence = true;
config.persistence_filepath = "files/persist_read/v5-sub.test-db";
rc = persist__restore(&db);
CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS);
config.persistence_filepath = "v5-sub.db";
rc = persist__backup(&db, true);
CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS);
CU_ASSERT_EQUAL(0, file_diff("files/persist_read/v5-sub.test-db", "v5-sub.db"));
//unlink("v5-sub.db");
}
#if 0
NOT WORKING
static void TEST_v5_full(void)
@ -269,6 +300,7 @@ int main(int argc, char *argv[])
|| !CU_add_test(test_suite, "v5 message store (message has no refs)", TEST_v5_message_store_no_ref)
|| !CU_add_test(test_suite, "v5 client", TEST_v5_client)
|| !CU_add_test(test_suite, "v5 client message", TEST_v5_client_message)
|| !CU_add_test(test_suite, "v5 sub", TEST_v5_sub)
//|| !CU_add_test(test_suite, "v5 full", TEST_v5_full)
){