1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-06-17 05:28:45 +00:00

chore(core): introduce wire logging with interface

This commit is contained in:
M1nd3r 2025-04-25 16:55:15 +02:00
parent 16d88e9083
commit ac729eb157
15 changed files with 263 additions and 101 deletions

View File

@ -789,6 +789,8 @@ Q(trezor.enums.DebugButton)
Q(trezor.enums.DebugPhysicalButton) Q(trezor.enums.DebugPhysicalButton)
Q(trezor.enums.DebugSwipeDirection) Q(trezor.enums.DebugSwipeDirection)
Q(trezor.enums.DebugWaitType) Q(trezor.enums.DebugWaitType)
Q(trezor.wire.wire_log)
Q(wire_log)
#endif #endif
// generate full alphabet // generate full alphabet

View File

@ -33,7 +33,7 @@ ALTCOINS = (
ALTCOINS_RE = re.compile("|".join(ALTCOINS), flags=re.IGNORECASE) ALTCOINS_RE = re.compile("|".join(ALTCOINS), flags=re.IGNORECASE)
THP_RE = re.compile(r"\.thp", flags=re.IGNORECASE) THP_RE = re.compile(r"\.thp", flags=re.IGNORECASE)
DEBUG_RE = re.compile("debug|prof", flags=re.IGNORECASE) DEBUG_RE = re.compile("debug|prof|wire_log", flags=re.IGNORECASE)
pyfiles = chain.from_iterable(sorted(SRCDIR.glob(p)) for p in PATTERNS) pyfiles = chain.from_iterable(sorted(SRCDIR.glob(p)) for p in PATTERNS)

View File

@ -56,15 +56,14 @@ def exception(name: str, exc: BaseException) -> None:
# we also need to instruct typechecker to ignore the missing argument # we also need to instruct typechecker to ignore the missing argument
# in ui.Result exception # in ui.Result exception
if exc.__class__.__name__ == "Result": if exc.__class__.__name__ == "Result":
_log( debug(
name, name,
_DEBUG,
"ui.Result: %s", "ui.Result: %s",
exc.value, # type: ignore [Cannot access attribute "value" for class "BaseException"] exc.value, # type: ignore [Cannot access attribute "value" for class "BaseException"]
) )
elif exc.__class__.__name__ == "Cancelled": elif exc.__class__.__name__ == "Cancelled":
_log(name, _DEBUG, "ui.Cancelled") debug(name, "ui.Cancelled")
else: else:
_log(name, _ERROR, "exception:") error(name, "exception:")
# since mypy 0.770 we cannot override sys, so print_exception is unknown # since mypy 0.770 we cannot override sys, so print_exception is unknown
sys.print_exception(exc) # type: ignore ["print_exception" is not a known attribute of module] sys.print_exception(exc) # type: ignore ["print_exception" is not a known attribute of module]

View File

@ -25,7 +25,7 @@ reads the message's header. When the message type is known the first handler is
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from trezor import log, loop, protobuf, utils from trezor import loop, protobuf, utils
from . import message_handler, protocol_common from . import message_handler, protocol_common
@ -41,6 +41,9 @@ from .message_handler import failure
# other packages. # other packages.
from .errors import * # isort:skip # noqa: F401,F403 from .errors import * # isort:skip # noqa: F401,F403
if __debug__:
from . import wire_log as log
if TYPE_CHECKING: if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
from typing import Any, Callable, Coroutine, TypeVar from typing import Any, Callable, Coroutine, TypeVar
@ -92,11 +95,13 @@ if utils.USE_THP:
except Exception as exc: except Exception as exc:
# Log and try again. # Log and try again.
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, iface, exc)
finally: finally:
# Unload modules imported by the workflow. Should not raise. # Unload modules imported by the workflow. Should not raise.
if __debug__: if __debug__:
log.debug(__name__, "utils.unimport_end(modules) and loop.clear()") log.debug(
__name__, iface, "utils.unimport_end(modules) and loop.clear()"
)
utils.unimport_end(modules) utils.unimport_end(modules)
loop.clear() loop.clear()
return # pylint: disable=lost-exception return # pylint: disable=lost-exception
@ -119,7 +124,7 @@ else:
msg = await ctx.read_from_wire() msg = await ctx.read_from_wire()
except protocol_common.WireError as exc: except protocol_common.WireError as exc:
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, iface, exc)
await ctx.write(failure(exc)) await ctx.write(failure(exc))
continue continue
@ -144,7 +149,7 @@ else:
# Log and ignore. The session handler can only exit explicitly in the # Log and ignore. The session handler can only exit explicitly in the
# following finally block. # following finally block.
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, iface, exc)
finally: finally:
# Unload modules imported by the workflow. Should not raise. # Unload modules imported by the workflow. Should not raise.
utils.unimport_end(modules) utils.unimport_end(modules)
@ -152,7 +157,7 @@ else:
if not do_not_restart: if not do_not_restart:
# Let the session be restarted from `main`. # Let the session be restarted from `main`.
if __debug__: if __debug__:
log.debug(__name__, "loop.clear()") log.debug(__name__, iface, "loop.clear()")
loop.clear() loop.clear()
return # pylint: disable=lost-exception return # pylint: disable=lost-exception
@ -160,4 +165,4 @@ else:
# Log and try again. The session handler can only exit explicitly via # Log and try again. The session handler can only exit explicitly via
# loop.clear() above. # loop.clear() above.
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, iface, exc)

View File

