1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-07-22 22:48:20 +00:00
trezor-firmware/tools/automatic_battery_tester/dut/dut.py
2025-07-02 14:19:11 +02:00

399 lines
12 KiB
Python

import hashlib
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
import serial
from hardware_ctl.relay_controller import RelayController
BAUDRATE_DEFAULT = 115200
BYTESIZE_DEFAULT = 8
PARITY_DEFAULT = serial.PARITY_NONE
CMD_TIMEOUT = 10
@dataclass
class DutReportData:
time: float = 0.0
power_state: str = ""
usb: str = ""
wlc: str = ""
battery_voltage: float = 0.0
battery_current: float = 0.0
battery_temp: float = 0.0
battery_soc: int = 0
battery_soc_latched: int = 0
pmic_die_temp: float = 0.0
wlc_voltage: float = 0.0
wlc_current: float = 0.0
wlc_die_temp: float = 0.0
system_voltage: float = 0.0
@dataclass
class DutProdtestResponse:
timestamp: float | None = None
cmd: str | None = None
trace: list = field(default_factory=list)
data_entries: list = field(default_factory=list)
OK: bool = False
class Dut:
def __init__(
self,
name,
cpu_id=None,
usb_port=None,
relay_port=None,
relay_ctl: RelayController | None = None,
verbose=False,
):
self.name = name
self.relay_ctl = relay_ctl
self.verbose = verbose
self.relay_port = relay_port
# Power up the device with relay controller
self.power_up()
# Wait for device to boot up
time.sleep(3)
self.vcp = serial.Serial(
port=usb_port,
baudrate=BAUDRATE_DEFAULT,
bytesize=BYTESIZE_DEFAULT,
parity=PARITY_DEFAULT,
)
# Connect serial port
if not self.vcp.is_open:
self.init_error()
raise RuntimeError(f"Failed to open serial port {usb_port} for DUT {name}")
self.entry_interactive_mode()
self.enable_charging()
self.set_backlight(100)
time.sleep(2) # Give some time to process te commands
if not self.ping():
self.init_error()
raise RuntimeError(f"DUT {self.name} did not respond to ping command")
self.cpu_id = self.get_cpuid()
if self.cpu_id is None:
self.init_error()
raise RuntimeError(f"DUT {self.name} failed to retrieve CPU ID")
logging.debug(f"DUT {self.name} initialized with CPU ID: {self.cpu_id}")
self.cpu_id_hash = self.generate_id_hash(self.cpu_id)
# cpu_id check
if cpu_id is not None:
if self.cpu_id != cpu_id:
self.init_error()
raise RuntimeError(
f"DUT {self.name} CPU ID mismatch: expected {cpu_id}, got {self.cpu_id}"
)
logging.debug(f"DUT {self.name} ID hash: {self.cpu_id_hash}")
# device should start charging
report = self.read_report()
if not report.usb == "USB_connected":
self.init_error()
raise RuntimeError(
f"{self.name} USB not connected. Check VCP and relay ports"
)
self.display_ok()
self.disable_charging()
self.power_down()
def init_error(self):
self.display_error()
self.disable_charging()
self.power_down()
def display_error(self):
self.display_bars("R")
time.sleep(3)
def display_ok(self):
self.display_bars("G")
time.sleep(3)
def get_cpu_id_hash(self):
return self.cpu_id_hash
def get_relay_port(self):
return self.relay_port
def generate_id_hash(self, cpu_id):
"""
Generate a unique ID hash for the DUT based on its CPU ID.
:param cpu_id: The CPU ID of the DUT.
:return: A unique ID hash string.
"""
if cpu_id is None:
raise ValueError("CPU ID cannot be None.")
device_id_bytes = bytes.fromhex(cpu_id)
digest = hashlib.sha256(device_id_bytes).digest()
return digest[:2].hex()
def set_verbose(self, verbose):
self.verbose = verbose
def get_verbose(self):
return self.verbose
def entry_interactive_mode(self):
# Enter interactive mode
self.send_command(".", skip_response=True)
def power_up(self):
"""
Power up the DUT by activating the relay.
"""
if self.relay_port is None:
logging.debug("Relay port not set for DUT, skipping power up.")
return
self.relay_ctl.set_relay_on(self.relay_port)
def power_down(self):
"""
Power down the DUT by deactivating the relay.
"""
if self.relay_port is None:
logging.debug("Relay port not set for DUT, skipping power down.")
return
self.relay_ctl.set_relay_off(self.relay_port)
def display_bars(self, value: str):
"""
Display bars on the DUT's screen.
:param value: A string representing the bars to display (e.g., "G" for green).
:return: True if the command was successful, False otherwise.
"""
response = self.send_command("display-bars", value)
return response.OK
def ping(self):
"""
Send a ping command to the DUT and wait for a response.
Returns True if the DUT responds with "OK", False otherwise.
"""
response = self.send_command("ping")
return response.OK
def enable_charging(self):
response = self.send_command("pm-charge-enable")
return response.OK
def disable_charging(self):
response = self.send_command("pm-charge-disable")
return response.OK
def set_soc_limit(self, soc_limit: int):
"""
Set the state of charge (SoC) limit for the DUT.
:param soc_limit: The SoC limit to set (0-100).
:return: True if the command was successful, False otherwise.
"""
if not 0 <= soc_limit <= 100:
raise ValueError("SoC limit must be between 0 and 100.")
response = self.send_command("pm-set-soc-limit", soc_limit)
return response.OK
def set_backlight(self, value: int):
if not 0 <= value <= 255:
raise ValueError("Backlight value must be between 0 and 255.")
response = self.send_command("display-set-backlight", value)
return response.OK
def get_cpuid(self):
response = self.send_command("get-cpuid")
if not response.OK:
return None
if len(response.data_entries) == 0:
# No cpuid in data entries
return None
return response.data_entries[0][0]
def parse_report(self, response: DutProdtestResponse) -> DutReportData:
data = DutReportData()
data.time = response.timestamp
data.power_state = response.data_entries[0][0]
data.usb = response.data_entries[0][1]
data.wlc = response.data_entries[0][2] if response.data_entries else ""
data.battery_voltage = float(response.data_entries[0][3])
data.battery_current = float(response.data_entries[0][4])
data.battery_temp = float(response.data_entries[0][5])
data.battery_soc = int(float(response.data_entries[0][6]))
data.battery_soc_latched = int(float(response.data_entries[0][7]))
data.pmic_die_temp = float(response.data_entries[0][8])
data.wlc_voltage = float(response.data_entries[0][9])
data.wlc_current = float(response.data_entries[0][10])
data.wlc_die_temp = float(response.data_entries[0][11])
data.system_voltage = float(response.data_entries[0][12])
return data
def read_report(self) -> DutReportData:
"""
Read the PM report from the DUT.
Returns a ProdtestResponse object containing the report data.
"""
response = self.send_command("pm-report")
if not response.OK:
logging.error(f"Failed to read PM report from {self.name}.")
return None
return self.parse_report(response)
def send_command(self, cmd, *args, skip_response=False):
if self.vcp is None:
raise "VPC not initalized"
response = DutProdtestResponse()
# assert(len == 0)
# Assamble command
response.cmd = cmd
if args:
response.cmd = response.cmd + " " + " ".join(str(k) for k in args)
response.cmd = response.cmd + "\n"
response.timestamp = time.time()
# Flush serial
self.vcp.flush()
self._log_output(response.cmd.rstrip("\r\n"))
self.vcp.write(response.cmd.encode())
if skip_response:
return response
while True:
line = self.vcp.readline().decode()
self._log_input(line.strip("\r\n"))
# Capture traces
if line[:1] == "#":
response.trace.append(line[2:])
# Capture data
if line[:8] == "PROGRESS":
line = line.replace("\r\n", "")
response.data_entries.append((line[9:].split(" ")))
# Terminate
if "OK" in line:
response.OK = True
# Check if there is any data comming along with OK
line = line.replace("\r\n", "")
response.data_entries.append((line[3:].split(" ")))
break
if "ERROR" in line:
break
return response
def log_data(
self,
output_directory: Path,
test_time_id,
test_scenario,
test_phase,
temp,
verbose=False,
):
# Log file name format:
# > <device_id_hash>.<time_identifier>.<test_scenario>.<test><temperarture>.csv
# Example: a8bf.2506091307.linear.charge.25_deg.csv
file_path = (
output_directory
/ f"{self.cpu_id_hash}.{test_time_id}.{test_scenario}.{test_phase}.{temp}.csv"
)
report = None
try:
report = self.send_command("pm-report")
except Exception as e:
logging.error(f"Failed to read PM report from {self.name}, skip log: {e}")
return
if not file_path.exists():
# creat a file header
with open(file_path, "w") as f:
f.write(
"time,power_state,usb,wlc,battery_voltage,battery_current,"
"battery_temp,battery_soc,battery_soc_latched,pmic_die_temp,"
"wlc_voltage,wlc_current,wlc_die_temp,system_voltage\n"
)
with open(file_path, "a") as f:
f.write(
str(report.timestamp) + "," + ",".join(report.data_entries[0]) + "\n"
)
if verbose:
print(str(report.timestamp) + "," + ",".join(report.data_entries[0]))
def _log_output(self, message):
if self.verbose:
prefix = f"\033[95m[{self.name}]\033[0m"
logging.debug(prefix + " > " + message)
def _log_input(self, message):
if self.verbose:
prefix = f"\033[95m[{self.name}]\033[0m"
logging.debug(prefix + " < " + message)
def close(self):
"""
Close the DUT's serial port and clean up resources.
"""
if self.vcp is not None and self.vcp.is_open:
try:
self.vcp.close()
except Exception as e:
logging.warning(f"Failed to close VCP for {self.name}: {e}")
self.vcp = None
self.name = None
self.relay_ctl = None
self.relay_port = None
def __del__(self):
try:
if hasattr(self, "vcp") and self.vcp is not None and self.vcp.is_open:
self.close()
except Exception as e:
logging.warning(f"Error during DUT cleanup: {e}")