1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-05-07 09:29:04 +00:00

fix(core): disable unwanted replacing of channels

This commit is contained in:
M1nd3r 2025-04-22 15:51:54 +02:00
parent 360f21f21f
commit 1afccbbdc2
2 changed files with 44 additions and 27 deletions

View File

@ -8,6 +8,10 @@ from .channel import Channel
if TYPE_CHECKING:
from trezorio import WireInterface
if __debug__:
from trezor import log
CHANNELS_LOADED: bool = False
def create_new_channel(iface: WireInterface) -> Channel:
"""
@ -19,12 +23,21 @@ def create_new_channel(iface: WireInterface) -> Channel:
return channel
def load_cached_channels() -> dict[int, Channel]:
def load_cached_channels(channels_dict: dict[int, Channel]) -> None:
"""
Returns all allocated channels from cache.
"""
channels: dict[int, Channel] = {}
global CHANNELS_LOADED
if CHANNELS_LOADED:
if __debug__:
log.debug(__name__, "Channels already loaded, process skipped.")
return
cached_channels = cache_thp.get_all_allocated_channels()
for channel in cached_channels:
channels[int.from_bytes(channel.channel_id, "big")] = Channel(channel)
return channels
channel_id = int.from_bytes(channel.channel_id, "big")
channels_dict[channel_id] = Channel(channel)
if __debug__:
log.debug(__name__, "Channels loaded from cache.")
CHANNELS_LOADED = True

View File

@ -35,14 +35,17 @@ _CHANNELS: dict[int, Channel] = {}
async def thp_main_loop(iface: WireInterface) -> None:
global _CHANNELS
_CHANNELS = channel_manager.load_cached_channels()
channel_manager.load_cached_channels(_CHANNELS)
read = loop.wait(iface.iface_num() | io.POLL_READ)
packet = bytearray(iface.RX_PACKET_LEN)
try:
while True:
try:
if __debug__ and utils.ALLOW_DEBUG_MESSAGES:
log.debug(__name__, "thp_main_loop")
log.debug(
__name__, f"thp_main_loop from iface: {iface.iface_num()}"
)
packet_len = await read
assert packet_len == len(packet)
iface.read(packet, 0)
@ -65,7 +68,8 @@ async def thp_main_loop(iface: WireInterface) -> None:
except ThpError as e:
if __debug__:
log.exception(__name__, e)
finally:
channel_manager.CHANNELS_LOADED = False
async def _handle_codec_v1(iface: WireInterface, packet: bytes) -> None:
# If the received packet is not an initial codec_v1 packet, do not send error message