@ -2,12 +2,15 @@ from typing import TYPE_CHECKING, Awaitable, Container
from storage import cache_codec from storage import cache_codec
from storage.cache_common import DataCache, InvalidSessionError from storage.cache_common import DataCache, InvalidSessionError
from trezor import log, protobuf from trezor import protobuf
from trezor.wire.codec import codec_v1 from trezor.wire.codec import codec_v1
from trezor.wire.context import UnexpectedMessageException from trezor.wire.context import UnexpectedMessageException
from trezor.wire.message_handler import wrap_protobuf_load from trezor.wire.message_handler import wrap_protobuf_load
from trezor.wire.protocol_common import Context, Message from trezor.wire.protocol_common import Context, Message
if __debug__:
from .. import wire_log as log
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import TypeVar from typing import TypeVar
@ -45,8 +48,8 @@ class CodecContext(Context):
if __debug__: if __debug__:
log.debug( log.debug(
__name__, __name__,
"%d: expect: %s", self.iface,
self.iface.iface_num(), "expect: %s",
expected_type.MESSAGE_NAME if expected_type else expected_types, expected_type.MESSAGE_NAME if expected_type else expected_types,
) )
@ -64,8 +67,8 @@ class CodecContext(Context):
if __debug__: if __debug__:
log.debug( log.debug(
__name__, __name__,
"%d: read: %s", self.iface,
self.iface.iface_num(), "read: %s",
expected_type.MESSAGE_NAME, expected_type.MESSAGE_NAME,
) )
@ -76,8 +79,8 @@ class CodecContext(Context):
if __debug__: if __debug__:
log.debug( log.debug(
__name__, __name__,
"%d: write: %s", self.iface,
self.iface.iface_num(), "write: %s",
msg.MESSAGE_NAME, msg.MESSAGE_NAME,
) )

View File

@ -4,9 +4,10 @@ from storage.cache_common import InvalidSessionError
from trezor import log, loop, protobuf, utils, workflow from trezor import log, loop, protobuf, utils, workflow
from trezor.enums import FailureType from trezor.enums import FailureType
from trezor.messages import Failure from trezor.messages import Failure
from trezor.wire.context import UnexpectedMessageException, with_context
from trezor.wire.errors import ActionCancelled, DataError, Error, UnexpectedMessage from .context import UnexpectedMessageException, with_context
from trezor.wire.protocol_common import Context, Message from .errors import ActionCancelled, DataError, Error, UnexpectedMessage
from .protocol_common import Context, Message
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import Any, Callable, Container from typing import Any, Callable, Container
@ -16,6 +17,11 @@ if TYPE_CHECKING:
HandlerFinder = Callable[[Any, Any], Handler | None] HandlerFinder = Callable[[Any, Any], Handler | None]
Filter = Callable[[int, Handler], Handler] Filter = Callable[[int, Handler], Handler]
if __debug__:
from trezor.utils import get_bytes_as_str
from . import wire_log
# If set to False protobuf messages marked with "experimental_message" option are rejected. # If set to False protobuf messages marked with "experimental_message" option are rejected.
EXPERIMENTAL_ENABLED = False EXPERIMENTAL_ENABLED = False
@ -71,19 +77,19 @@ async def handle_single_message(ctx: Context, msg: Message) -> bool:
except Exception: except Exception:
msg_type = f"{msg.type} - unknown message type" msg_type = f"{msg.type} - unknown message type"
if utils.USE_THP: if utils.USE_THP:
cid = int.from_bytes(ctx.channel_id, "big") cid = get_bytes_as_str(ctx.channel_id)
log.debug( wire_log.info(
__name__, __name__,
"%d:%d receive: <%s>", ctx.iface,
ctx.iface.iface_num(), "(cid: %s) received message: %s",
cid, cid,
msg_type, msg_type,
) )
else: else:
log.debug( wire_log.info(
__name__, __name__,
"%d receive: <%s>", ctx.iface,
ctx.iface.iface_num(), "received message: %s",
msg_type, msg_type,
) )
@ -164,11 +170,11 @@ async def handle_single_message(ctx: Context, msg: Message) -> bool:
# - something canceled the workflow from the outside # - something canceled the workflow from the outside
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
if isinstance(exc, ActionCancelled): if isinstance(exc, ActionCancelled):
log.debug(__name__, "cancelled: %s", exc.message) wire_log.debug(__name__, ctx.iface, "cancelled: %s", exc.message)
elif isinstance(exc, loop.TaskClosed): elif isinstance(exc, loop.TaskClosed):
log.debug(__name__, "cancelled: loop task was closed") wire_log.debug(__name__, ctx.iface, "cancelled: loop task was closed")
else: else:
log.exception(__name__, exc) wire_log.exception(__name__, ctx.iface, exc)
res_msg = failure(exc) res_msg = failure(exc)
if res_msg is not None: if res_msg is not None:

View File

