Improve logging of channel state, reduce logging of sessions

M1nd3r/thp5
M1nd3r 1 month ago
parent 13ac917d22
commit e25675a4e1

@ -180,7 +180,6 @@ def set_channel_host_ephemeral_key(channel: ChannelCache, key: bytearray) -> Non
def get_new_session(channel: ChannelCache):
print("---------------get new session")
new_sid = get_next_session_id(channel)
index = _get_next_session_index()
@ -194,8 +193,6 @@ def get_new_session(channel: ChannelCache):
_SESSIONS[index].state[:] = bytearray(
_UNALLOCATED_STATE.to_bytes(_SESSION_STATE_LENGTH, "big")
)
for s in _SESSIONS:
print(s)
return _SESSIONS[index]

@ -71,11 +71,11 @@ class Channel(Context):
def get_channel_state(self) -> int:
state = int.from_bytes(self.channel_cache.state, "big")
print("channel.get_ch_state:", state)
print("channel.get_ch_state:", _state_to_str(state))
return state
def set_channel_state(self, state: ChannelState) -> None:
print("channel.set_ch_state:", int.from_bytes(state.to_bytes(1, "big"), "big"))
print("channel.set_ch_state:", _state_to_str(state))
self.channel_cache.state = bytearray(state.to_bytes(1, "big"))
def set_buffer(self, buffer: utils.BufferType) -> None:
@ -194,7 +194,7 @@ class Channel(Context):
) -> None:
state = self.get_channel_state()
if __debug__:
log.debug(__name__, _state_to_str(state))
log.debug(__name__, "state: " + _state_to_str(state))
if state is ChannelState.TH1:
await self._handle_state_TH1(payload_length, message_length, sync_bit)
@ -601,26 +601,8 @@ def is_channel_state_pairing(state: int) -> bool:
def _state_to_str(state: int) -> str:
if state == ChannelState.ENCRYPTED_TRANSPORT:
return "state: encrypted transport"
elif state == ChannelState.TH1:
return "state: th1"
elif state == ChannelState.TH2:
return "state: th2"
elif state == ChannelState.TP1:
return "state: tp1"
elif state == ChannelState.TP2:
return "state: tp2"
elif state == ChannelState.TP3:
return "state: tp3"
elif state == ChannelState.TP4:
return "state: tp4"
elif state == ChannelState.TP5:
return "state: tp5"
elif state == ChannelState.UNALLOCATED:
return "state: unallocated"
else:
return "state: <not implemented>"
names = {v: k for k, v in ChannelState.__dict__.items() if not k.startswith("__")}
return names.get(state)
def printBytes(a):

Loading…
Cancel
Save