chore(core): delete unused functions

pull/2633/head
grdddj 2 years ago committed by matejcik
parent 52558c7b96
commit 756a97a95d

@ -75,10 +75,6 @@ def format_coin_amount(amount: int, network_id: int) -> str:
return f"{format_amount(amount, 6)} {currency}"
def is_printable_ascii_bytestring(bytestr: bytes) -> bool:
return all((32 < b < 127) for b in bytestr)
async def show_native_script(
ctx: wire.Context,
script: messages.CardanoNativeScript,

@ -45,12 +45,6 @@ class PlutusSigner(Signer):
tx_hash,
)
def _should_show_tx_hash(self) -> bool:
# super() omitted intentionally
# Plutus txs tend to contain a lot of opaque data, some users might
# want to verify only the tx hash.
return True
async def _show_input(self, input: messages.CardanoTxInput) -> None:
# super() omitted intentionally
# The inputs are not interchangeable (because of datums), so we must show them.

@ -253,10 +253,6 @@ class Signer:
# Final signing confirmation is handled separately in each signing mode.
raise NotImplementedError
def _should_show_tx_hash(self) -> bool:
# By default we display tx hash only if showing details
return self.should_show_details
# inputs
async def _process_inputs(

@ -271,38 +271,6 @@ def encode_streamed(value: Value) -> Iterator[bytes]:
return _cbor_encode(value)
def encode_chunked(value: Value, max_chunk_size: int) -> Iterator[bytes]:
"""
Returns the encoded value as an iterable of chunks of a given size,
removing the need to reserve a continuous chunk of memory for the
full serialized representation of the value.
"""
if max_chunk_size <= 0:
raise ValueError
chunks = encode_streamed(value)
chunk_buffer = utils.empty_bytearray(max_chunk_size)
try:
current_chunk_view = utils.BufferReader(next(chunks))
while True:
num_bytes_to_write = min(
current_chunk_view.remaining_count(),
max_chunk_size - len(chunk_buffer),
)
chunk_buffer.extend(current_chunk_view.read(num_bytes_to_write))
if len(chunk_buffer) >= max_chunk_size:
yield chunk_buffer
chunk_buffer[:] = bytes()
if current_chunk_view.remaining_count() == 0:
current_chunk_view = utils.BufferReader(next(chunks))
except StopIteration:
if len(chunk_buffer) > 0:
yield chunk_buffer
def decode(cbor: bytes, offset: int = 0) -> Value:
r = utils.BufferReader(cbor)
r.seek(offset)

@ -326,12 +326,6 @@ class AlwaysMatchingSchema:
return True
class NeverMatchingSchema:
@staticmethod
def match(path: Bip32Path) -> bool:
return False
# BIP-44 for basic (legacy) Bitcoin accounts, and widely used for other currencies:
# https://github.com/bitcoin/bips/blob/master/bip-0044.mediawiki
PATTERN_BIP44 = "m/44'/coin_type'/account'/change/address_index"

@ -19,13 +19,6 @@ def write_uint16_le(w: Writer, n: int) -> int:
return 2
def write_uint16_be(w: Writer, n: int) -> int:
ensure(0 <= n <= 0xFFFF)
w.append((n >> 8) & 0xFF)
w.append(n & 0xFF)
return 2
def write_uint32_le(w: Writer, n: int) -> int:
ensure(0 <= n <= 0xFFFF_FFFF)
w.append(n & 0xFF)

@ -89,18 +89,6 @@ async def show_dry_run_result(
await show_warning(ctx, "warning_dry_recovery", text, button="Continue")
async def show_dry_run_different_type(ctx: wire.GenericContext) -> None:
await show_warning(
ctx,
"warning_dry_recovery",
header="Dry run failure",
content="Seed in the device was\ncreated using another\nbackup mechanism.",
icon=ui.ICON_CANCEL,
icon_color=ui.ORANGE_ICON,
br_code=ButtonRequestType.ProtectCall,
)
async def show_invalid_mnemonic(ctx: wire.GenericContext, word_count: int) -> None:
if backup_types.is_slip39_word_count(word_count):
await show_warning(

@ -94,13 +94,6 @@ def base58_encode_check(payload: bytes, prefix: str | None = None) -> str:
return base58.encode_check(result)
def base58_decode_check(enc: str, prefix: str | None = None) -> bytes:
decoded = base58.decode_check(enc)
if prefix is not None:
decoded = decoded[len(TEZOS_PREFIX_BYTES[prefix]) :]
return decoded
def write_bool(w: Writer, boolean: bool) -> None:
if boolean:
write_uint8(w, 255)

@ -51,23 +51,6 @@ class ZcashHasher:
write_uint32(tx_hash_person, tx.branch_id)
self.tx_hash_person = bytes(tx_hash_person)
# The `txid_digest` method is currently a dead code,
# but we keep it for future use cases.
def txid_digest(self) -> bytes:
"""
Returns the transaction identifier.
see: https://zips.z.cash/zip-0244#id4
"""
h = HashWriter(blake2b(outlen=32, personal=self.tx_hash_person))
write_hash(h, self.header.digest()) # T.1
write_hash(h, self.transparent.digest()) # T.2
write_hash(h, self.sapling.digest()) # T.3
write_hash(h, self.orchard.digest()) # T.4
return h.get_digest()
def signature_digest(
self, txi: TxInput | None, script_pubkey: bytes | None
) -> bytes:

@ -411,37 +411,6 @@ def _rs1024_verify_checksum(data: Indices) -> bool:
return _rs1024_polymod(tuple(_CUSTOMIZATION_STRING) + data) == 1
def _rs1024_error_index(data: Indices) -> int | None:
"""
Returns the index where an error possibly occurred.
Currently unused.
"""
GEN = (
0x91F_9F87,
0x122F_1F07,
0x244E_1E07,
0x81C_1C07,
0x1028_1C0E,
0x2040_1C1C,
0x10_3838,
0x20_7070,
0x40_E0E0,
0x81_C1C0,
)
chk = _rs1024_polymod(tuple(_CUSTOMIZATION_STRING) + data) ^ 1
if chk == 0:
return None
for i in reversed(range(len(data))):
b = chk & 0x3FF
chk >>= 10
if chk == 0:
return i
for j in range(10):
chk ^= GEN[j] if ((b >> j) & 1) else 0
return None
# === Internal functions ===

@ -51,10 +51,6 @@ def error(name: str, msg: str, *args: Any) -> None:
_log(name, _ERROR, msg, *args)
def critical(name: str, msg: str, *args: Any) -> None:
_log(name, _CRITICAL, msg, *args)
def exception(name: str, exc: BaseException) -> None:
# we are using `__class__.__name__` to avoid importing ui module
# we also need to instruct typechecker to ignore the missing argument

@ -21,10 +21,12 @@ def format_amount(amount: int, decimals: int) -> str:
return s
def format_ordinal(number: int) -> str:
return str(number) + {1: "st", 2: "nd", 3: "rd"}.get(
4 if 10 <= number % 100 < 20 else number % 10, "th"
)
if False:
def format_ordinal(number: int) -> str:
return str(number) + {1: "st", 2: "nd", 3: "rd"}.get(
4 if 10 <= number % 100 < 20 else number % 10, "th"
)
def format_plural(string: str, count: int, plural: str) -> str:

@ -105,19 +105,6 @@ def alert(count: int = 3) -> None:
loop.schedule(_alert(count))
async def click() -> Pos:
touch = loop.wait(io.TOUCH)
while True:
ev, *pos = await touch
if ev == io.TOUCH_START:
break
while True:
ev, *pos = await touch
if ev == io.TOUCH_END:
break
return pos # type: ignore [Expression of type "list[Unknown]" cannot be assigned to return type "Pos"]
def backlight_fade(val: int, delay: int = 14000, step: int = 15) -> None:
if __debug__:
if utils.DISABLE_ANIMATION:

@ -192,36 +192,6 @@ if TYPE_CHECKING:
BufferType = bytearray | memoryview
class BufferWriter:
"""Seekable and writeable view into a buffer."""
def __init__(self, buffer: BufferType) -> None:
self.buffer = buffer
self.offset = 0
def seek(self, offset: int) -> None:
"""Set current offset to `offset`.
If negative, set to zero. If longer than the buffer, set to end of buffer.
"""
offset = min(offset, len(self.buffer))
offset = max(offset, 0)
self.offset = offset
def write(self, src: bytes) -> int:
"""Write exactly `len(src)` bytes into buffer, or raise EOFError.
Returns number of bytes written.
"""
buffer = self.buffer
offset = self.offset
if len(src) > len(buffer) - offset:
raise EOFError
nwrite = memcpy(buffer, offset, src, 0)
self.offset += nwrite
return nwrite
class BufferReader:
"""Seekable and readable view into a buffer."""

@ -97,20 +97,22 @@ def set_default(constructor: Callable[[], loop.Task]) -> None:
default_constructor = constructor
def kill_default() -> None:
"""Forcefully shut down default task.
if False:
The purpose of the call is to prevent the default task from interfering with
a synchronous layout-less workflow (e.g., the progress bar in `mnemonic.get_seed`).
def kill_default() -> None:
"""Forcefully shut down default task.
This function should only be called from a workflow registered with `on_start`.
Otherwise the default will be restarted immediately.
"""
if default_task:
if __debug__:
log.debug(__name__, "close default")
# We let the `_finalize_default` reset the global.
default_task.close()
The purpose of the call is to prevent the default task from interfering with
a synchronous layout-less workflow (e.g., the progress bar in `mnemonic.get_seed`).
This function should only be called from a workflow registered with `on_start`.
Otherwise the default will be restarted immediately.
"""
if default_task:
if __debug__:
log.debug(__name__, "close default")
# We let the `_finalize_default` reset the global.
default_task.close()
def close_others() -> None:

@ -11,10 +11,44 @@ from apps.common.cbor import (
create_embedded_cbor_bytes_header,
decode,
encode,
encode_chunked,
encode_streamed,
utils
)
# NOTE: moved into tests not to occupy flash space
# in firmware binary, when it is not used in production
def encode_chunked(value: "Value", max_chunk_size: int) -> "Iterator[bytes]":
"""
Returns the encoded value as an iterable of chunks of a given size,
removing the need to reserve a continuous chunk of memory for the
full serialized representation of the value.
"""
if max_chunk_size <= 0:
raise ValueError
chunks = encode_streamed(value)
chunk_buffer = utils.empty_bytearray(max_chunk_size)
try:
current_chunk_view = utils.BufferReader(next(chunks))
while True:
num_bytes_to_write = min(
current_chunk_view.remaining_count(),
max_chunk_size - len(chunk_buffer),
)
chunk_buffer.extend(current_chunk_view.read(num_bytes_to_write))
if len(chunk_buffer) >= max_chunk_size:
yield chunk_buffer
chunk_buffer[:] = bytes()
if current_chunk_view.remaining_count() == 0:
current_chunk_view = utils.BufferReader(next(chunks))
except StopIteration:
if len(chunk_buffer) > 0:
yield chunk_buffer
class TestCardanoCbor(unittest.TestCase):
def test_create_array_header(self):

@ -2,6 +2,13 @@ from common import *
from trezor.utils import ensure
from apps.common.paths import *
# NOTE: moved into tests not to occupy flash space
# in firmware binary, when it is not used in production
class NeverMatchingSchema:
@staticmethod
def match(path: "Bip32Path") -> bool:
return False
class TestPaths(unittest.TestCase):
def test_is_hardened(self):

@ -3,7 +3,7 @@ from common import *
if not utils.BITCOIN_ONLY:
from trezor.enums import TezosContractType
from trezor.messages import TezosContractID
from apps.tezos.helpers import base58_decode_check, base58_encode_check, write_bool
from apps.tezos.helpers import base58_encode_check, write_bool
from apps.tezos.sign_tx import (
_encode_contract_id,
_encode_data_with_bool_prefix,
@ -80,21 +80,6 @@ class TestTezosEncoding(unittest.TestCase):
)
self.assertEqual(base58_encode_check(pkh), "2U14dJ6ED97bBHDZTQWA6umVL8SAVefXj")
def test_tezos_base58_decode_check(self):
pkh = unhexlify("101368afffeb1dc3c089facbbe23f5c30b787ce9")
address = "tz1M72kkAJrntPtayM4yU4CCwQPLSdpEgRrn"
self.assertEqual(base58_decode_check(address, prefix="tz1"), pkh)
address = "tz29nEixktH9p9XTFX7p8hATUyeLxXEz96KR"
self.assertEqual(base58_decode_check(address, prefix="tz2"), pkh)
address = "tz3Mo3gHekQhCmykfnC58ecqJLXrjMKzkF2Q"
self.assertEqual(base58_decode_check(address, prefix="tz3"), pkh)
address = "2U14dJ6ED97bBHDZTQWA6umVL8SAVefXj"
self.assertEqual(base58_decode_check(address), pkh)
def test_tezos_encode_natural(self):
inputs = [200000000000, 2000000, 159066, 200, 60000, 157000000, 0]
outputs = ["0080c0ee8ed20b", "008092f401", "009ab513", "008803", "00a0a907", "008085dd9501", "0000"]

@ -1,14 +1,31 @@
from common import *
from trezor.messages import TxInput, SignTx, PrevOutput
from trezor.enums import InputScriptType
from apps.zcash.hasher import ZcashHasher
from apps.bitcoin.common import SigHashType
from apps.zcash.hasher import ZcashHasher, HashWriter, blake2b, write_hash
from apps.common.coininfo import by_name
ZCASH_COININFO = by_name("Zcash")
# NOTE: moved into tests not to occupy flash space
# in firmware binary, when it is not used in production
def txid_digest(hasher: "ZcashHasher") -> bytes:
"""
Returns the transaction identifier.
see: https://zips.z.cash/zip-0244#id4
"""
h = HashWriter(blake2b(outlen=32, personal=hasher.tx_hash_person))
write_hash(h, hasher.header.digest()) # T.1
write_hash(h, hasher.transparent.digest()) # T.2
write_hash(h, hasher.sapling.digest()) # T.3
write_hash(h, hasher.orchard.digest()) # T.4
return h.get_digest()
@unittest.skipUnless(not utils.BITCOIN_ONLY, "altcoin")
class TestZcashSigHasher(unittest.TestCase):
def test_zcash_hasher(self):
@ -86,7 +103,7 @@ class TestZcashSigHasher(unittest.TestCase):
hasher.add_output(txo, txo.script_pubkey)
# test ZcashSigHasher.txid_digest
computed_txid = hasher.txid_digest()
computed_txid = txid_digest(hasher)
self.assertEqual(computed_txid, expected_txid)
# test ZcashSigHasher.signature_digest

@ -2,6 +2,39 @@ from common import *
from trezor.crypto import slip39, random
from slip39_vectors import vectors
# NOTE: moved into tests not to occupy flash space
# in firmware binary, when it is not used in production
def _rs1024_error_index(data: tuple[int, ...]) -> int | None:
"""
Returns the index where an error possibly occurred.
"""
GEN = (
0x91F_9F87,
0x122F_1F07,
0x244E_1E07,
0x81C_1C07,
0x1028_1C0E,
0x2040_1C1C,
0x10_3838,
0x20_7070,
0x40_E0E0,
0x81_C1C0,
)
chk = slip39._rs1024_polymod(tuple(slip39._CUSTOMIZATION_STRING) + data) ^ 1
if chk == 0:
return None
for i in reversed(range(len(data))):
b = chk & 0x3FF
chk >>= 10
if chk == 0:
return i
for j in range(10):
chk ^= GEN[j] if ((b >> j) & 1) else 0
return None
def combinations(iterable, r):
# Taken from https://docs.python.org/3.7/library/itertools.html#itertools.combinations
pool = tuple(iterable)
@ -160,11 +193,11 @@ class TestCryptoSlip39(unittest.TestCase):
]
for mnemonic in mnemonics:
data = tuple(slip39._mnemonic_to_indices(mnemonic))
self.assertEqual(slip39._rs1024_error_index(data), None)
self.assertEqual(_rs1024_error_index(data), None)
for i in range(len(data)):
for _ in range(50):
error_data = error_data = data[:i] + (data[i] ^ (random.uniform(1023) + 1), ) + data[i + 1:]
self.assertEqual(slip39._rs1024_error_index(error_data), i)
self.assertEqual(_rs1024_error_index(error_data), i)
if __name__ == '__main__':

Loading…
Cancel
Save