mosquitto/test/broker/msg_sequence_test.py

224 lines
7.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# Run the scripted message sequences defined in data/*.json against a broker
# and check that every exchange matches what the script expects.
from mosq_test_helper import *
import importlib
from os import walk
import socket
import json
from collections import deque
import mosq_test
# Action codes stored in SingleMsg.action and dispatched on by
# MsgSequence._process_message.
(
    send,                # write raw bytes to the connection under test
    recv,                # read bytes and compare them with the expected reply
    disconnected_check,  # verify the connection has been closed
    connected_check,     # verify the connection is still usable
    publish,             # publish a message through a separate helper client
) = range(1, 6)
class SingleMsg:
    """One step of a message sequence.

    Attributes are stored in slots:
      action  -- one of the module-level action codes (send, recv, ...).
      message -- the payload for the step (raw bytes, or the message
                 description handed to the publish helper).
      comment -- free text included in failure reports; defaults to ''.
    """

    __slots__ = ('action', 'message', 'comment')

    def __init__(self, action, message, comment=''):
        self.action, self.message, self.comment = action, message, comment
class MsgSequence(object):
    """An ordered script of steps to run against a single broker connection.

    Steps are queued with the ``add_*`` methods and consumed in FIFO order by
    ``process_next`` / ``process_all``. Failures are reported by raising
    ``ValueError`` (or by letting socket / ``mosq_test`` errors propagate).
    """

    __slots__ = 'name', 'msgs', 'expect_disconnect'

    def __init__(self, name, default_connect=True, proto_ver=4, expect_disconnect=True):
        """Create an empty sequence.

        name              -- sequence name; also used as the client id of the
                             default CONNECT.
        default_connect   -- when true, queue a CONNECT / CONNACK exchange first.
        proto_ver         -- protocol version passed to the default CONNECT/CONNACK.
        expect_disconnect -- whether the connection should be closed once all
                             steps have run (checked by process_all).
        """
        self.name = name
        self.msgs = deque()
        self.expect_disconnect = expect_disconnect
        if default_connect:
            self.add_default_connect(proto_ver=proto_ver)

    def add_default_connect(self, proto_ver):
        """Queue a CONNECT (keepalive 60) and the matching rc=0 CONNACK."""
        self.add_send(mosq_test.gen_connect(self.name, keepalive=60, proto_ver=proto_ver))
        self.add_recv(mosq_test.gen_connack(rc=0, proto_ver=proto_ver), "default connack")

    def add_send(self, message):
        """Queue raw bytes to be sent on the connection under test."""
        self._add(send, message)

    def add_recv(self, message, comment):
        """Queue raw bytes that must be received next on the connection."""
        self._add(recv, message, comment)

    def add_publish(self, message, comment):
        """Queue a publish performed by a separate helper client.

        ``message`` is a mapping read by _publish_message, which uses its
        'qos', 'topic' and 'payload' keys.
        """
        self._add(publish, message, comment)

    def add_connected_check(self):
        """Queue a check that the connection is still alive."""
        self._add(connected_check, b"")

    def add_disconnected_check(self):
        """Queue a check that the connection has been closed."""
        self._add(disconnected_check, b"")

    def _add(self, action, message, comment=""):
        """Append one step to the end of the queue."""
        self.msgs.append(SingleMsg(action, message, comment))

    def _connected_check(self, sock):
        """Ping over ``sock``; raise ValueError if the ping fails."""
        try:
            mosq_test.do_ping(sock)
        except mosq_test.TestError as err:
            raise ValueError("connection failed") from err

    def _send_message(self, sock, msg):
        """Send the step's raw bytes on ``sock``."""
        sock.send(msg.message)

    @staticmethod
    def _peer_address(sock):
        """Return (host, port) of the endpoint ``sock`` is connected to.

        Falls back to ("localhost", 1888), the address the publish helper
        historically used, when the peer address is unavailable or is not a
        (host, port, ...) tuple.
        """
        fallback = ("localhost", 1888)
        try:
            peer = sock.getpeername()
        except OSError:
            return fallback
        if isinstance(peer, tuple) and len(peer) >= 2:
            return peer[0], peer[1]
        return fallback

    def _publish_message(self, msg, hostname="localhost", port=1888):
        """Publish ``msg.message`` via a short-lived helper client.

        The helper connects to ``hostname``:``port``, completes a
        CONNECT/CONNACK exchange and then publishes at the requested QoS,
        waiting for the QoS 1 / QoS 2 acknowledgements. Any other 'qos'
        value publishes nothing. The helper socket is always closed, even
        when an expectation fails.
        """
        sock = mosq_test.client_connect_only(hostname=hostname, port=port, timeout=2)
        try:
            sock.send(mosq_test.gen_connect("helper", keepalive=60))
            mosq_test.expect_packet(sock, "connack", mosq_test.gen_connack(rc=0))
            m = msg.message
            if m['qos'] == 0:
                sock.send(mosq_test.gen_publish(topic=m['topic'], payload=m['payload']))
            elif m['qos'] == 1:
                sock.send(mosq_test.gen_publish(mid=1, qos=1, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(sock, "helper puback", mosq_test.gen_puback(mid=1))
            elif m['qos'] == 2:
                sock.send(mosq_test.gen_publish(mid=1, qos=2, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(sock, "helper pubrec", mosq_test.gen_pubrec(mid=1))
                sock.send(mosq_test.gen_pubrel(mid=1))
                mosq_test.expect_packet(sock, "helper pubcomp", mosq_test.gen_pubcomp(mid=1))
        finally:
            sock.close()

    def _recv_message(self, sock, msg):
        """Read ``len(msg.message)`` bytes from ``sock`` and compare them.

        A stream socket may deliver the reply in several pieces, so reading
        continues until the expected length is reached, the peer closes the
        connection, or a timeout hits after partial data. Raises ValueError
        when the received bytes differ from the expected ones; a timeout with
        nothing received at all propagates unchanged.
        """
        expected = msg.message
        remaining = len(expected)
        chunks = []
        while remaining > 0:
            try:
                chunk = sock.recv(remaining)
            except socket.timeout:
                if not chunks:
                    raise
                # Report the partial data as a mismatch rather than a bare timeout.
                break
            if not chunk:
                # Peer closed the connection before the full reply arrived.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        data = b"".join(chunks)
        if data != expected:
            raise ValueError("Receive message %s | %s | %s" % (msg.comment, data, expected))

    def _disconnected_check(self, sock):
        """Check the close state of ``sock`` against ``expect_disconnect``.

        Raises ValueError if a byte can still be read while a disconnect is
        expected. A connection reset counts as a disconnect and is re-raised
        only when no disconnect was expected.
        """
        try:
            data = sock.recv(1)
        except ConnectionResetError:
            if not self.expect_disconnect:
                raise
            return
        if len(data) == 1 and self.expect_disconnect:
            raise ValueError("Still connected")

    def _process_message(self, sock, msg):
        """Execute one step according to its action code."""
        if msg.action == send:
            self._send_message(sock, msg)
        elif msg.action == recv:
            self._recv_message(sock, msg)
        elif msg.action == publish:
            # Publish to the same endpoint the connection under test uses,
            # so the helper follows the broker when it is not on port 1888.
            hostname, port = self._peer_address(sock)
            self._publish_message(msg, hostname=hostname, port=port)
        elif msg.action == disconnected_check:
            self._disconnected_check(sock)
        elif msg.action == connected_check:
            self._connected_check(sock)

    def process_next(self, sock):
        """Remove the oldest queued step and execute it."""
        msg = self.msgs.popleft()
        self._process_message(sock, msg)

    def process_all(self, sock):
        """Run every queued step, then verify the final connection state."""
        while self.msgs:
            self.process_next(sock)
        if self.expect_disconnect:
            self._disconnected_check(sock)
        else:
            self._connected_check(sock)
def _build_sequence(tname, t):
    """Build a MsgSequence named ``tname`` from one JSON test description."""
    this_test = MsgSequence(tname,
                            proto_ver=t.get("ver", 4),
                            expect_disconnect=t.get("expect_disconnect", True),
                            default_connect=t.get("connect", True))
    for m in t["msgs"]:
        c = m.get("comment", "")
        if m["type"] == "send":
            this_test.add_send(bytes.fromhex(m["payload"].replace(" ", "")))
        elif m["type"] == "recv":
            this_test.add_recv(bytes.fromhex(m["payload"].replace(" ", "")), c)
        elif m["type"] == "publish":
            # The whole message description is handed to the publish helper.
            this_test.add_publish(m, c)
    return this_test


def do_test(hostname, port):
    """Run every enabled test found in the ``data/*.json`` files.

    Each JSON file holds a list of groups; a group has a "group" name, an
    optional "disable" flag and a list of "tests". Every test gets a fresh
    connection to ``hostname``:``port`` and its own MsgSequence. One coloured
    line is printed per test, followed by a summary.

    Returns 0 when every test succeeded, 1 otherwise.
    """
    rc = 0
    # Only files directly inside "data" are considered; walk() yields that
    # directory first, and nothing at all when it does not exist.
    _, _, filenames = next(walk("data"), (None, None, []))

    total = 0
    succeeded = 0
    for seq in sorted(filenames):
        if not seq.endswith(".json"):
            continue
        with open("data/"+seq, "r") as f:
            test_file = json.load(f)

        for g in test_file:
            group_name = g["group"]
            if g.get("disable"):
                continue

            for t in g["tests"]:
                tname = group_name + " " + t["name"]
                this_test = _build_sequence(tname, t)

                total += 1
                sock = None
                try:
                    sock = mosq_test.client_connect_only(hostname=hostname, port=port, timeout=2)
                    this_test.process_all(sock)
                    print("\033[32m" + tname + "\033[0m")
                    succeeded += 1
                except (ValueError, ConnectionResetError, socket.timeout, mosq_test.TestError) as e:
                    print("\033[31m" + tname + " failed: " + str(e) + "\033[0m")
                    rc = 1
                finally:
                    # Release the per-test connection whatever the outcome.
                    if sock is not None:
                        sock.close()

    print("%d tests total\n%d tests succeeded" % (total, succeeded))
    return rc
# Script driver: start a broker, run the sequences against it, and always
# shut the broker down again before reporting the result.
hostname = "localhost"
port = mosq_test.get_port()
broker = mosq_test.start_broker(
    filename=os.path.basename(__file__),
    port=port,
    nolog=True,
)

rc = 0
try:
    rc = do_test(hostname=hostname, port=port)
finally:
    # Runs even when do_test raises, so the broker never outlives the test.
    broker.terminate()
    broker.wait()
    stdo, stde = broker.communicate()

if rc:
    #print(stde.decode('utf-8'))
    exit(rc)