mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-19 12:58:13 +00:00
core: use utils.BufferReader instead of apps.common.BytearrayReader
This commit is contained in:
parent
1ff4a0d239
commit
5e7fd3aea6
@ -2,7 +2,7 @@ from trezor import utils, wire
|
|||||||
from trezor.crypto import bip32, hashlib, hmac
|
from trezor.crypto import bip32, hashlib, hmac
|
||||||
|
|
||||||
from apps.common import seed
|
from apps.common import seed
|
||||||
from apps.common.readers import BytearrayReader, read_bitcoin_varint
|
from apps.common.readers import read_bitcoin_varint
|
||||||
from apps.common.writers import (
|
from apps.common.writers import (
|
||||||
empty_bytearray,
|
empty_bytearray,
|
||||||
write_bitcoin_varint,
|
write_bitcoin_varint,
|
||||||
@ -71,7 +71,7 @@ def verify_nonownership(
|
|||||||
coin: CoinInfo,
|
coin: CoinInfo,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(proof)
|
r = utils.BufferReader(proof)
|
||||||
if r.read(4) != _VERSION_MAGIC:
|
if r.read(4) != _VERSION_MAGIC:
|
||||||
raise wire.DataError("Unknown format of proof of ownership")
|
raise wire.DataError("Unknown format of proof of ownership")
|
||||||
|
|
||||||
@ -100,7 +100,7 @@ def verify_nonownership(
|
|||||||
# the digest and the value to use is not defined in BIP-322.
|
# the digest and the value to use is not defined in BIP-322.
|
||||||
verifier = SignatureVerifier(script_pubkey, script_sig, witness, coin)
|
verifier = SignatureVerifier(script_pubkey, script_sig, witness, coin)
|
||||||
verifier.verify(sighash.digest())
|
verifier.verify(sighash.digest())
|
||||||
except (ValueError, IndexError):
|
except (ValueError, EOFError):
|
||||||
raise wire.DataError("Invalid proof of ownership")
|
raise wire.DataError("Invalid proof of ownership")
|
||||||
|
|
||||||
return not_owned
|
return not_owned
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
from apps.common.readers import BytearrayReader, read_bitcoin_varint
|
from trezor.utils import BufferReader
|
||||||
|
|
||||||
|
from apps.common.readers import read_bitcoin_varint
|
||||||
|
|
||||||
|
|
||||||
def read_bytes_prefixed(r: BytearrayReader) -> bytes:
|
def read_bytes_prefixed(r: BufferReader) -> bytes:
|
||||||
n = read_bitcoin_varint(r)
|
n = read_bitcoin_varint(r)
|
||||||
return r.read(n)
|
return r.read(n)
|
||||||
|
|
||||||
|
|
||||||
def read_op_push(r: BytearrayReader) -> int:
|
def read_op_push(r: BufferReader) -> int:
|
||||||
prefix = r.get()
|
prefix = r.get()
|
||||||
if prefix < 0x4C:
|
if prefix < 0x4C:
|
||||||
n = prefix
|
n = prefix
|
||||||
|
@ -7,7 +7,7 @@ from trezor.messages.TxInputType import TxInputType
|
|||||||
|
|
||||||
from apps.common import address_type
|
from apps.common import address_type
|
||||||
from apps.common.coininfo import CoinInfo
|
from apps.common.coininfo import CoinInfo
|
||||||
from apps.common.readers import BytearrayReader, read_bitcoin_varint
|
from apps.common.readers import read_bitcoin_varint
|
||||||
from apps.common.writers import empty_bytearray, write_bitcoin_varint
|
from apps.common.writers import empty_bytearray, write_bitcoin_varint
|
||||||
|
|
||||||
from . import common
|
from . import common
|
||||||
@ -145,9 +145,9 @@ def input_script_p2pkh_or_p2sh(
|
|||||||
return w
|
return w
|
||||||
|
|
||||||
|
|
||||||
def parse_input_script_p2pkh(script_sig: bytes,) -> Tuple[bytes, bytes, int]:
|
def parse_input_script_p2pkh(script_sig: bytes) -> Tuple[bytes, bytes, int]:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(script_sig)
|
r = utils.BufferReader(script_sig)
|
||||||
n = read_op_push(r)
|
n = read_op_push(r)
|
||||||
signature = r.read(n - 1)
|
signature = r.read(n - 1)
|
||||||
hash_type = r.get()
|
hash_type = r.get()
|
||||||
@ -156,7 +156,7 @@ def parse_input_script_p2pkh(script_sig: bytes,) -> Tuple[bytes, bytes, int]:
|
|||||||
pubkey = r.read()
|
pubkey = r.read()
|
||||||
if len(pubkey) != n:
|
if len(pubkey) != n:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except (ValueError, IndexError):
|
except (ValueError, EOFError):
|
||||||
wire.DataError("Invalid scriptSig.")
|
wire.DataError("Invalid scriptSig.")
|
||||||
|
|
||||||
return pubkey, signature, hash_type
|
return pubkey, signature, hash_type
|
||||||
@ -274,7 +274,7 @@ def witness_p2wpkh(signature: bytes, pubkey: bytes, hash_type: int) -> bytearray
|
|||||||
|
|
||||||
def parse_witness_p2wpkh(witness: bytes) -> Tuple[bytes, bytes, int]:
|
def parse_witness_p2wpkh(witness: bytes) -> Tuple[bytes, bytes, int]:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(witness)
|
r = utils.BufferReader(witness)
|
||||||
|
|
||||||
if r.get() != 2:
|
if r.get() != 2:
|
||||||
# num of stack items, in P2WPKH it's always 2
|
# num of stack items, in P2WPKH it's always 2
|
||||||
@ -287,7 +287,7 @@ def parse_witness_p2wpkh(witness: bytes) -> Tuple[bytes, bytes, int]:
|
|||||||
pubkey = read_bytes_prefixed(r)
|
pubkey = read_bytes_prefixed(r)
|
||||||
if r.remaining_count():
|
if r.remaining_count():
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except (ValueError, IndexError):
|
except (ValueError, EOFError):
|
||||||
raise wire.DataError("Invalid witness.")
|
raise wire.DataError("Invalid witness.")
|
||||||
|
|
||||||
return pubkey, signature, hash_type
|
return pubkey, signature, hash_type
|
||||||
@ -344,7 +344,7 @@ def witness_multisig(
|
|||||||
|
|
||||||
def parse_witness_multisig(witness: bytes) -> Tuple[bytes, List[Tuple[bytes, int]]]:
|
def parse_witness_multisig(witness: bytes) -> Tuple[bytes, List[Tuple[bytes, int]]]:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(witness)
|
r = utils.BufferReader(witness)
|
||||||
|
|
||||||
# Get number of witness stack items.
|
# Get number of witness stack items.
|
||||||
item_count = read_bitcoin_varint(r)
|
item_count = read_bitcoin_varint(r)
|
||||||
@ -363,7 +363,7 @@ def parse_witness_multisig(witness: bytes) -> Tuple[bytes, List[Tuple[bytes, int
|
|||||||
script = read_bytes_prefixed(r)
|
script = read_bytes_prefixed(r)
|
||||||
if r.remaining_count():
|
if r.remaining_count():
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except (ValueError, IndexError):
|
except (ValueError, EOFError):
|
||||||
raise wire.DataError("Invalid witness.")
|
raise wire.DataError("Invalid witness.")
|
||||||
|
|
||||||
return script, signatures
|
return script, signatures
|
||||||
@ -422,7 +422,7 @@ def parse_input_script_multisig(
|
|||||||
script_sig: bytes,
|
script_sig: bytes,
|
||||||
) -> Tuple[bytes, List[Tuple[bytes, int]]]:
|
) -> Tuple[bytes, List[Tuple[bytes, int]]]:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(script_sig)
|
r = utils.BufferReader(script_sig)
|
||||||
|
|
||||||
# Skip over OP_FALSE, which is due to the old OP_CHECKMULTISIG bug.
|
# Skip over OP_FALSE, which is due to the old OP_CHECKMULTISIG bug.
|
||||||
if r.get() != 0:
|
if r.get() != 0:
|
||||||
@ -439,7 +439,7 @@ def parse_input_script_multisig(
|
|||||||
script = r.read()
|
script = r.read()
|
||||||
if len(script) != n:
|
if len(script) != n:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except (ValueError, IndexError):
|
except (ValueError, EOFError):
|
||||||
raise wire.DataError("Invalid scriptSig.")
|
raise wire.DataError("Invalid scriptSig.")
|
||||||
|
|
||||||
return script, signatures
|
return script, signatures
|
||||||
@ -472,7 +472,7 @@ def output_script_multisig_length(pubkeys: List[bytes], m: int) -> int:
|
|||||||
|
|
||||||
def parse_output_script_multisig(script: bytes) -> Tuple[List[bytes], int]:
|
def parse_output_script_multisig(script: bytes) -> Tuple[List[bytes], int]:
|
||||||
try:
|
try:
|
||||||
r = BytearrayReader(script)
|
r = utils.BufferReader(script)
|
||||||
|
|
||||||
threshold = r.get() - 0x50
|
threshold = r.get() - 0x50
|
||||||
pubkey_count = script[-2] - 0x50
|
pubkey_count = script[-2] - 0x50
|
||||||
@ -497,7 +497,7 @@ def parse_output_script_multisig(script: bytes) -> Tuple[List[bytes], int]:
|
|||||||
if r.remaining_count():
|
if r.remaining_count():
|
||||||
raise ValueError
|
raise ValueError
|
||||||
|
|
||||||
except (ValueError, IndexError):
|
except (ValueError, IndexError, EOFError):
|
||||||
raise wire.DataError("Invalid multisig script")
|
raise wire.DataError("Invalid multisig script")
|
||||||
|
|
||||||
return public_keys, threshold
|
return public_keys, threshold
|
||||||
@ -548,7 +548,7 @@ def write_bip322_signature_proof(
|
|||||||
w.extend(witness)
|
w.extend(witness)
|
||||||
|
|
||||||
|
|
||||||
def read_bip322_signature_proof(r: BytearrayReader) -> Tuple[bytes, bytes]:
|
def read_bip322_signature_proof(r: utils.BufferReader) -> Tuple[bytes, bytes]:
|
||||||
script_sig = read_bytes_prefixed(r)
|
script_sig = read_bytes_prefixed(r)
|
||||||
witness = r.read()
|
witness = r.read()
|
||||||
return script_sig, witness
|
return script_sig, witness
|
||||||
|
@ -1,36 +1,7 @@
|
|||||||
if False:
|
from trezor.utils import BufferReader
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
|
|
||||||
class BytearrayReader:
|
def read_bitcoin_varint(r: BufferReader) -> int:
|
||||||
def __init__(self, data: bytes):
|
|
||||||
self.data = data
|
|
||||||
self.offset = 0
|
|
||||||
|
|
||||||
def get(self) -> int:
|
|
||||||
ret = self.data[self.offset]
|
|
||||||
self.offset += 1
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def peek(self) -> int:
|
|
||||||
return self.data[self.offset]
|
|
||||||
|
|
||||||
def read(self, i: Optional[int] = None) -> bytes:
|
|
||||||
if i is None:
|
|
||||||
ret = self.data[self.offset :]
|
|
||||||
self.offset = len(self.data)
|
|
||||||
elif 0 <= i <= len(self.data) - self.offset:
|
|
||||||
ret = self.data[self.offset : self.offset + i]
|
|
||||||
self.offset += i
|
|
||||||
else:
|
|
||||||
raise IndexError
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def remaining_count(self) -> int:
|
|
||||||
return len(self.data) - self.offset
|
|
||||||
|
|
||||||
|
|
||||||
def read_bitcoin_varint(r: BytearrayReader) -> int:
|
|
||||||
prefix = r.get()
|
prefix = r.get()
|
||||||
if prefix < 253:
|
if prefix < 253:
|
||||||
n = prefix
|
n = prefix
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from apps.common.readers import BytearrayReader
|
from trezor.utils import BufferReader
|
||||||
|
|
||||||
if False:
|
if False:
|
||||||
from typing import List
|
from typing import List
|
||||||
@ -15,7 +15,7 @@ def encode_length(l: int) -> bytes:
|
|||||||
raise ValueError
|
raise ValueError
|
||||||
|
|
||||||
|
|
||||||
def decode_length(r: BytearrayReader) -> int:
|
def decode_length(r: BufferReader) -> int:
|
||||||
init = r.get()
|
init = r.get()
|
||||||
if init < 0x80:
|
if init < 0x80:
|
||||||
# short form encodes length in initial octet
|
# short form encodes length in initial octet
|
||||||
@ -45,7 +45,7 @@ def encode_int(i: bytes) -> bytes:
|
|||||||
return b"\x02" + encode_length(len(i)) + i
|
return b"\x02" + encode_length(len(i)) + i
|
||||||
|
|
||||||
|
|
||||||
def decode_int(r: BytearrayReader) -> bytes:
|
def decode_int(r: BufferReader) -> bytes:
|
||||||
if r.get() != 0x02:
|
if r.get() != 0x02:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ def encode_seq(seq: tuple) -> bytes:
|
|||||||
|
|
||||||
|
|
||||||
def decode_seq(data: bytes) -> List[bytes]:
|
def decode_seq(data: bytes) -> List[bytes]:
|
||||||
r = BytearrayReader(data)
|
r = BufferReader(data)
|
||||||
|
|
||||||
if r.get() != 0x30:
|
if r.get() != 0x30:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
|
@ -208,10 +208,14 @@ class BufferReader:
|
|||||||
|
|
||||||
def peek(self) -> int:
|
def peek(self) -> int:
|
||||||
"""Peek the ordinal value of the next byte to be read."""
|
"""Peek the ordinal value of the next byte to be read."""
|
||||||
|
if self.offset >= len(self.buffer):
|
||||||
|
raise EOFError
|
||||||
return self.buffer[self.offset]
|
return self.buffer[self.offset]
|
||||||
|
|
||||||
def get(self) -> int:
|
def get(self) -> int:
|
||||||
"""Read exactly one byte and return its ordinal value."""
|
"""Read exactly one byte and return its ordinal value."""
|
||||||
|
if self.offset >= len(self.buffer):
|
||||||
|
raise EOFError
|
||||||
byte = self.buffer[self.offset]
|
byte = self.buffer[self.offset]
|
||||||
self.offset += 1
|
self.offset += 1
|
||||||
return byte
|
return byte
|
||||||
|
Loading…
Reference in New Issue
Block a user