@ -1,6 +1,17 @@
from typing import TYPE_CHECKING
from storage.cache_thp import ChannelCache from storage.cache_thp import ChannelCache
from trezor import log, utils from trezor import utils
from trezor.wire.thp import ThpError
from . import ThpError
if __debug__:
from .. import wire_log as log
from . import interface_manager
if TYPE_CHECKING:
from trezorio import WireInterface
def is_ack_valid(cache: ChannelCache, ack_bit: int) -> bool: def is_ack_valid(cache: ChannelCache, ack_bit: int) -> bool:
@ -21,14 +32,16 @@ def is_ack_valid(cache: ChannelCache, ack_bit: int) -> bool:
def _is_ack_expected(cache: ChannelCache) -> bool: def _is_ack_expected(cache: ChannelCache) -> bool:
is_expected: bool = not is_sending_allowed(cache) is_expected: bool = not is_sending_allowed(cache)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES and not is_expected: if __debug__ and utils.ALLOW_DEBUG_MESSAGES and not is_expected:
log.debug(__name__, "Received unexpected ACK message") log.debug(__name__, _get_iface(cache), "Received unexpected ACK message")
return is_expected return is_expected
def _has_ack_correct_sync_bit(cache: ChannelCache, sync_bit: int) -> bool: def _has_ack_correct_sync_bit(cache: ChannelCache, sync_bit: int) -> bool:
is_correct: bool = get_send_seq_bit(cache) == sync_bit is_correct: bool = get_send_seq_bit(cache) == sync_bit
if __debug__ and utils.ALLOW_DEBUG_MESSAGES and not is_correct: if __debug__ and utils.ALLOW_DEBUG_MESSAGES and not is_correct:
log.debug(__name__, "Received ACK message with wrong ack bit") log.debug(
__name__, _get_iface(cache), "Received ACK message with wrong ack bit"
)
return is_correct return is_correct
@ -73,7 +86,12 @@ def set_expected_receive_seq_bit(cache: ChannelCache, seq_bit: int) -> None:
in the provided channel in the provided channel
""" """
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Set sync receive expected seq bit to %d", seq_bit) log.debug(
__name__,
_get_iface(cache),
"Set sync receive expected seq bit to %d",
seq_bit,
)
if seq_bit not in (0, 1): if seq_bit not in (0, 1):
raise ThpError("Unexpected receive sync bit") raise ThpError("Unexpected receive sync bit")
@ -87,7 +105,9 @@ def _set_send_seq_bit(cache: ChannelCache, seq_bit: int) -> None:
if seq_bit not in (0, 1): if seq_bit not in (0, 1):
raise ThpError("Unexpected send seq bit") raise ThpError("Unexpected send seq bit")
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "setting sync send seq bit to %d", seq_bit) log.debug(
__name__, _get_iface(cache), "setting sync send seq bit to %d", seq_bit
)
# set third bit to "seq_bit" value # set third bit to "seq_bit" value
cache.sync &= 0xDF cache.sync &= 0xDF
if seq_bit: if seq_bit:
@ -100,3 +120,9 @@ def set_send_seq_bit_to_opposite(cache: ChannelCache) -> None:
i.e. 1 -> 0 and 0 -> 1 i.e. 1 -> 0 and 0 -> 1
""" """
_set_send_seq_bit(cache=cache, seq_bit=1 - get_send_seq_bit(cache)) _set_send_seq_bit(cache=cache, seq_bit=1 - get_send_seq_bit(cache))
if __debug__:
def _get_iface(cache: ChannelCache) -> WireInterface:
return interface_manager.decode_iface(cache.iface)

View File

