1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-11-22 15:38:11 +00:00

tests: fix loop.select usage

This commit is contained in:
Jan Pochyla 2018-04-13 14:57:04 +02:00
parent e223173b4e
commit 3b67cb6bea
2 changed files with 14 additions and 14 deletions

View File

@ -6,7 +6,7 @@ from utest import *
from ubinascii import unhexlify from ubinascii import unhexlify
from trezor import io from trezor import io
from trezor.loop import select from trezor.loop import wait
from trezor.utils import chunks from trezor.utils import chunks
from trezor.wire import codec_v1 from trezor.wire import codec_v1
@ -38,7 +38,7 @@ def test_reader():
# open, expected one read # open, expected one read
first_report = report_header + message[:rep_len - len(report_header)] first_report = report_header + message[:rep_len - len(report_header)]
assert_async(reader.aopen(), [(None, select(io.POLL_READ | interface_num)), (first_report, StopIteration()), ]) assert_async(reader.aopen(), [(None, wait(io.POLL_READ | interface_num)), (first_report, StopIteration()), ])
assert_eq(reader.type, message_type) assert_eq(reader.type, message_type)
assert_eq(reader.size, message_len) assert_eq(reader.size, message_len)
@ -65,7 +65,7 @@ def test_reader():
next_report_header = bytearray(unhexlify('3f')) next_report_header = bytearray(unhexlify('3f'))
next_report = next_report_header + message[rep_len - len(report_header):][:rep_len - len(next_report_header)] next_report = next_report_header + message[rep_len - len(report_header):][:rep_len - len(next_report_header)]
onebyte_buffer = bytearray(1) onebyte_buffer = bytearray(1)
assert_async(reader.areadinto(onebyte_buffer), [(None, select(io.POLL_READ | interface_num)), (next_report, StopIteration()), ]) assert_async(reader.areadinto(onebyte_buffer), [(None, wait(io.POLL_READ | interface_num)), (next_report, StopIteration()), ])
assert_eq(onebyte_buffer, message[len(short_buffer):][len(aligned_buffer):][:len(onebyte_buffer)]) assert_eq(onebyte_buffer, message[len(short_buffer):][len(aligned_buffer):][:len(onebyte_buffer)])
assert_eq(reader.size, message_len - len(short_buffer) - len(aligned_buffer) - len(onebyte_buffer)) assert_eq(reader.size, message_len - len(short_buffer) - len(aligned_buffer) - len(onebyte_buffer))
@ -84,7 +84,7 @@ def test_reader():
expected_syscalls = [] expected_syscalls = []
for i, _ in enumerate(next_reports): for i, _ in enumerate(next_reports):
prev_report = next_reports[i - 1] if i > 0 else None prev_report = next_reports[i - 1] if i > 0 else None
expected_syscalls.append((prev_report, select(io.POLL_READ | interface_num))) expected_syscalls.append((prev_report, wait(io.POLL_READ | interface_num)))
expected_syscalls.append((next_reports[-1], StopIteration())) expected_syscalls.append((next_reports[-1], StopIteration()))
assert_async(reader.areadinto(long_buffer), expected_syscalls) assert_async(reader.areadinto(long_buffer), expected_syscalls)
assert_eq(long_buffer, message[-start_size:]) assert_eq(long_buffer, message[-start_size:])
@ -127,7 +127,7 @@ def test_writer():
# aligned write, expected one report # aligned write, expected one report
start_size = writer.size start_size = writer.size
aligned_payload = bytearray(range(rep_len - len(report_header) - len(short_payload))) aligned_payload = bytearray(range(rep_len - len(report_header) - len(short_payload)))
assert_async(writer.awrite(aligned_payload), [(None, select(io.POLL_WRITE | interface_num)), (None, StopIteration()), ]) assert_async(writer.awrite(aligned_payload), [(None, wait(io.POLL_WRITE | interface_num)), (None, StopIteration()), ])
assert_eq(interface.data, [report_header + assert_eq(interface.data, [report_header +
short_payload + short_payload +
aligned_payload + aligned_payload +
@ -153,7 +153,7 @@ def test_writer():
expected_reports[-1] += bytearray(bytes(1) * (rep_len - len(expected_reports[-1]))) expected_reports[-1] += bytearray(bytes(1) * (rep_len - len(expected_reports[-1])))
# test write # test write
expected_write_reports = expected_reports[:-1] expected_write_reports = expected_reports[:-1]
assert_async(writer.awrite(long_payload), len(expected_write_reports) * [(None, select(io.POLL_WRITE | interface_num))] + [(None, StopIteration())]) assert_async(writer.awrite(long_payload), len(expected_write_reports) * [(None, wait(io.POLL_WRITE | interface_num))] + [(None, StopIteration())])
assert_eq(interface.data, expected_write_reports) assert_eq(interface.data, expected_write_reports)
assert_eq(writer.size, start_size - len(long_payload)) assert_eq(writer.size, start_size - len(long_payload))
interface.data.clear() interface.data.clear()
@ -162,7 +162,7 @@ def test_writer():
assert_eq(interface.data, []) assert_eq(interface.data, [])
# test close # test close
expected_close_reports = expected_reports[-1:] expected_close_reports = expected_reports[-1:]
assert_async(writer.aclose(), len(expected_close_reports) * [(None, select(io.POLL_WRITE | interface_num))] + [(None, StopIteration())]) assert_async(writer.aclose(), len(expected_close_reports) * [(None, wait(io.POLL_WRITE | interface_num))] + [(None, StopIteration())])
assert_eq(interface.data, expected_close_reports) assert_eq(interface.data, expected_close_reports)
assert_eq(writer.size, 0) assert_eq(writer.size, 0)

View File

@ -6,7 +6,7 @@ from utest import *
from ustruct import pack from ustruct import pack
from trezor import io from trezor import io
from trezor.loop import select from trezor.loop import wait
from trezor.utils import chunks from trezor.utils import chunks
from trezor.wire import codec_v2 from trezor.wire import codec_v2
@ -41,7 +41,7 @@ def test_reader():
# open, expected one read # open, expected one read
first_report = report_header + message[:rep_len - len(report_header)] first_report = report_header + message[:rep_len - len(report_header)]
assert_async(reader.aopen(), [(None, select(io.POLL_READ | interface_num)), (first_report, StopIteration()), ]) assert_async(reader.aopen(), [(None, wait(io.POLL_READ | interface_num)), (first_report, StopIteration()), ])
assert_eq(reader.type, message_type) assert_eq(reader.type, message_type)
assert_eq(reader.size, message_len) assert_eq(reader.size, message_len)
@ -68,7 +68,7 @@ def test_reader():
next_report_header = bytearray(unhexlify('021234567800000000')) next_report_header = bytearray(unhexlify('021234567800000000'))
next_report = next_report_header + message[rep_len - len(report_header):][:rep_len - len(next_report_header)] next_report = next_report_header + message[rep_len - len(report_header):][:rep_len - len(next_report_header)]
onebyte_buffer = bytearray(1) onebyte_buffer = bytearray(1)
assert_async(reader.areadinto(onebyte_buffer), [(None, select(io.POLL_READ | interface_num)), (next_report, StopIteration()), ]) assert_async(reader.areadinto(onebyte_buffer), [(None, wait(io.POLL_READ | interface_num)), (next_report, StopIteration()), ])
assert_eq(onebyte_buffer, message[len(short_buffer):][len(aligned_buffer):][:len(onebyte_buffer)]) assert_eq(onebyte_buffer, message[len(short_buffer):][len(aligned_buffer):][:len(onebyte_buffer)])
assert_eq(reader.size, message_len - len(short_buffer) - len(aligned_buffer) - len(onebyte_buffer)) assert_eq(reader.size, message_len - len(short_buffer) - len(aligned_buffer) - len(onebyte_buffer))
@ -87,7 +87,7 @@ def test_reader():
expected_syscalls = [] expected_syscalls = []
for i, _ in enumerate(next_reports): for i, _ in enumerate(next_reports):
prev_report = next_reports[i - 1] if i > 0 else None prev_report = next_reports[i - 1] if i > 0 else None
expected_syscalls.append((prev_report, select(io.POLL_READ | interface_num))) expected_syscalls.append((prev_report, wait(io.POLL_READ | interface_num)))
expected_syscalls.append((next_reports[-1], StopIteration())) expected_syscalls.append((next_reports[-1], StopIteration()))
assert_async(reader.areadinto(long_buffer), expected_syscalls) assert_async(reader.areadinto(long_buffer), expected_syscalls)
assert_eq(long_buffer, message[-start_size:]) assert_eq(long_buffer, message[-start_size:])
@ -131,7 +131,7 @@ def test_writer():
# aligned write, expected one report # aligned write, expected one report
start_size = writer.size start_size = writer.size
aligned_payload = bytearray(range(rep_len - len(report_header) - len(short_payload))) aligned_payload = bytearray(range(rep_len - len(report_header) - len(short_payload)))
assert_async(writer.awrite(aligned_payload), [(None, select(io.POLL_WRITE | interface_num)), (None, StopIteration()), ]) assert_async(writer.awrite(aligned_payload), [(None, wait(io.POLL_WRITE | interface_num)), (None, StopIteration()), ])
assert_eq(interface.data, [report_header + assert_eq(interface.data, [report_header +
short_payload + short_payload +
aligned_payload + aligned_payload +
@ -159,7 +159,7 @@ def test_writer():
expected_reports[-1] += bytearray(bytes(1) * (rep_len - len(expected_reports[-1]))) expected_reports[-1] += bytearray(bytes(1) * (rep_len - len(expected_reports[-1])))
# test write # test write
expected_write_reports = expected_reports[:-1] expected_write_reports = expected_reports[:-1]
assert_async(writer.awrite(long_payload), len(expected_write_reports) * [(None, select(io.POLL_WRITE | interface_num))] + [(None, StopIteration())]) assert_async(writer.awrite(long_payload), len(expected_write_reports) * [(None, wait(io.POLL_WRITE | interface_num))] + [(None, StopIteration())])
assert_eq(interface.data, expected_write_reports) assert_eq(interface.data, expected_write_reports)
assert_eq(writer.size, start_size - len(long_payload)) assert_eq(writer.size, start_size - len(long_payload))
interface.data.clear() interface.data.clear()
@ -168,7 +168,7 @@ def test_writer():
assert_eq(interface.data, []) assert_eq(interface.data, [])
# test close # test close
expected_close_reports = expected_reports[-1:] expected_close_reports = expected_reports[-1:]
assert_async(writer.aclose(), len(expected_close_reports) * [(None, select(io.POLL_WRITE | interface_num))] + [(None, StopIteration())]) assert_async(writer.aclose(), len(expected_close_reports) * [(None, wait(io.POLL_WRITE | interface_num))] + [(None, StopIteration())])
assert_eq(interface.data, expected_close_reports) assert_eq(interface.data, expected_close_reports)
assert_eq(writer.size, 0) assert_eq(writer.size, 0)
interface.data.clear() interface.data.clear()