refactor(cardano): shorten identifiers in addresses, certs, aux data

pull/2374/head
David Misiak 2 years ago committed by matejcik
parent 50ad00a3c0
commit 8a229edcb3

@ -33,7 +33,7 @@ MIN_ADDRESS_BYTES_LENGTH = 29
MAX_ADDRESS_BYTES_LENGTH = 65 MAX_ADDRESS_BYTES_LENGTH = 65
def assert_address_params_cond(condition: bool) -> None: def assert_params_cond(condition: bool) -> None:
if not condition: if not condition:
raise wire.ProcessError("Invalid address parameters") raise wire.ProcessError("Invalid address parameters")
@ -44,10 +44,10 @@ def validate_address_parameters(
_validate_address_parameters_structure(parameters) _validate_address_parameters_structure(parameters)
if parameters.address_type == CardanoAddressType.BYRON: if parameters.address_type == CardanoAddressType.BYRON:
assert_address_params_cond(seed.is_byron_path(parameters.address_n)) assert_params_cond(seed.is_byron_path(parameters.address_n))
elif parameters.address_type == CardanoAddressType.BASE: elif parameters.address_type == CardanoAddressType.BASE:
assert_address_params_cond(seed.is_shelley_path(parameters.address_n)) assert_params_cond(seed.is_shelley_path(parameters.address_n))
_validate_base_address_staking_info( _validate_base_address_staking_info(
parameters.address_n_staking, parameters.staking_key_hash parameters.address_n_staking, parameters.staking_key_hash
) )
@ -59,7 +59,7 @@ def validate_address_parameters(
) )
elif parameters.address_type == CardanoAddressType.BASE_KEY_SCRIPT: elif parameters.address_type == CardanoAddressType.BASE_KEY_SCRIPT:
assert_address_params_cond(seed.is_shelley_path(parameters.address_n)) assert_params_cond(seed.is_shelley_path(parameters.address_n))
_validate_script_hash(parameters.script_staking_hash) _validate_script_hash(parameters.script_staking_hash)
elif parameters.address_type == CardanoAddressType.BASE_SCRIPT_SCRIPT: elif parameters.address_type == CardanoAddressType.BASE_SCRIPT_SCRIPT:
@ -67,22 +67,22 @@ def validate_address_parameters(
_validate_script_hash(parameters.script_staking_hash) _validate_script_hash(parameters.script_staking_hash)
elif parameters.address_type == CardanoAddressType.POINTER: elif parameters.address_type == CardanoAddressType.POINTER:
assert_address_params_cond(seed.is_shelley_path(parameters.address_n)) assert_params_cond(seed.is_shelley_path(parameters.address_n))
assert_address_params_cond(parameters.certificate_pointer is not None) assert_params_cond(parameters.certificate_pointer is not None)
elif parameters.address_type == CardanoAddressType.POINTER_SCRIPT: elif parameters.address_type == CardanoAddressType.POINTER_SCRIPT:
_validate_script_hash(parameters.script_payment_hash) _validate_script_hash(parameters.script_payment_hash)
assert_address_params_cond(parameters.certificate_pointer is not None) assert_params_cond(parameters.certificate_pointer is not None)
elif parameters.address_type == CardanoAddressType.ENTERPRISE: elif parameters.address_type == CardanoAddressType.ENTERPRISE:
assert_address_params_cond(seed.is_shelley_path(parameters.address_n)) assert_params_cond(seed.is_shelley_path(parameters.address_n))
elif parameters.address_type == CardanoAddressType.ENTERPRISE_SCRIPT: elif parameters.address_type == CardanoAddressType.ENTERPRISE_SCRIPT:
_validate_script_hash(parameters.script_payment_hash) _validate_script_hash(parameters.script_payment_hash)
elif parameters.address_type == CardanoAddressType.REWARD: elif parameters.address_type == CardanoAddressType.REWARD:
assert_address_params_cond(seed.is_shelley_path(parameters.address_n_staking)) assert_params_cond(seed.is_shelley_path(parameters.address_n_staking))
assert_address_params_cond( assert_params_cond(
SCHEMA_STAKING_ANY_ACCOUNT.match(parameters.address_n_staking) SCHEMA_STAKING_ANY_ACCOUNT.match(parameters.address_n_staking)
) )
@ -173,28 +173,26 @@ def _validate_address_parameters_structure(
), ),
} }
assert_address_params_cond(parameters.address_type in fields_to_be_empty) assert_params_cond(parameters.address_type in fields_to_be_empty)
assert_address_params_cond(not any(fields_to_be_empty[parameters.address_type])) assert_params_cond(not any(fields_to_be_empty[parameters.address_type]))
def _validate_base_address_staking_info( def _validate_base_address_staking_info(
staking_path: list[int], staking_path: list[int],
staking_key_hash: bytes | None, staking_key_hash: bytes | None,
) -> None: ) -> None:
assert_address_params_cond(not (staking_key_hash and staking_path)) assert_params_cond(not (staking_key_hash and staking_path))
if staking_key_hash: if staking_key_hash:
assert_address_params_cond(len(staking_key_hash) == ADDRESS_KEY_HASH_SIZE) assert_params_cond(len(staking_key_hash) == ADDRESS_KEY_HASH_SIZE)
elif staking_path: elif staking_path:
assert_address_params_cond(SCHEMA_STAKING_ANY_ACCOUNT.match(staking_path)) assert_params_cond(SCHEMA_STAKING_ANY_ACCOUNT.match(staking_path))
else: else:
raise wire.ProcessError("Invalid address parameters") raise wire.ProcessError("Invalid address parameters")
def _validate_script_hash(script_hash: bytes | None) -> None: def _validate_script_hash(script_hash: bytes | None) -> None:
assert_address_params_cond( assert_params_cond(script_hash is not None and len(script_hash) == SCRIPT_HASH_SIZE)
script_hash is not None and len(script_hash) == SCRIPT_HASH_SIZE
)
def validate_output_address_parameters( def validate_output_address_parameters(
@ -204,7 +202,7 @@ def validate_output_address_parameters(
# Change outputs with script payment part are forbidden. # Change outputs with script payment part are forbidden.
# Reward addresses are forbidden as outputs in general, see also validate_output_address # Reward addresses are forbidden as outputs in general, see also validate_output_address
assert_address_params_cond( assert_params_cond(
parameters.address_type parameters.address_type
in ( in (
CardanoAddressType.BASE, CardanoAddressType.BASE,
@ -216,26 +214,24 @@ def validate_output_address_parameters(
) )
def assert_address_cond(condition: bool) -> None: def assert_cond(condition: bool) -> None:
if not condition: if not condition:
raise wire.ProcessError("Invalid address") raise wire.ProcessError("Invalid address")
def _validate_address_and_get_type( def _validate_and_get_type(address: str, protocol_magic: int, network_id: int) -> int:
address: str, protocol_magic: int, network_id: int
) -> int:
""" """
Validates Cardano address and returns its type Validates Cardano address and returns its type
for the convenience of outward-facing functions. for the convenience of outward-facing functions.
""" """
assert_address_cond(address is not None) assert_cond(address is not None)
assert_address_cond(len(address) > 0) assert_cond(len(address) > 0)
address_bytes = get_address_bytes_unsafe(address) address_bytes = get_bytes_unsafe(address)
address_type = get_address_type(address_bytes) address_type = get_type(address_bytes)
if address_type == CardanoAddressType.BYRON: if address_type == CardanoAddressType.BYRON:
byron_addresses.validate_byron_address(address_bytes, protocol_magic) byron_addresses.validate(address_bytes, protocol_magic)
elif address_type in ADDRESS_TYPES_SHELLEY: elif address_type in ADDRESS_TYPES_SHELLEY:
_validate_shelley_address(address, address_bytes, network_id) _validate_shelley_address(address, address_bytes, network_id)
else: else:
@ -245,21 +241,21 @@ def _validate_address_and_get_type(
def validate_output_address(address: str, protocol_magic: int, network_id: int) -> None: def validate_output_address(address: str, protocol_magic: int, network_id: int) -> None:
address_type = _validate_address_and_get_type(address, protocol_magic, network_id) address_type = _validate_and_get_type(address, protocol_magic, network_id)
assert_address_cond( assert_cond(
address_type address_type
not in (CardanoAddressType.REWARD, CardanoAddressType.REWARD_SCRIPT) not in (CardanoAddressType.REWARD, CardanoAddressType.REWARD_SCRIPT)
) )
def validate_reward_address(address: str, protocol_magic: int, network_id: int) -> None: def validate_reward_address(address: str, protocol_magic: int, network_id: int) -> None:
address_type = _validate_address_and_get_type(address, protocol_magic, network_id) address_type = _validate_and_get_type(address, protocol_magic, network_id)
assert_address_cond( assert_cond(
address_type in (CardanoAddressType.REWARD, CardanoAddressType.REWARD_SCRIPT) address_type in (CardanoAddressType.REWARD, CardanoAddressType.REWARD_SCRIPT)
) )
def get_address_bytes_unsafe(address: str) -> bytes: def get_bytes_unsafe(address: str) -> bytes:
try: try:
address_bytes = bech32.decode_unsafe(address) address_bytes = bech32.decode_unsafe(address)
except ValueError: except ValueError:
@ -271,38 +267,36 @@ def get_address_bytes_unsafe(address: str) -> bytes:
return address_bytes return address_bytes
def get_address_type(address: bytes) -> CardanoAddressType: def get_type(address: bytes) -> CardanoAddressType:
return address[0] >> 4 # type: ignore [int-into-enum] return address[0] >> 4 # type: ignore [int-into-enum]
def _validate_shelley_address( def _validate_shelley_address(
address_str: str, address_bytes: bytes, network_id: int address_str: str, address_bytes: bytes, network_id: int
) -> None: ) -> None:
address_type = get_address_type(address_bytes) address_type = get_type(address_bytes)
_validate_address_size(address_bytes) _validate_size(address_bytes)
_validate_address_bech32_hrp(address_str, address_type, network_id) _validate_bech32_hrp(address_str, address_type, network_id)
_validate_address_network_id(address_bytes, network_id) _validate_network_id(address_bytes, network_id)
def _validate_address_size(address_bytes: bytes) -> None: def _validate_size(address_bytes: bytes) -> None:
assert_address_cond( assert_cond(
MIN_ADDRESS_BYTES_LENGTH <= len(address_bytes) <= MAX_ADDRESS_BYTES_LENGTH MIN_ADDRESS_BYTES_LENGTH <= len(address_bytes) <= MAX_ADDRESS_BYTES_LENGTH
) )
def _validate_address_bech32_hrp( def _validate_bech32_hrp(
address_str: str, address_type: CardanoAddressType, network_id: int address_str: str, address_type: CardanoAddressType, network_id: int
) -> None: ) -> None:
valid_hrp = _get_bech32_hrp_for_address(address_type, network_id) valid_hrp = _get_bech32_hrp(address_type, network_id)
bech32_hrp = bech32.get_hrp(address_str) bech32_hrp = bech32.get_hrp(address_str)
assert_address_cond(valid_hrp == bech32_hrp) assert_cond(valid_hrp == bech32_hrp)
def _get_bech32_hrp_for_address( def _get_bech32_hrp(address_type: CardanoAddressType, network_id: int) -> str:
address_type: CardanoAddressType, network_id: int
) -> str:
if address_type == CardanoAddressType.BYRON: if address_type == CardanoAddressType.BYRON:
# Byron address uses base58 encoding # Byron address uses base58 encoding
raise ValueError raise ValueError
@ -319,42 +313,37 @@ def _get_bech32_hrp_for_address(
return bech32.HRP_TESTNET_ADDRESS return bech32.HRP_TESTNET_ADDRESS
def _validate_address_network_id(address: bytes, network_id: int) -> None: def _validate_network_id(address: bytes, network_id: int) -> None:
if _get_address_network_id(address) != network_id: if _get_network_id(address) != network_id:
raise wire.ProcessError("Output address network mismatch") raise wire.ProcessError("Output address network mismatch")
def _get_address_network_id(address: bytes) -> int: def _get_network_id(address: bytes) -> int:
return address[0] & 0x0F return address[0] & 0x0F
def derive_human_readable_address( def derive_human_readable(
keychain: seed.Keychain, keychain: seed.Keychain,
parameters: messages.CardanoAddressParametersType, parameters: messages.CardanoAddressParametersType,
protocol_magic: int, protocol_magic: int,
network_id: int, network_id: int,
) -> str: ) -> str:
address_bytes = derive_address_bytes( address_bytes = derive_bytes(keychain, parameters, protocol_magic, network_id)
keychain, parameters, protocol_magic, network_id return encode_human_readable(address_bytes)
)
return encode_human_readable_address(address_bytes)
def encode_human_readable(address_bytes: bytes) -> str:
def encode_human_readable_address(address_bytes: bytes) -> str: address_type = get_type(address_bytes)
address_type = get_address_type(address_bytes)
if address_type == CardanoAddressType.BYRON: if address_type == CardanoAddressType.BYRON:
return base58.encode(address_bytes) return base58.encode(address_bytes)
elif address_type in ADDRESS_TYPES_SHELLEY: elif address_type in ADDRESS_TYPES_SHELLEY:
hrp = _get_bech32_hrp_for_address( hrp = _get_bech32_hrp(address_type, _get_network_id(address_bytes))
address_type, _get_address_network_id(address_bytes)
)
return bech32.encode(hrp, address_bytes) return bech32.encode(hrp, address_bytes)
else: else:
raise ValueError raise ValueError
def derive_address_bytes( def derive_bytes(
keychain: seed.Keychain, keychain: seed.Keychain,
parameters: messages.CardanoAddressParametersType, parameters: messages.CardanoAddressParametersType,
protocol_magic: int, protocol_magic: int,
@ -363,9 +352,7 @@ def derive_address_bytes(
is_byron_address = parameters.address_type == CardanoAddressType.BYRON is_byron_address = parameters.address_type == CardanoAddressType.BYRON
if is_byron_address: if is_byron_address:
address = byron_addresses.derive_byron_address( address = byron_addresses.derive(keychain, parameters.address_n, protocol_magic)
keychain, parameters.address_n, protocol_magic
)
else: else:
address = _derive_shelley_address(keychain, parameters, network_id) address = _derive_shelley_address(keychain, parameters, network_id)
@ -377,15 +364,15 @@ def _derive_shelley_address(
parameters: messages.CardanoAddressParametersType, parameters: messages.CardanoAddressParametersType,
network_id: int, network_id: int,
) -> bytes: ) -> bytes:
header = _create_address_header(parameters.address_type, network_id) header = _create_header(parameters.address_type, network_id)
payment_part = _get_address_payment_part(keychain, parameters) payment_part = _get_payment_part(keychain, parameters)
staking_part = _get_address_staking_part(keychain, parameters) staking_part = _get_staking_part(keychain, parameters)
return header + payment_part + staking_part return header + payment_part + staking_part
def _create_address_header(address_type: CardanoAddressType, network_id: int) -> bytes: def _create_header(address_type: CardanoAddressType, network_id: int) -> bytes:
header: int = address_type << 4 | network_id header: int = address_type << 4 | network_id
return header.to_bytes(1, "little") return header.to_bytes(1, "little")

@ -35,7 +35,7 @@ def validate(auxiliary_data: messages.CardanoTxAuxiliaryData) -> None:
fields_provided = 0 fields_provided = 0
if auxiliary_data.hash: if auxiliary_data.hash:
fields_provided += 1 fields_provided += 1
_validate_auxiliary_data_hash(auxiliary_data.hash) _validate_hash(auxiliary_data.hash)
if auxiliary_data.catalyst_registration_parameters: if auxiliary_data.catalyst_registration_parameters:
fields_provided += 1 fields_provided += 1
_validate_catalyst_registration_parameters( _validate_catalyst_registration_parameters(
@ -46,7 +46,7 @@ def validate(auxiliary_data: messages.CardanoTxAuxiliaryData) -> None:
raise wire.ProcessError("Invalid auxiliary data") raise wire.ProcessError("Invalid auxiliary data")
def _validate_auxiliary_data_hash(auxiliary_data_hash: bytes) -> None: def _validate_hash(auxiliary_data_hash: bytes) -> None:
if len(auxiliary_data_hash) != AUXILIARY_DATA_HASH_SIZE: if len(auxiliary_data_hash) != AUXILIARY_DATA_HASH_SIZE:
raise wire.ProcessError("Invalid auxiliary data") raise wire.ProcessError("Invalid auxiliary data")
@ -72,7 +72,7 @@ def _validate_catalyst_registration_parameters(
addresses.validate_address_parameters(address_parameters) addresses.validate_address_parameters(address_parameters)
async def show_auxiliary_data( async def show(
ctx: wire.Context, ctx: wire.Context,
keychain: seed.Keychain, keychain: seed.Keychain,
auxiliary_data_hash: bytes, auxiliary_data_hash: bytes,
@ -103,7 +103,7 @@ async def _show_catalyst_registration(
public_key = catalyst_registration_parameters.voting_public_key public_key = catalyst_registration_parameters.voting_public_key
encoded_public_key = bech32.encode(bech32.HRP_JORMUN_PUBLIC_KEY, public_key) encoded_public_key = bech32.encode(bech32.HRP_JORMUN_PUBLIC_KEY, public_key)
staking_path = catalyst_registration_parameters.staking_path staking_path = catalyst_registration_parameters.staking_path
reward_address = addresses.derive_human_readable_address( reward_address = addresses.derive_human_readable(
keychain, keychain,
catalyst_registration_parameters.reward_address_parameters, catalyst_registration_parameters.reward_address_parameters,
protocol_magic, protocol_magic,
@ -116,7 +116,7 @@ async def _show_catalyst_registration(
) )
def get_auxiliary_data_hash_and_supplement( def get_hash_and_supplement(
keychain: seed.Keychain, keychain: seed.Keychain,
auxiliary_data: messages.CardanoTxAuxiliaryData, auxiliary_data: messages.CardanoTxAuxiliaryData,
protocol_magic: int, protocol_magic: int,
@ -129,7 +129,7 @@ def get_auxiliary_data_hash_and_supplement(
) = _get_signed_catalyst_registration_payload( ) = _get_signed_catalyst_registration_payload(
keychain, parameters, protocol_magic, network_id keychain, parameters, protocol_magic, network_id
) )
auxiliary_data_hash = _get_catalyst_registration_auxiliary_data_hash( auxiliary_data_hash = _get_catalyst_registration_hash(
catalyst_registration_payload, catalyst_signature catalyst_registration_payload, catalyst_signature
) )
auxiliary_data_supplement = messages.CardanoTxAuxiliaryDataSupplement( auxiliary_data_supplement = messages.CardanoTxAuxiliaryDataSupplement(
@ -145,7 +145,7 @@ def get_auxiliary_data_hash_and_supplement(
) )
def _get_catalyst_registration_auxiliary_data_hash( def _get_catalyst_registration_hash(
catalyst_registration_payload: CatalystRegistrationPayload, catalyst_registration_payload: CatalystRegistrationPayload,
catalyst_registration_payload_signature: bytes, catalyst_registration_payload_signature: bytes,
) -> bytes: ) -> bytes:
@ -153,9 +153,7 @@ def _get_catalyst_registration_auxiliary_data_hash(
catalyst_registration_payload, catalyst_registration_payload,
catalyst_registration_payload_signature, catalyst_registration_payload_signature,
) )
return _hash_auxiliary_data( return _get_hash(cbor.encode(_wrap_metadata(cborized_catalyst_registration)))
cbor.encode(_wrap_metadata(cborized_catalyst_registration))
)
def _cborize_catalyst_registration( def _cborize_catalyst_registration(
@ -183,7 +181,7 @@ def _get_signed_catalyst_registration_payload(
payload: CatalystRegistrationPayload = { payload: CatalystRegistrationPayload = {
1: catalyst_registration_parameters.voting_public_key, 1: catalyst_registration_parameters.voting_public_key,
2: staking_key, 2: staking_key,
3: addresses.derive_address_bytes( 3: addresses.derive_bytes(
keychain, keychain,
catalyst_registration_parameters.reward_address_parameters, catalyst_registration_parameters.reward_address_parameters,
protocol_magic, protocol_magic,
@ -234,7 +232,7 @@ def _wrap_metadata(metadata: dict) -> tuple[dict, tuple]:
return metadata, () return metadata, ()
def _hash_auxiliary_data(auxiliary_data: bytes) -> bytes: def _get_hash(auxiliary_data: bytes) -> bytes:
return hashlib.blake2b( return hashlib.blake2b(
data=auxiliary_data, outlen=AUXILIARY_DATA_HASH_SIZE data=auxiliary_data, outlen=AUXILIARY_DATA_HASH_SIZE
).digest() ).digest()

@ -22,15 +22,13 @@ with base58 encoding and all the nuances of Byron addresses.
""" """
def _encode_address_raw(address_data_encoded: bytes) -> bytes: def _encode_raw(address_data_encoded: bytes) -> bytes:
return cbor.encode( return cbor.encode(
[cbor.Tagged(24, address_data_encoded), crc.crc32(address_data_encoded)] [cbor.Tagged(24, address_data_encoded), crc.crc32(address_data_encoded)]
) )
def derive_byron_address( def derive(keychain: seed.Keychain, path: list, protocol_magic: int) -> bytes:
keychain: seed.Keychain, path: list, protocol_magic: int
) -> bytes:
address_attributes = get_address_attributes(protocol_magic) address_attributes = get_address_attributes(protocol_magic)
address_root = _get_address_root(keychain, path, address_attributes) address_root = _get_address_root(keychain, path, address_attributes)
@ -38,7 +36,7 @@ def derive_byron_address(
address_data = [address_root, address_attributes, address_type] address_data = [address_root, address_attributes, address_type]
address_data_encoded = cbor.encode(address_data) address_data_encoded = cbor.encode(address_data)
return _encode_address_raw(address_data_encoded) return _encode_raw(address_data_encoded)
def get_address_attributes(protocol_magic: int) -> dict: def get_address_attributes(protocol_magic: int) -> dict:
@ -51,12 +49,12 @@ def get_address_attributes(protocol_magic: int) -> dict:
return address_attributes return address_attributes
def validate_byron_address(address: bytes, protocol_magic: int) -> None: def validate(address: bytes, protocol_magic: int) -> None:
address_data_encoded = _decode_address_raw(address) address_data_encoded = _decode_raw(address)
_validate_address_data_protocol_magic(address_data_encoded, protocol_magic) _validate_protocol_magic(address_data_encoded, protocol_magic)
def _decode_address_raw(address: bytes) -> bytes: def _decode_raw(address: bytes) -> bytes:
try: try:
address_unpacked = cbor.decode(address) address_unpacked = cbor.decode(address)
except ValueError as e: except ValueError as e:
@ -81,9 +79,7 @@ def _decode_address_raw(address: bytes) -> bytes:
return address_data_encoded return address_data_encoded
def _validate_address_data_protocol_magic( def _validate_protocol_magic(address_data_encoded: bytes, protocol_magic: int) -> None:
address_data_encoded: bytes, protocol_magic: int
) -> None:
""" """
Determines whether the correct protocol magic (or none) Determines whether the correct protocol magic (or none)
is included in the address. Addresses on mainnet don't is included in the address. Addresses on mainnet don't

@ -29,13 +29,13 @@ MAX_URL_LENGTH = 64
MAX_PORT_NUMBER = 65535 MAX_PORT_NUMBER = 65535
def validate_certificate( def validate(
certificate: messages.CardanoTxCertificate, certificate: messages.CardanoTxCertificate,
protocol_magic: int, protocol_magic: int,
network_id: int, network_id: int,
account_path_checker: AccountPathChecker, account_path_checker: AccountPathChecker,
) -> None: ) -> None:
_validate_certificate_structure(certificate) _validate_structure(certificate)
if certificate.type in ( if certificate.type in (
CardanoCertificateType.STAKE_DELEGATION, CardanoCertificateType.STAKE_DELEGATION,
@ -63,7 +63,7 @@ def validate_certificate(
account_path_checker.add_certificate(certificate) account_path_checker.add_certificate(certificate)
def _validate_certificate_structure(certificate: messages.CardanoTxCertificate) -> None: def _validate_structure(certificate: messages.CardanoTxCertificate) -> None:
pool = certificate.pool pool = certificate.pool
pool_parameters = certificate.pool_parameters pool_parameters = certificate.pool_parameters
@ -85,7 +85,7 @@ def _validate_certificate_structure(certificate: messages.CardanoTxCertificate)
raise wire.ProcessError("Invalid certificate") raise wire.ProcessError("Invalid certificate")
def cborize_certificate( def cborize(
keychain: seed.Keychain, certificate: messages.CardanoTxCertificate keychain: seed.Keychain, certificate: messages.CardanoTxCertificate
) -> CborSequence: ) -> CborSequence:
if certificate.type in ( if certificate.type in (
@ -94,7 +94,7 @@ def cborize_certificate(
): ):
return ( return (
certificate.type, certificate.type,
cborize_certificate_stake_credential( cborize_stake_credential(
keychain, keychain,
certificate.path, certificate.path,
certificate.script_hash, certificate.script_hash,
@ -104,7 +104,7 @@ def cborize_certificate(
elif certificate.type == CardanoCertificateType.STAKE_DELEGATION: elif certificate.type == CardanoCertificateType.STAKE_DELEGATION:
return ( return (
certificate.type, certificate.type,
cborize_certificate_stake_credential( cborize_stake_credential(
keychain, keychain,
certificate.path, certificate.path,
certificate.script_hash, certificate.script_hash,
@ -116,7 +116,7 @@ def cborize_certificate(
raise RuntimeError # should be unreachable raise RuntimeError # should be unreachable
def cborize_certificate_stake_credential( def cborize_stake_credential(
keychain: seed.Keychain, keychain: seed.Keychain,
path: list[int], path: list[int],
script_hash: bytes | None, script_hash: bytes | None,
@ -132,7 +132,7 @@ def cborize_certificate_stake_credential(
raise RuntimeError raise RuntimeError
def cborize_pool_registration_certificate_init( def cborize_pool_registration_init(
certificate: messages.CardanoTxCertificate, certificate: messages.CardanoTxCertificate,
) -> CborSequence: ) -> CborSequence:
assert certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION assert certificate.type == CardanoCertificateType.STAKE_POOL_REGISTRATION
@ -155,11 +155,11 @@ def cborize_pool_registration_certificate_init(
), ),
# this relies on pool_parameters.reward_account being validated beforehand # this relies on pool_parameters.reward_account being validated beforehand
# in _validate_pool_parameters # in _validate_pool_parameters
addresses.get_address_bytes_unsafe(pool_parameters.reward_account), addresses.get_bytes_unsafe(pool_parameters.reward_account),
) )
def assert_certificate_cond(condition: bool) -> None: def assert_cond(condition: bool) -> None:
if not condition: if not condition:
raise wire.ProcessError("Invalid certificate") raise wire.ProcessError("Invalid certificate")
@ -169,16 +169,14 @@ def _validate_pool_parameters(
protocol_magic: int, protocol_magic: int,
network_id: int, network_id: int,
) -> None: ) -> None:
assert_certificate_cond(len(pool_parameters.pool_id) == POOL_HASH_SIZE) assert_cond(len(pool_parameters.pool_id) == POOL_HASH_SIZE)
assert_certificate_cond(len(pool_parameters.vrf_key_hash) == VRF_KEY_HASH_SIZE) assert_cond(len(pool_parameters.vrf_key_hash) == VRF_KEY_HASH_SIZE)
assert_certificate_cond(0 <= pool_parameters.pledge <= LOVELACE_MAX_SUPPLY) assert_cond(0 <= pool_parameters.pledge <= LOVELACE_MAX_SUPPLY)
assert_certificate_cond(0 <= pool_parameters.cost <= LOVELACE_MAX_SUPPLY) assert_cond(0 <= pool_parameters.cost <= LOVELACE_MAX_SUPPLY)
assert_certificate_cond(pool_parameters.margin_numerator >= 0) assert_cond(pool_parameters.margin_numerator >= 0)
assert_certificate_cond(pool_parameters.margin_denominator > 0) assert_cond(pool_parameters.margin_denominator > 0)
assert_certificate_cond( assert_cond(pool_parameters.margin_numerator <= pool_parameters.margin_denominator)
pool_parameters.margin_numerator <= pool_parameters.margin_denominator assert_cond(pool_parameters.owners_count > 0)
)
assert_certificate_cond(pool_parameters.owners_count > 0)
addresses.validate_reward_address( addresses.validate_reward_address(
pool_parameters.reward_account, protocol_magic, network_id pool_parameters.reward_account, protocol_magic, network_id
@ -191,41 +189,39 @@ def _validate_pool_parameters(
def validate_pool_owner( def validate_pool_owner(
owner: messages.CardanoPoolOwner, account_path_checker: AccountPathChecker owner: messages.CardanoPoolOwner, account_path_checker: AccountPathChecker
) -> None: ) -> None:
assert_certificate_cond( assert_cond(
owner.staking_key_hash is not None or owner.staking_key_path is not None owner.staking_key_hash is not None or owner.staking_key_path is not None
) )
if owner.staking_key_hash is not None: if owner.staking_key_hash is not None:
assert_certificate_cond(len(owner.staking_key_hash) == ADDRESS_KEY_HASH_SIZE) assert_cond(len(owner.staking_key_hash) == ADDRESS_KEY_HASH_SIZE)
if owner.staking_key_path: if owner.staking_key_path:
assert_certificate_cond( assert_cond(SCHEMA_STAKING_ANY_ACCOUNT.match(owner.staking_key_path))
SCHEMA_STAKING_ANY_ACCOUNT.match(owner.staking_key_path)
)
account_path_checker.add_pool_owner(owner) account_path_checker.add_pool_owner(owner)
def validate_pool_relay(pool_relay: messages.CardanoPoolRelayParameters) -> None: def validate_pool_relay(pool_relay: messages.CardanoPoolRelayParameters) -> None:
if pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_IP: if pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_IP:
assert_certificate_cond( assert_cond(
pool_relay.ipv4_address is not None or pool_relay.ipv6_address is not None pool_relay.ipv4_address is not None or pool_relay.ipv6_address is not None
) )
if pool_relay.ipv4_address is not None: if pool_relay.ipv4_address is not None:
assert_certificate_cond(len(pool_relay.ipv4_address) == IPV4_ADDRESS_SIZE) assert_cond(len(pool_relay.ipv4_address) == IPV4_ADDRESS_SIZE)
if pool_relay.ipv6_address is not None: if pool_relay.ipv6_address is not None:
assert_certificate_cond(len(pool_relay.ipv6_address) == IPV6_ADDRESS_SIZE) assert_cond(len(pool_relay.ipv6_address) == IPV6_ADDRESS_SIZE)
assert_certificate_cond( assert_cond(
pool_relay.port is not None and 0 <= pool_relay.port <= MAX_PORT_NUMBER pool_relay.port is not None and 0 <= pool_relay.port <= MAX_PORT_NUMBER
) )
elif pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_NAME: elif pool_relay.type == CardanoPoolRelayType.SINGLE_HOST_NAME:
assert_certificate_cond( assert_cond(
pool_relay.host_name is not None pool_relay.host_name is not None
and len(pool_relay.host_name) <= MAX_URL_LENGTH and len(pool_relay.host_name) <= MAX_URL_LENGTH
) )
assert_certificate_cond( assert_cond(
pool_relay.port is not None and 0 <= pool_relay.port <= MAX_PORT_NUMBER pool_relay.port is not None and 0 <= pool_relay.port <= MAX_PORT_NUMBER
) )
elif pool_relay.type == CardanoPoolRelayType.MULTIPLE_HOST_NAME: elif pool_relay.type == CardanoPoolRelayType.MULTIPLE_HOST_NAME:
assert_certificate_cond( assert_cond(
pool_relay.host_name is not None pool_relay.host_name is not None
and len(pool_relay.host_name) <= MAX_URL_LENGTH and len(pool_relay.host_name) <= MAX_URL_LENGTH
) )
@ -234,9 +230,9 @@ def validate_pool_relay(pool_relay: messages.CardanoPoolRelayParameters) -> None
def _validate_pool_metadata(pool_metadata: messages.CardanoPoolMetadataType) -> None: def _validate_pool_metadata(pool_metadata: messages.CardanoPoolMetadataType) -> None:
assert_certificate_cond(len(pool_metadata.url) <= MAX_URL_LENGTH) assert_cond(len(pool_metadata.url) <= MAX_URL_LENGTH)
assert_certificate_cond(len(pool_metadata.hash) == POOL_METADATA_HASH_SIZE) assert_cond(len(pool_metadata.hash) == POOL_METADATA_HASH_SIZE)
assert_certificate_cond(all((32 <= ord(c) < 127) for c in pool_metadata.url)) assert_cond(all((32 <= ord(c) < 127) for c in pool_metadata.url))
def cborize_pool_owner( def cborize_pool_owner(

@ -14,7 +14,7 @@ async def get_address(
addresses.validate_address_parameters(msg.address_parameters) addresses.validate_address_parameters(msg.address_parameters)
try: try:
address = addresses.derive_human_readable_address( address = addresses.derive_human_readable(
keychain, msg.address_parameters, msg.protocol_magic, msg.network_id keychain, msg.address_parameters, msg.protocol_magic, msg.network_id
) )
except ValueError as e: except ValueError as e:

@ -543,7 +543,7 @@ async def confirm_stake_pool_owner(
props.append(("Pool owner:", address_n_to_str(owner.staking_key_path))) props.append(("Pool owner:", address_n_to_str(owner.staking_key_path)))
props.append( props.append(
( (
addresses.derive_human_readable_address( addresses.derive_human_readable(
keychain, keychain,
messages.CardanoAddressParametersType( messages.CardanoAddressParametersType(
address_type=CardanoAddressType.REWARD, address_type=CardanoAddressType.REWARD,
@ -560,7 +560,7 @@ async def confirm_stake_pool_owner(
props.append( props.append(
( (
"Pool owner:", "Pool owner:",
addresses.derive_human_readable_address( addresses.derive_human_readable(
keychain, keychain,
messages.CardanoAddressParametersType( messages.CardanoAddressParametersType(
address_type=CardanoAddressType.REWARD, address_type=CardanoAddressType.REWARD,
@ -635,7 +635,7 @@ async def confirm_withdrawal(
network_id: int, network_id: int,
) -> None: ) -> None:
address_type_name = "script reward" if withdrawal.script_hash else "reward" address_type_name = "script reward" if withdrawal.script_hash else "reward"
address = addresses.encode_human_readable_address(address_bytes) address = addresses.encode_human_readable(address_bytes)
props: list[PropertyType] = [ props: list[PropertyType] = [
(f"Confirm withdrawal for {address_type_name} address:", address), (f"Confirm withdrawal for {address_type_name} address:", address),
] ]

@ -313,7 +313,7 @@ class Signer:
await layout.warn_tx_output_contains_tokens(self.ctx) await layout.warn_tx_output_contains_tokens(self.ctx)
if output.address_parameters is not None: if output.address_parameters is not None:
address = addresses.derive_human_readable_address( address = addresses.derive_human_readable(
self.keychain, self.keychain,
output.address_parameters, output.address_parameters,
self.msg.protocol_magic, self.msg.protocol_magic,
@ -467,7 +467,7 @@ class Signer:
POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT POOL_REGISTRATION_CERTIFICATE_ITEMS_COUNT
) )
with certificates_list.append(pool_items_list): with certificates_list.append(pool_items_list):
for item in certificates.cborize_pool_registration_certificate_init( for item in certificates.cborize_pool_registration_init(
certificate certificate
): ):
pool_items_list.append(item) pool_items_list.append(item)
@ -493,11 +493,11 @@ class Signer:
) )
else: else:
certificates_list.append( certificates_list.append(
certificates.cborize_certificate(self.keychain, certificate) certificates.cborize(self.keychain, certificate)
) )
def _validate_certificate(self, certificate: messages.CardanoTxCertificate) -> None: def _validate_certificate(self, certificate: messages.CardanoTxCertificate) -> None:
certificates.validate_certificate( certificates.validate(
certificate, certificate,
self.msg.protocol_magic, self.msg.protocol_magic,
self.msg.network_id, self.msg.network_id,
@ -542,7 +542,7 @@ class Signer:
if owner.staking_key_path: if owner.staking_key_path:
owners_as_path_count += 1 owners_as_path_count += 1
certificates.assert_certificate_cond(owners_as_path_count == 1) certificates.assert_cond(owners_as_path_count == 1)
async def _show_pool_owner(self, owner: messages.CardanoPoolOwner) -> None: async def _show_pool_owner(self, owner: messages.CardanoPoolOwner) -> None:
if owner.staking_key_path: if owner.staking_key_path:
@ -603,15 +603,15 @@ class Signer:
data: messages.CardanoTxAuxiliaryData = await self.ctx.call( data: messages.CardanoTxAuxiliaryData = await self.ctx.call(
messages.CardanoTxItemAck(), messages.CardanoTxAuxiliaryData messages.CardanoTxItemAck(), messages.CardanoTxAuxiliaryData
) )
auxiliary_data.validate_auxiliary_data(data) auxiliary_data.validate(data)
( (
auxiliary_data_hash, auxiliary_data_hash,
auxiliary_data_supplement, auxiliary_data_supplement,
) = auxiliary_data.get_auxiliary_data_hash_and_supplement( ) = auxiliary_data.get_hash_and_supplement(
self.keychain, data, self.msg.protocol_magic, self.msg.network_id self.keychain, data, self.msg.protocol_magic, self.msg.network_id
) )
await auxiliary_data.show_auxiliary_data( await auxiliary_data.show(
self.ctx, self.ctx,
self.keychain, self.keychain,
auxiliary_data_hash, auxiliary_data_hash,
@ -790,7 +790,7 @@ class Signer:
def _get_output_address(self, output: messages.CardanoTxOutput) -> bytes: def _get_output_address(self, output: messages.CardanoTxOutput) -> bytes:
if output.address_parameters: if output.address_parameters:
return addresses.derive_address_bytes( return addresses.derive_bytes(
self.keychain, self.keychain,
output.address_parameters, output.address_parameters,
self.msg.protocol_magic, self.msg.protocol_magic,
@ -798,7 +798,7 @@ class Signer:
) )
else: else:
assert output.address is not None # _validate_output assert output.address is not None # _validate_output
return addresses.get_address_bytes_unsafe(output.address) return addresses.get_bytes_unsafe(output.address)
def _get_output_address_type( def _get_output_address_type(
self, output: messages.CardanoTxOutput self, output: messages.CardanoTxOutput
@ -806,9 +806,7 @@ class Signer:
if output.address_parameters: if output.address_parameters:
return output.address_parameters.address_type return output.address_parameters.address_type
assert output.address is not None # _validate_output assert output.address is not None # _validate_output
return addresses.get_address_type( return addresses.get_type(addresses.get_bytes_unsafe(output.address))
addresses.get_address_bytes_unsafe(output.address)
)
def _derive_withdrawal_address_bytes( def _derive_withdrawal_address_bytes(
self, withdrawal: messages.CardanoTxWithdrawal self, withdrawal: messages.CardanoTxWithdrawal
@ -818,7 +816,7 @@ class Signer:
if withdrawal.path or withdrawal.key_hash if withdrawal.path or withdrawal.key_hash
else CardanoAddressType.REWARD_SCRIPT else CardanoAddressType.REWARD_SCRIPT
) )
return addresses.derive_address_bytes( return addresses.derive_bytes(
self.keychain, self.keychain,
messages.CardanoAddressParametersType( messages.CardanoAddressParametersType(
address_type=reward_address_type, address_type=reward_address_type,

Loading…
Cancel
Save