@ -17,7 +17,7 @@ from storage.cache_thp import (
conditionally_replace_channel, conditionally_replace_channel,
is_there_a_channel_to_replace, is_there_a_channel_to_replace,
) )
from trezor import log, loop, protobuf, utils, workflow from trezor import loop, protobuf, utils, workflow
from trezor.wire.errors import WireBufferError from trezor.wire.errors import WireBufferError
from . import ENCRYPTED, ChannelState, PacketHeader, ThpDecryptionError, ThpError from . import ENCRYPTED, ChannelState, PacketHeader, ThpDecryptionError, ThpError
@ -42,11 +42,12 @@ from .writer import (
if __debug__: if __debug__:
from trezor.utils import get_bytes_as_str from trezor.utils import get_bytes_as_str
from .. import wire_log as log
from . import state_to_str from . import state_to_str
if TYPE_CHECKING: if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
from typing import Awaitable from typing import Any, Awaitable
from trezor.messages import ThpPairingCredential from trezor.messages import ThpPairingCredential
@ -63,9 +64,9 @@ class Channel:
# Channel properties # Channel properties
self.channel_id: bytes = channel_cache.channel_id self.channel_id: bytes = channel_cache.channel_id
self.iface: WireInterface = interface_manager.decode_iface(channel_cache.iface)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
self._log("channel initialization") self._log("channel initialization")
self.iface: WireInterface = interface_manager.decode_iface(channel_cache.iface)
self.channel_cache: ChannelCache = channel_cache self.channel_cache: ChannelCache = channel_cache
# Shared variables # Shared variables
@ -463,8 +464,18 @@ class Channel:
force: bool = False, force: bool = False,
fallback: bool = False, fallback: bool = False,
) -> None: ) -> None:
if __debug__ and utils.EMULATOR: if __debug__:
self._log(f"write message: {msg.MESSAGE_NAME}\n", utils.dump_protobuf(msg)) self._log(
f"write message: {msg.MESSAGE_NAME}",
logger=log.info,
)
if utils.EMULATOR:
log.debug(
__name__,
self.iface,
"message contents:\n%s",
utils.dump_protobuf(msg),
)
cid = self.get_channel_id_int() cid = self.get_channel_id_int()
msg_size = protobuf.encoded_length(msg) msg_size = protobuf.encoded_length(msg)
@ -540,8 +551,8 @@ class Channel:
payload_length = noise_payload_len + TAG_LENGTH payload_length = noise_payload_len + TAG_LENGTH
if self.write_task_spawn is not None: if self.write_task_spawn is not None:
self.write_task_spawn.close() # UPS TODO might break something self.write_task_spawn.close() # TODO might break something
print("\nCLOSED\n") self._log("Closed write task", logger=log.warning)
self._prepare_write() self._prepare_write()
if fallback: if fallback:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
@ -585,8 +596,12 @@ class Channel:
header = PacketHeader(ctrl_byte, self.get_channel_id_int(), payload_len) header = PacketHeader(ctrl_byte, self.get_channel_id_int(), payload_len)
self.transmission_loop = TransmissionLoop(self, header, payload) self.transmission_loop = TransmissionLoop(self, header, payload)
if only_once: if only_once:
if __debug__:
self._log('Starting transmission loop "only once"')
await self.transmission_loop.start(max_retransmission_count=1) await self.transmission_loop.start(max_retransmission_count=1)
else: else:
if __debug__:
self._log("Starting transmission loop")
await self.transmission_loop.start() await self.transmission_loop.start()
ABP.set_send_seq_bit_to_opposite(self.channel_cache) ABP.set_send_seq_bit_to_opposite(self.channel_cache)
@ -639,9 +654,10 @@ class Channel:
if __debug__: if __debug__:
def _log(self, text_1: str, text_2: str = "") -> None: def _log(self, text_1: str, text_2: str = "", logger: Any = log.debug) -> None:
log.debug( logger(
__name__, __name__,
self.iface,
"(cid: %s) %s%s", "(cid: %s) %s%s",
get_bytes_as_str(self.channel_id), get_bytes_as_str(self.channel_id),
text_1, text_1,

View File

@ -9,10 +9,11 @@ if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
if __debug__: if __debug__:
from trezor import log from .. import wire_log as log
CHANNELS_LOADED: bool = False CHANNELS_LOADED: bool = False
def create_new_channel(iface: WireInterface) -> Channel: def create_new_channel(iface: WireInterface) -> Channel:
""" """
Creates a new channel for the interface `iface`. Creates a new channel for the interface `iface`.
@ -23,7 +24,9 @@ def create_new_channel(iface: WireInterface) -> Channel:
return channel return channel
def load_cached_channels(channels_dict: dict[int, Channel]) -> None: def load_cached_channels(
channels_dict: dict[int, Channel], iface: WireInterface
) -> None:
""" """
Returns all allocated channels from cache. Returns all allocated channels from cache.
""" """
@ -31,7 +34,7 @@ def load_cached_channels(channels_dict: dict[int, Channel]) -> None:
if CHANNELS_LOADED: if CHANNELS_LOADED:
if __debug__: if __debug__:
log.debug(__name__, "Channels already loaded, process skipped.") log.debug(__name__, iface, "Channels already loaded, process skipped.")
return return
cached_channels = cache_thp.get_all_allocated_channels() cached_channels = cache_thp.get_all_allocated_channels()
@ -39,5 +42,5 @@ def load_cached_channels(channels_dict: dict[int, Channel]) -> None:
channel_id = int.from_bytes(channel.channel_id, "big") channel_id = int.from_bytes(channel.channel_id, "big")
channels_dict[channel_id] = Channel(channel) channels_dict[channel_id] = Channel(channel)
if __debug__: if __debug__:
log.debug(__name__, "Channels loaded from cache.") log.debug(__name__, iface, "Channels loaded from cache.")
CHANNELS_LOADED = True CHANNELS_LOADED = True

View File

@ -23,7 +23,7 @@ if TYPE_CHECKING:
pass pass
if __debug__: if __debug__:
from trezor import log from .. import wire_log as log
class PairingContext(Context): class PairingContext(Context):
@ -60,7 +60,7 @@ class PairingContext(Context):
message: Message = await self.incoming_message message: Message = await self.incoming_message
except protocol_common.WireError as e: except protocol_common.WireError as e:
if __debug__: if __debug__:
log.exception(__name__, e) log.exception(__name__, self.iface, e)
await self.write(message_handler.failure(e)) await self.write(message_handler.failure(e))
continue continue
else: else:
@ -74,7 +74,7 @@ class PairingContext(Context):
# Log and ignore. The context handler can only exit explicitly in the # Log and ignore. The context handler can only exit explicitly in the
# following finally block. # following finally block.
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, self.iface, exc)
finally: finally:
if next_message is None: if next_message is None:
@ -85,7 +85,7 @@ class PairingContext(Context):
# Log and try again. The context handler can only exit explicitly via # Log and try again. The context handler can only exit explicitly via
# finally block above # finally block above
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, self.iface, exc)
async def read( async def read(
self, self,
@ -98,6 +98,7 @@ class PairingContext(Context):
exp_type = expected_type.MESSAGE_NAME exp_type = expected_type.MESSAGE_NAME
log.debug( log.debug(
__name__, __name__,
self.iface,
"Read - with expected types %s and expected type %s", "Read - with expected types %s and expected type %s",
str(expected_types), str(expected_types),
exp_type, exp_type,
@ -169,6 +170,7 @@ class PairingContext(Context):
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
self.iface,
"Hotfix for current context being destroyed by workflow.close_others", "Hotfix for current context being destroyed by workflow.close_others",
) )
# --- HOTFIX END --- # --- HOTFIX END ---
@ -218,7 +220,9 @@ class PairingContext(Context):
br_code=ButtonRequestType.Other, br_code=ButtonRequestType.Other,
) )
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Result of connection dialog %s", str(result)) log.debug(
__name__, self.iface, "Result of connection dialog %s", str(result)
)
async def show_autoconnect_credential_confirmation_screen(self) -> None: async def show_autoconnect_credential_confirmation_screen(self) -> None:
from trezor.ui.layouts.common import interact from trezor.ui.layouts.common import interact
@ -297,7 +301,7 @@ class PairingContext(Context):
if self.code_code_entry is not None: if self.code_code_entry is not None:
code_str = f"{self.code_code_entry:06}" code_str = f"{self.code_code_entry:06}"
if __debug__: if __debug__:
log.debug(__name__, "code_code_entry: %s", code_str) log.debug(__name__, self.iface, "code_code_entry: %s", code_str)
return code_str[:3] + " " + code_str[3:] return code_str[:3] + " " + code_str[3:]
raise Exception("Code entry string is not available") raise Exception("Code entry string is not available")
@ -306,7 +310,7 @@ class PairingContext(Context):
if self.code_qr_code is not None: if self.code_qr_code is not None:
code_str = (hexlify(self.code_qr_code)).decode("utf-8") code_str = (hexlify(self.code_qr_code)).decode("utf-8")
if __debug__: if __debug__:
log.debug(__name__, "code_qr_code_hexlified: %s", code_str) log.debug(__name__, self.iface, "code_qr_code_hexlified: %s", code_str)
return code_str return code_str
raise Exception("QR code string is not available") raise Exception("QR code string is not available")
@ -364,7 +368,7 @@ async def handle_message(
return exc.msg return exc.msg
except SilentError as exc: except SilentError as exc:
if __debug__: if __debug__:
log.error(__name__, "SilentError: %s", exc.message) log.error(__name__, pairing_ctx.iface, "SilentError: %s", exc.message)
except BaseException as exc: except BaseException as exc:
# Either: # Either:
# - the message had a type that has a registered handler, but does not have # - the message had a type that has a registered handler, but does not have
@ -374,11 +378,13 @@ async def handle_message(
# - something canceled the workflow from the outside # - something canceled the workflow from the outside
if __debug__: if __debug__:
if isinstance(exc, ActionCancelled): if isinstance(exc, ActionCancelled):
log.debug(__name__, "cancelled: %s", exc.message) log.debug(__name__, pairing_ctx.iface, "cancelled: %s", exc.message)
elif isinstance(exc, loop.TaskClosed): elif isinstance(exc, loop.TaskClosed):
log.debug(__name__, "cancelled: loop task was closed") log.debug(
__name__, pairing_ctx.iface, "cancelled: loop task was closed"
)
else: else:
log.exception(__name__, exc) log.exception(__name__, pairing_ctx.iface, exc)
res_msg = message_handler.failure(exc) res_msg = message_handler.failure(exc)
if res_msg is not None: if res_msg is not None:

View File

@ -15,7 +15,7 @@ from storage.cache_thp import (
update_channel_last_used, update_channel_last_used,
update_session_last_used, update_session_last_used,
) )
from trezor import config, log, loop, protobuf, utils from trezor import config, loop, protobuf, utils
from trezor.enums import FailureType from trezor.enums import FailureType
from trezor.messages import Failure from trezor.messages import Failure
from trezor.wire.thp import memory_manager from trezor.wire.thp import memory_manager
@ -59,6 +59,7 @@ if TYPE_CHECKING:
if __debug__: if __debug__:
from trezor.utils import get_bytes_as_str from trezor.utils import get_bytes_as_str
from .. import wire_log as log
_TREZOR_STATE_UNPAIRED = b"\x00" _TREZOR_STATE_UNPAIRED = b"\x00"
_TREZOR_STATE_PAIRED = b"\x01" _TREZOR_STATE_PAIRED = b"\x01"
@ -71,18 +72,19 @@ async def handle_received_message(
"""Handle a message received from the channel.""" """Handle a message received from the channel."""
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "handle_received_message") log.debug(__name__, ctx.iface, "handle_received_message")
if utils.ALLOW_DEBUG_MESSAGES: # TODO remove after performance tests are done if utils.ALLOW_DEBUG_MESSAGES: # TODO remove after performance tests are done
try: pass
import micropython # try:
# import micropython
print("micropython.mem_info() from received_message_handler.py") # print("micropython.mem_info() from received_message_handler.py")
micropython.mem_info() # micropython.mem_info()
print("Allocation count:", micropython.alloc_count()) # print("Allocation count:", micropython.alloc_count())
except AttributeError: # except AttributeError:
print( # print(
"To show allocation count, create the build with TREZOR_MEMPERF=1" # "To show allocation count, create the build with TREZOR_MEMPERF=1"
) # )
ctrl_byte, _, payload_length = ustruct.unpack(">BHH", message_buffer) ctrl_byte, _, payload_length = ustruct.unpack(">BHH", message_buffer)
message_length = payload_length + INIT_HEADER_LENGTH message_length = payload_length + INIT_HEADER_LENGTH
@ -94,6 +96,7 @@ async def handle_received_message(
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
ctx.iface,
"handle_completed_message - seq bit of message: %d, ack bit of message: %d", "handle_completed_message - seq bit of message: %d, ack bit of message: %d",
seq_bit, seq_bit,
ack_bit, ack_bit,
@ -114,7 +117,11 @@ async def handle_received_message(
# 2: Handle message with unexpected sequential bit # 2: Handle message with unexpected sequential bit
if seq_bit != ABP.get_expected_receive_seq_bit(ctx.channel_cache): if seq_bit != ABP.get_expected_receive_seq_bit(ctx.channel_cache):
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Received message with an unexpected sequential bit") log.debug(
__name__,
ctx.iface,
"Received message with an unexpected sequential bit",
)
await _send_ack(ctx, ack_bit=seq_bit) await _send_ack(ctx, ack_bit=seq_bit)
raise ThpError("Received message with an unexpected sequential bit") raise ThpError("Received message with an unexpected sequential bit")
@ -143,7 +150,7 @@ async def handle_received_message(
await ctx.write_error(ThpErrorType.DEVICE_LOCKED) await ctx.write_error(ThpErrorType.DEVICE_LOCKED)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "handle_received_message - end") log.debug(__name__, ctx.iface, "handle_received_message - end")
def _send_ack(ctx: Channel, ack_bit: int) -> Awaitable[None]: def _send_ack(ctx: Channel, ack_bit: int) -> Awaitable[None]:
@ -152,6 +159,7 @@ def _send_ack(ctx: Channel, ack_bit: int) -> Awaitable[None]:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
ctx.iface,
"Writing ACK message to a channel with cid: %s, ack_bit: %d", "Writing ACK message to a channel with cid: %s, ack_bit: %d",
get_bytes_as_str(ctx.channel_id), get_bytes_as_str(ctx.channel_id),
ack_bit, ack_bit,
@ -161,13 +169,13 @@ def _send_ack(ctx: Channel, ack_bit: int) -> Awaitable[None]:
def _check_checksum(message_length: int, message_buffer: utils.BufferType) -> None: def _check_checksum(message_length: int, message_buffer: utils.BufferType) -> None:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "check_checksum") log.debug(__name__, None, "check_checksum")
if not checksum.is_valid( if not checksum.is_valid(
checksum=message_buffer[message_length - CHECKSUM_LENGTH : message_length], checksum=message_buffer[message_length - CHECKSUM_LENGTH : message_length],
data=memoryview(message_buffer)[: message_length - CHECKSUM_LENGTH], data=memoryview(message_buffer)[: message_length - CHECKSUM_LENGTH],
): ):
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Invalid checksum, ignoring message.") log.debug(__name__, None, "Invalid checksum, ignoring message.")
raise ThpError("Invalid checksum, ignoring message.") raise ThpError("Invalid checksum, ignoring message.")
@ -176,17 +184,21 @@ async def _handle_ack(ctx: Channel, ack_bit: int) -> None:
return return
# ACK is expected and it has correct sync bit # ACK is expected and it has correct sync bit
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Received ACK message with correct ack bit") log.debug(__name__, ctx.iface, "Received ACK message with correct ack bit")
if ctx.transmission_loop is not None: if ctx.transmission_loop is not None:
ctx.transmission_loop.stop_immediately() ctx.transmission_loop.stop_immediately()
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "Stopped transmission loop") log.debug(__name__, ctx.iface, "Stopped transmission loop")
elif __debug__:
log.debug(__name__, ctx.iface, "Transmission loop was not stopped!")
ABP.set_sending_allowed(ctx.channel_cache, True) ABP.set_sending_allowed(ctx.channel_cache, True)
if ctx.write_task_spawn is not None: if ctx.write_task_spawn is not None:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, 'Control to "write_encrypted_payload_loop" task') log.debug(
__name__, ctx.iface, 'Control to "write_encrypted_payload_loop" task'
)
await ctx.write_task_spawn await ctx.write_task_spawn
# Note that no the write_task_spawn could result in loop.clear(), # Note that no the write_task_spawn could result in loop.clear(),
# which will result in termination of this function - any code after # which will result in termination of this function - any code after
@ -223,7 +235,7 @@ async def _handle_state_TH1(
ctrl_byte: int, ctrl_byte: int,
) -> None: ) -> None:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "handle_state_TH1") log.debug(__name__, ctx.iface, "handle_state_TH1")
if not control_byte.is_handshake_init_req(ctrl_byte): if not control_byte.is_handshake_init_req(ctrl_byte):
raise ThpError("Message received is not a handshake init request!") raise ThpError("Message received is not a handshake init request!")
if not payload_length == PUBKEY_LENGTH + CHECKSUM_LENGTH: if not payload_length == PUBKEY_LENGTH + CHECKSUM_LENGTH:
@ -250,15 +262,17 @@ async def _handle_state_TH1(
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
ctx.iface,
"trezor ephemeral pubkey: %s", "trezor ephemeral pubkey: %s",
get_bytes_as_str(trezor_ephemeral_pubkey), get_bytes_as_str(trezor_ephemeral_pubkey),
) )
log.debug( log.debug(
__name__, __name__,
ctx.iface,
"encrypted trezor masked static pubkey: %s", "encrypted trezor masked static pubkey: %s",
get_bytes_as_str(encrypted_trezor_static_pubkey), get_bytes_as_str(encrypted_trezor_static_pubkey),
) )
log.debug(__name__, "tag: %s", get_bytes_as_str(tag)) log.debug(__name__, ctx.iface, "tag: %s", get_bytes_as_str(tag))
payload = trezor_ephemeral_pubkey + encrypted_trezor_static_pubkey + tag payload = trezor_ephemeral_pubkey + encrypted_trezor_static_pubkey + tag
@ -272,7 +286,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
from apps.thp.credential_manager import decode_credential, validate_credential from apps.thp.credential_manager import decode_credential, validate_credential
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "handle_state_TH2") log.debug(__name__, ctx.iface, "handle_state_TH2")
if not control_byte.is_handshake_comp_req(ctrl_byte): if not control_byte.is_handshake_comp_req(ctrl_byte):
raise ThpError("Message received is not a handshake completion request!") raise ThpError("Message received is not a handshake completion request!")
@ -321,6 +335,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
ctx.iface,
"host static pubkey: %s, noise payload: %s", "host static pubkey: %s, noise payload: %s",
utils.get_bytes_as_str(host_encrypted_static_pubkey), utils.get_bytes_as_str(host_encrypted_static_pubkey),
utils.get_bytes_as_str(handshake_completion_request_noise_payload), utils.get_bytes_as_str(handshake_completion_request_noise_payload),
@ -347,7 +362,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
ctx.credential = None ctx.credential = None
except DataError as e: except DataError as e:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.exception(__name__, e) log.exception(__name__, ctx.iface, e)
pass pass
# send hanshake completion response # send hanshake completion response
@ -366,7 +381,7 @@ async def _handle_state_TH2(ctx: Channel, message_length: int, ctrl_byte: int) -
async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -> None: async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -> None:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "handle_state_ENCRYPTED_TRANSPORT") log.debug(__name__, ctx.iface, "handle_state_ENCRYPTED_TRANSPORT")
ctx.decrypt_buffer(message_length) ctx.decrypt_buffer(message_length)
@ -407,6 +422,7 @@ async def _handle_state_ENCRYPTED_TRANSPORT(ctx: Channel, message_length: int) -
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
ctx.iface,
f"Scheduled message to be handled by a session (session_id: {session_id}, msg_type (int): {message_type})", f"Scheduled message to be handled by a session (session_id: {session_id}, msg_type (int): {message_type})",
) )
@ -454,7 +470,7 @@ def _decode_message(
buffer: bytes, msg_type: int, message_name: str | None = None buffer: bytes, msg_type: int, message_name: str | None = None
) -> protobuf.MessageType: ) -> protobuf.MessageType:
if __debug__: if __debug__:
log.debug(__name__, "decode message") log.debug(__name__, None, "decode message")
if message_name is not None: if message_name is not None:
expected_type = protobuf.type_for_name(message_name) expected_type = protobuf.type_for_name(message_name)
else: else:

