mosquitto/test/broker/09-pwfile-parse-invalid.py

168 lines
6.6 KiB
Python
Raw Normal View History

2019-03-28 21:32:12 +00:00
#!/usr/bin/env python3
# Test for CVE-2018-xxxxx.
2019-03-03 22:04:22 +00:00
from mosq_test_helper import *
import signal
def write_config(filename, port, per_listener):
with open(filename, 'w') as f:
f.write("per_listener_settings %s\n" % (per_listener))
f.write("port %d\n" % (port))
f.write("password_file %s\n" % (filename.replace('.conf', '.pwfile')))
f.write("allow_anonymous false")
def write_pwfile(filename, bad_line1, bad_line2):
with open(filename, 'w') as f:
if bad_line1 is not None:
f.write('%s\n' % (bad_line1))
# Username test, password test
f.write('test:$6$njERlZMi/7DzNB9E$iiavfuXvUm8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==\n')
# Username empty, password 0 length
f.write('empty:$6$o+53eGXtmlfHeYrg$FY7X9DNQ4uU1j0NiPmGOOSU05ZSzhqNmNhXIof/0nLpVb1zDhcRHdaC72E3YryH7dtTiG/r6jH6C8J+30cZBgA==\n')
if bad_line2 is not None:
f.write('%s\n' % (bad_line2))
def do_test(port, connack_rc, username, password):
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("username-password-check", keepalive=keepalive, username=username, password=password)
connack_packet = mosq_test.gen_connack(rc=connack_rc)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
rc = 0
sock.close()
2020-08-12 10:06:09 +00:00
except mosq_test.TestError:
pass
finally:
if rc:
raise AssertionError
def username_password_tests(port):
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
do_test(port, connack_rc=0, username='test', password='test')
do_test(port, connack_rc=5, username='test', password='bad')
do_test(port, connack_rc=5, username='test', password='')
do_test(port, connack_rc=5, username='test', password=None)
do_test(port, connack_rc=5, username='empty', password='test')
do_test(port, connack_rc=5, username='empty', password='bad')
2019-02-12 17:05:42 +00:00
do_test(port, connack_rc=5, username='empty', password='')
do_test(port, connack_rc=5, username='empty', password=None)
do_test(port, connack_rc=5, username='bad', password='test')
do_test(port, connack_rc=5, username='bad', password='bad')
do_test(port, connack_rc=5, username='bad', password='')
do_test(port, connack_rc=5, username='bad', password=None)
do_test(port, connack_rc=5, username='', password='test')
do_test(port, connack_rc=5, username='', password='bad')
do_test(port, connack_rc=5, username='', password='')
do_test(port, connack_rc=5, username='', password=None)
do_test(port, connack_rc=5, username=None, password='test')
do_test(port, connack_rc=5, username=None, password='bad')
do_test(port, connack_rc=5, username=None, password='')
do_test(port, connack_rc=5, username=None, password=None)
except ValueError:
pass
finally:
broker.terminate()
broker.wait()
def all_tests(port):
# Valid file, single user
write_pwfile(pw_file, bad_line1=None, bad_line2=None)
username_password_tests(port)
# Invalid file, first line blank
write_pwfile(pw_file, bad_line1='', bad_line2=None)
username_password_tests(port)
# Invalid file, last line blank
write_pwfile(pw_file, bad_line1=None, bad_line2='')
username_password_tests(port)
# Invalid file, first and last line blank
write_pwfile(pw_file, bad_line1='', bad_line2='')
username_password_tests(port)
# Invalid file, first line 'comment'
write_pwfile(pw_file, bad_line1='#comment', bad_line2=None)
username_password_tests(port)
# Invalid file, last line 'comment'
write_pwfile(pw_file, bad_line1=None, bad_line2='#comment')
username_password_tests(port)
# Invalid file, first and last line 'comment'
write_pwfile(pw_file, bad_line1='#comment', bad_line2='#comment')
username_password_tests(port)
# Invalid file, first line blank and last line 'comment'
write_pwfile(pw_file, bad_line1='', bad_line2='#comment')
username_password_tests(port)
# Invalid file, first line incomplete
write_pwfile(pw_file, bad_line1='bad:', bad_line2=None)
username_password_tests(port)
# Invalid file, first line incomplete, but with "password"
write_pwfile(pw_file, bad_line1='bad:bad', bad_line2=None)
username_password_tests(port)
# Invalid file, first line incomplete, partial password hash
write_pwfile(pw_file, bad_line1='bad:$', bad_line2=None)
username_password_tests(port)
# Invalid file, first line incomplete, partial password hash
write_pwfile(pw_file, bad_line1='bad:$6', bad_line2=None)
username_password_tests(port)
# Invalid file, first line incomplete, partial password hash
write_pwfile(pw_file, bad_line1='bad:$6$', bad_line2=None)
username_password_tests(port)
# Valid file, first line incomplete, has valid salt but no password hash
write_pwfile(pw_file, bad_line1='bad:$6$njERlZMi/7DzNB9E', bad_line2=None)
username_password_tests(port)
# Valid file, first line incomplete, has valid salt but no password hash
write_pwfile(pw_file, bad_line1='bad:$6$njERlZMi/7DzNB9E$', bad_line2=None)
username_password_tests(port)
# Valid file, first line has invalid hash designator
write_pwfile(pw_file, bad_line1='bad:$5$njERlZMi/7DzNB9E$iiavfuXvUm8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==', bad_line2=None)
username_password_tests(port)
# Invalid file, missing username but valid password hash
write_pwfile(pw_file, bad_line1=':$6$njERlZMi/7DzNB9E$iiavfuXvUm8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==', bad_line2=None)
username_password_tests(port)
# Valid file, valid username but password salt not base64
write_pwfile(pw_file, bad_line1='bad:$6$njER{ZMi/7DzNB9E$iiavfuXvUm8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==', bad_line2=None)
username_password_tests(port)
# Valid file, valid username but password hash not base64
write_pwfile(pw_file, bad_line1='bad:$6$njERlZMi/7DzNB9E$iiavfuXv{}8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==', bad_line2=None)
username_password_tests(port)
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
pw_file = os.path.basename(__file__).replace('.py', '.pwfile')
try:
write_config(conf_file, port, "false")
all_tests(port)
write_config(conf_file, port, "true")
all_tests(port)
finally:
os.remove(conf_file)
os.remove(pw_file)
sys.exit(0)