View File

@ -3,7 +3,7 @@ from typing import TYPE_CHECKING
from storage import cache_thp from storage import cache_thp
from storage.cache_common import InvalidSessionError from storage.cache_common import InvalidSessionError
from storage.cache_thp import SessionThpCache from storage.cache_thp import SessionThpCache
from trezor import log, loop, protobuf, utils from trezor import loop, protobuf, utils
from trezor.wire import message_handler, protocol_common from trezor.wire import message_handler, protocol_common
from trezor.wire.context import UnexpectedMessageException from trezor.wire.context import UnexpectedMessageException
from trezor.wire.message_handler import failure from trezor.wire.message_handler import failure
@ -26,6 +26,8 @@ _REPEAT_LOOP = False
if __debug__: if __debug__:
from trezor.utils import get_bytes_as_str from trezor.utils import get_bytes_as_str
from .. import wire_log as log
class GenericSessionContext(Context): class GenericSessionContext(Context):
@ -39,6 +41,7 @@ class GenericSessionContext(Context):
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, __name__,
self.iface,
"handle - start (channel_id (bytes): %s, session_id: %d)", "handle - start (channel_id (bytes): %s, session_id: %d)",
get_bytes_as_str(self.channel_id), get_bytes_as_str(self.channel_id),
self.session_id, self.session_id,
@ -61,7 +64,7 @@ class GenericSessionContext(Context):
except Exception as exc: except Exception as exc:
# Log and try again. # Log and try again.
if __debug__: if __debug__:
log.exception(__name__, exc) log.exception(__name__, self.iface, exc)
async def _handle_message( async def _handle_message(
self, self,
@ -79,7 +82,7 @@ class GenericSessionContext(Context):
except protocol_common.WireError as e: except protocol_common.WireError as e:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.exception(__name__, e) log.exception(__name__, self.iface, e)
await self.write(failure(e)) await self.write(failure(e))
return _REPEAT_LOOP return _REPEAT_LOOP
@ -97,6 +100,7 @@ class GenericSessionContext(Context):
exp_type = expected_type.MESSAGE_NAME exp_type = expected_type.MESSAGE_NAME
log.debug( log.debug(
__name__, __name__,
self.iface,
"Read - with expected types %s and expected type %s", "Read - with expected types %s and expected type %s",
str(expected_types), str(expected_types),
exp_type, exp_type,
@ -106,6 +110,7 @@ class GenericSessionContext(Context):
if __debug__: if __debug__:
log.debug( log.debug(
__name__, __name__,
self.iface,
"EXPECTED TYPES: %s\nRECEIVED TYPE: %s", "EXPECTED TYPES: %s\nRECEIVED TYPE: %s",
str(expected_types), str(expected_types),
str(message.type), str(message.type),

View File

@ -3,7 +3,7 @@ from micropython import const
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from storage.cache_thp import BROADCAST_CHANNEL_ID from storage.cache_thp import BROADCAST_CHANNEL_ID
from trezor import io, log, loop, utils from trezor import io, loop, utils
from . import ( from . import (
CHANNEL_ALLOCATION_REQ, CHANNEL_ALLOCATION_REQ,
@ -26,6 +26,9 @@ from .writer import (
write_payload_to_wire_and_add_checksum, write_payload_to_wire_and_add_checksum,
) )
if __debug__:
from .. import wire_log as log
if TYPE_CHECKING: if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
@ -35,7 +38,7 @@ _CHANNELS: dict[int, Channel] = {}
async def thp_main_loop(iface: WireInterface) -> None: async def thp_main_loop(iface: WireInterface) -> None:
global _CHANNELS global _CHANNELS
channel_manager.load_cached_channels(_CHANNELS) channel_manager.load_cached_channels(_CHANNELS, iface)
read = loop.wait(iface.iface_num() | io.POLL_READ) read = loop.wait(iface.iface_num() | io.POLL_READ)
packet = bytearray(iface.RX_PACKET_LEN) packet = bytearray(iface.RX_PACKET_LEN)
@ -43,9 +46,8 @@ async def thp_main_loop(iface: WireInterface) -> None:
while True: while True:
try: try:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(__name__, iface, "thp_main_loop")
__name__, f"thp_main_loop from iface: {iface.iface_num()}" # wire_log.debug(__name__, iface, "thp main loop")
)
packet_len = await read packet_len = await read
assert packet_len == len(packet) assert packet_len == len(packet)
iface.read(packet, 0) iface.read(packet, 0)
@ -67,16 +69,17 @@ async def thp_main_loop(iface: WireInterface) -> None:
except ThpError as e: except ThpError as e:
if __debug__: if __debug__:
log.exception(__name__, e) log.exception(__name__, iface, e)
finally: finally:
channel_manager.CHANNELS_LOADED = False channel_manager.CHANNELS_LOADED = False
async def _handle_codec_v1(iface: WireInterface, packet: bytes) -> None: async def _handle_codec_v1(iface: WireInterface, packet: bytes) -> None:
# If the received packet is not an initial codec_v1 packet, do not send error message # If the received packet is not an initial codec_v1 packet, do not send error message
if not packet[1:3] == b"##": if not packet[1:3] == b"##":
return return
if __debug__: if __debug__:
log.debug(__name__, "Received codec_v1 message, returning error") log.debug(__name__, iface, "Received codec_v1 message, returning error")
error_message = _get_codec_v1_error_message() error_message = _get_codec_v1_error_message()
await writer.write_packet_to_wire(iface, error_message) await writer.write_packet_to_wire(iface, error_message)
@ -85,7 +88,7 @@ async def _handle_broadcast(iface: WireInterface, packet: utils.BufferType) -> N
if _get_ctrl_byte(packet) != CHANNEL_ALLOCATION_REQ: if _get_ctrl_byte(packet) != CHANNEL_ALLOCATION_REQ:
raise ThpError("Unexpected ctrl_byte in a broadcast channel packet") raise ThpError("Unexpected ctrl_byte in a broadcast channel packet")
if __debug__: if __debug__:
log.debug(__name__, "Received valid message on the broadcast channel") log.debug(__name__, iface, "Received valid message on the broadcast channel")
length, nonce = ustruct.unpack(">H8s", packet[3:]) length, nonce = ustruct.unpack(">H8s", packet[3:])
payload = _get_buffer_for_payload(length, packet[5:], _CID_REQ_PAYLOAD_LENGTH) payload = _get_buffer_for_payload(length, packet[5:], _CID_REQ_PAYLOAD_LENGTH)
@ -106,7 +109,7 @@ async def _handle_broadcast(iface: WireInterface, packet: utils.BufferType) -> N
len(response_data) + CHECKSUM_LENGTH, len(response_data) + CHECKSUM_LENGTH,
) )
if __debug__: if __debug__:
log.debug(__name__, "New channel allocated with id %d", cid) log.debug(__name__, iface, "New channel allocated with id %d", cid)
await write_payload_to_wire_and_add_checksum(iface, response_header, response_data) await write_payload_to_wire_and_add_checksum(iface, response_header, response_data)

View File

@ -2,7 +2,7 @@ from micropython import const
from trezorcrypto import crc from trezorcrypto import crc
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from trezor import io, log, loop, utils from trezor import io, loop, utils
from . import PacketHeader from . import PacketHeader
@ -16,6 +16,9 @@ if TYPE_CHECKING:
from trezorio import WireInterface from trezorio import WireInterface
from typing import Awaitable, Sequence from typing import Awaitable, Sequence
if __debug__:
from .. import wire_log as log
def write_payload_to_wire_and_add_checksum( def write_payload_to_wire_and_add_checksum(
iface: WireInterface, header: PacketHeader, transport_payload: bytes iface: WireInterface, header: PacketHeader, transport_payload: bytes
@ -71,7 +74,10 @@ async def write_payloads_to_wire(
# write packet to wire (in-lined) # write packet to wire (in-lined)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, "write_packet_to_wire: %s", utils.get_bytes_as_str(packet) __name__,
iface,
"write_packet_to_wire: %s",
utils.get_bytes_as_str(packet),
) )
written_by_iface: int = 0 written_by_iface: int = 0
while written_by_iface < len(packet): while written_by_iface < len(packet):
@ -84,7 +90,10 @@ async def write_packet_to_wire(iface: WireInterface, packet: bytes) -> None:
await loop.wait(iface.iface_num() | io.POLL_WRITE) await loop.wait(iface.iface_num() | io.POLL_WRITE)
if __debug__ and utils.ALLOW_DEBUG_MESSAGES: if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug( log.debug(
__name__, "write_packet_to_wire: %s", utils.get_bytes_as_str(packet) __name__,
iface,
"write_packet_to_wire: %s",
utils.get_bytes_as_str(packet),
) )
n_written = iface.write(packet) n_written = iface.write(packet)
if n_written == len(packet): if n_written == len(packet):

View File

@ -0,0 +1,63 @@
import sys
from typing import TYPE_CHECKING
from trezor import log
if TYPE_CHECKING:
from trezorio import WireInterface
from typing import Any, ParamSpec, Protocol
P = ParamSpec("P")
class LogFunction(Protocol):
def __call__(self, name: str, msg: str, *args: Any) -> None: ...
class LogIfaceFunction(Protocol):
def __call__(
self, name: str, iface: WireInterface | None, msg: str, *args: Any
) -> None: ...
# LogFunction = Callable[Concatenate[str, P], None]
def _wrap_log(log_func: LogFunction) -> LogIfaceFunction:
def wrapper(
name: str,
iface: WireInterface | None,
msg: str,
*args: P.args,
**kwargs: P.kwargs,
) -> None:
if iface is None:
return log_func(name, msg, *args, **kwargs)
iface_str = f"\x1b[93m[{iface.__class__.__name__}]\x1b[0m"
return log_func(name, f"{iface_str} {msg}", *args, **kwargs)
return wrapper
debug = _wrap_log(log.debug)
info = _wrap_log(log.info)
warning = _wrap_log(log.warning)
error = _wrap_log(log.error)
def exception(name: str, iface: WireInterface | None, exc: BaseException) -> None:
# we are using `__class__.__name__` to avoid importing ui module
# we also need to instruct typechecker to ignore the missing argument
# in ui.Result exception
if exc.__class__.__name__ == "Result":
debug(
name,
iface,
"ui.Result: %s",
exc.value, # type: ignore [Cannot access attribute "value" for class "BaseException"]
)
elif exc.__class__.__name__ == "Cancelled":
debug(name, iface, "ui.Cancelled")
else:
error(name, iface, "exception:")
# since mypy 0.770 we cannot override sys, so print_exception is unknown
sys.print_exception(exc) # type: ignore ["print_exception" is not a known attribute of module]