1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-07-30 02:18:16 +00:00

style(tools/automatic_battery_charger): fix styles + flake8 errors.

[no changelog]
This commit is contained in:
kopecdav 2025-07-01 17:49:08 +02:00 committed by kopecdav
parent 4e6d169dfa
commit d21bdb7f12
21 changed files with 654 additions and 495 deletions

View File

@ -1,11 +1,9 @@
import sys
import matplotlib.pyplot as plt
from pathlib import Path
from InquirerPy.base import Choice
from InquirerPy import inquirer
import matplotlib.pyplot as plt
from InquirerPy import inquirer
from InquirerPy.base import Choice
from utils import load_measured_data
default_dataset_dir = Path("../single_capture_test_results")
@ -13,7 +11,8 @@ default_dataset_dir = Path("../single_capture_test_results")
battery_thermal_limit = 45.0 # Celsius
case_thermal_limit = 41.0 # Celsius
def select_waveforms(dataset_directory = default_dataset_dir):
def select_waveforms(dataset_directory=default_dataset_dir):
"""
Select waveforms from a given dataset directory.
@ -43,12 +42,15 @@ def select_waveforms(dataset_directory = default_dataset_dir):
for waveform_file in waveform_files:
time_id = waveform_file.stem.split(".")[1]
ch = Choice(name=f"{waveform_file.name}", value={'waveform':waveform_file, 'external_temp': None})
ch = Choice(
name=f"{waveform_file.name}",
value={"waveform": waveform_file, "external_temp": None},
)
for temp_file in external_temp_files:
if time_id in temp_file.stem:
ch.name += f" (ext. temp available)"
ch.value['external_temp'] = temp_file
ch.name += " (ext. temp available)"
ch.value["external_temp"] = temp_file
break
choices.append(ch)
@ -58,7 +60,7 @@ def select_waveforms(dataset_directory = default_dataset_dir):
message="Select one or more waveforms:",
choices=choices,
multiselect=True,
instruction="(Use <tab> to select, <enter> to confirm)"
instruction="(Use <tab> to select, <enter> to confirm)",
).execute()
except KeyboardInterrupt:
@ -71,99 +73,167 @@ def select_waveforms(dataset_directory = default_dataset_dir):
return selected
def colored_region_plot(axis, time_vector, data_vector, mask, color='red', alpha=0.5):
def colored_region_plot(axis, time_vector, data_vector, mask, color="red", alpha=0.5):
start = None
in_region = False
for i , val in enumerate(mask):
for i, val in enumerate(mask):
if val and not in_region:
start = i
in_region = True
elif not val and in_region:
axis.plot(time_vector[start:i-1], data_vector[start:i-1], color=color, alpha=alpha)
axis.plot(
time_vector[start : i - 1],
data_vector[start : i - 1],
color=color,
alpha=alpha,
)
in_region = False
if in_region:
axis.plot(time_vector[start:(i-1)], data_vector[start:i-1], color=color, alpha=alpha)
axis.plot(
time_vector[start : (i - 1)],
data_vector[start : i - 1],
color=color,
alpha=alpha,
)
def colored_region_box(axis, time_vector, mask, color='orange', alpha=0.5):
def colored_region_box(axis, time_vector, mask, color="orange", alpha=0.5):
start = None
in_region = False
for i , val in enumerate(mask):
for i, val in enumerate(mask):
if val and not in_region:
start = i
in_region = True
elif not val and in_region:
axis.axvspan(time_vector[start], time_vector[i-1], color=color, alpha=alpha)
axis.axvspan(
time_vector[start], time_vector[i - 1], color=color, alpha=alpha
)
in_region = False
if in_region:
axis.axvspan(time_vector[start], time_vector[-1], color=color, alpha=alpha)
def sec_to_min(time_vector):
return (time_vector - time_vector[0]) / 60.0
def plot_temperature_profile(waveform_name, profile_data):
fig, ax = plt.subplots(2)
fig.canvas.manager.set_window_title(waveform_name)
ax[0].plot(sec_to_min(profile_data.time), profile_data.battery_temp, color='green', label='battery temeperature')
ax[0].axhline(y=battery_thermal_limit, color='green', linestyle='--')
ax[0].plot(
sec_to_min(profile_data.time),
profile_data.battery_temp,
color="green",
label="battery temeperature",
)
ax[0].axhline(y=battery_thermal_limit, color="green", linestyle="--")
ax[0].plot(sec_to_min(profile_data.time), profile_data.pmic_die_temp, color='orange', label='pmic die temperature')
ax[0].plot(
sec_to_min(profile_data.time),
profile_data.pmic_die_temp,
color="orange",
label="pmic die temperature",
)
colored_region_plot(
ax[0],
sec_to_min(profile_data.time),
profile_data.battery_temp,
profile_data.battery_temp > battery_thermal_limit,
color='red',
alpha=1)
color="red",
alpha=1,
)
if profile_data.ext_temp is not None:
ax[0].plot(sec_to_min(profile_data.ext_temp_time), profile_data.ext_temp, color='blue', label='case temperature', linestyle='--')
ax[0].axhline(y=case_thermal_limit, color='blue', linestyle='--')
ax[0].plot(
sec_to_min(profile_data.ext_temp_time),
profile_data.ext_temp,
color="blue",
label="case temperature",
linestyle="--",
)
ax[0].axhline(y=case_thermal_limit, color="blue", linestyle="--")
colored_region_plot(
ax[0],
sec_to_min(profile_data.ext_temp_time),
profile_data.ext_temp,
profile_data.ext_temp > case_thermal_limit,
color='red',
alpha=1)
color="red",
alpha=1,
)
ax[0].set_xlabel("Time (min)")
ax[0].set_ylabel("Temperature (C)")
ax[0].set_title("Temperature Profile: " + waveform_name)
ax[0].set_xlim(left=sec_to_min(profile_data.time)[0], right=sec_to_min(profile_data.time)[-1])
ax[0].set_xlim(
left=sec_to_min(profile_data.time)[0], right=sec_to_min(profile_data.time)[-1]
)
ax[0].legend()
ax[0].grid(True)
def min_to_hr(x): return x / 60.0
def hr_to_min(x): return x * 60.0
def min_to_hr(x):
return x / 60.0
secax = ax[0].secondary_xaxis('top', functions=(min_to_hr, hr_to_min))
def hr_to_min(x):
return x * 60.0
secax = ax[0].secondary_xaxis("top", functions=(min_to_hr, hr_to_min))
secax.set_xlabel("Time (hours)")
# Change background color according to charging state
usb_charging_mask = (profile_data.usb == "USB_connected") & (abs(profile_data.battery_current) > 0)
wlc_charging_mask = (profile_data.wlc == "WLC_connected") & ~usb_charging_mask & (abs(profile_data.battery_current) > 0)
colored_region_box(ax[0], sec_to_min(profile_data.time), usb_charging_mask, color='blue', alpha=0.2)
colored_region_box(ax[0], sec_to_min(profile_data.time), wlc_charging_mask, color='green', alpha=0.2)
usb_charging_mask = (profile_data.usb == "USB_connected") & (
abs(profile_data.battery_current) > 0
)
wlc_charging_mask = (
(profile_data.wlc == "WLC_connected")
& ~usb_charging_mask
& (abs(profile_data.battery_current) > 0)
)
colored_region_box(
ax[0], sec_to_min(profile_data.time), usb_charging_mask, color="blue", alpha=0.2
)
colored_region_box(
ax[0],
sec_to_min(profile_data.time),
wlc_charging_mask,
color="green",
alpha=0.2,
)
ax[1].plot(sec_to_min(profile_data.time), profile_data.battery_current, color='purple', label='battery current')
ax[1].plot(
sec_to_min(profile_data.time),
profile_data.battery_current,
color="purple",
label="battery current",
)
ax[1].set_xlabel("Time (min)")
ax[1].set_ylabel("Current (mA)")
ax[1].set_xlim(left=sec_to_min(profile_data.time)[0], right=sec_to_min(profile_data.time)[-1])
ax[1].set_xlim(
left=sec_to_min(profile_data.time)[0], right=sec_to_min(profile_data.time)[-1]
)
ax[1].grid(True)
ax[1].legend()
colored_region_box(ax[1], sec_to_min(profile_data.time), usb_charging_mask, color='blue', alpha=0.2)
colored_region_box(ax[1], sec_to_min(profile_data.time), wlc_charging_mask, color='green', alpha=0.2)
colored_region_box(
ax[1], sec_to_min(profile_data.time), usb_charging_mask, color="blue", alpha=0.2
)
colored_region_box(
ax[1],
sec_to_min(profile_data.time),
wlc_charging_mask,
color="green",
alpha=0.2,
)
def main():
@ -173,16 +243,15 @@ def main():
# Load data from files
profile_data = load_measured_data(
data_file_path=waveform['waveform'],
extern_temp_file_path=waveform['external_temp']
data_file_path=waveform["waveform"],
extern_temp_file_path=waveform["external_temp"],
)
plot_temperature_profile(waveform['waveform'].name, profile_data)
plot_temperature_profile(waveform["waveform"].name, profile_data)
# Plot graphs
plt.show()
if __name__ == "__main__":
main()

View File

@ -1,3 +1,3 @@
from .data_convertor import BatteryAnalysisData, load_measured_data
__all__ = ["BatteryAnalysisData", "load_measured_data"]

View File

@ -1,8 +1,9 @@
import pandas as pd
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
@dataclass
class BatteryAnalysisData:
@ -23,25 +24,27 @@ class BatteryAnalysisData:
ext_temp_time: np.ndarray = None # Optional time vector for external temperature
ext_temp: np.ndarray = None # Optional external temperature data
def load_measured_data(data_file_path: Path,
extern_temp_file_path:Path = None) -> BatteryAnalysisData:
def load_measured_data(
data_file_path: Path, extern_temp_file_path: Path = None
) -> BatteryAnalysisData:
profile_data = pd.read_csv(data_file_path)
# Extract data from the DataFrame
time_vector = profile_data["time"].to_numpy()
power_state_vector = profile_data["power_state"].to_numpy()
usb_vector = profile_data["usb"].to_numpy()
wlc_vector = profile_data["wlc"].to_numpy()
time_vector = profile_data["time"].to_numpy()
power_state_vector = profile_data["power_state"].to_numpy()
usb_vector = profile_data["usb"].to_numpy()
wlc_vector = profile_data["wlc"].to_numpy()
battery_voltage_vector = profile_data["battery_voltage"].to_numpy()
battery_current_vector = profile_data["battery_current"].to_numpy()
battery_temp_vector = profile_data["battery_temp"].to_numpy()
battery_soc_vector = profile_data["battery_soc"].to_numpy()
battery_temp_vector = profile_data["battery_temp"].to_numpy()
battery_soc_vector = profile_data["battery_soc"].to_numpy()
battery_soc_latched_vector = profile_data["battery_soc_latched"].to_numpy()
pmic_die_temp_vector = profile_data["pmic_die_temp"].to_numpy()
wlc_voltage_vector = profile_data["wlc_voltage"].to_numpy()
wlc_current_vector = profile_data["wlc_current"].to_numpy()
wlc_die_temp_vector = profile_data["wlc_die_temp"].to_numpy()
pmic_die_temp_vector = profile_data["pmic_die_temp"].to_numpy()
wlc_voltage_vector = profile_data["wlc_voltage"].to_numpy()
wlc_current_vector = profile_data["wlc_current"].to_numpy()
wlc_die_temp_vector = profile_data["wlc_die_temp"].to_numpy()
system_voltage_vector = profile_data["system_voltage"].to_numpy()
if extern_temp_file_path is not None:
@ -69,5 +72,5 @@ def load_measured_data(data_file_path: Path,
wlc_die_temp=wlc_die_temp_vector,
system_voltage=system_voltage_vector,
ext_temp_time=ext_temp_time_vector,
ext_temp=ext_temp_vector
)
ext_temp=ext_temp_vector,
)

View File

@ -1,2 +1,4 @@
from .dut_controller import DutController
from .dut import Dut
from .dut_controller import DutController
__all__ = ["Dut", "DutController"]

View File

@ -1,17 +1,17 @@
import serial
import time
import logging
import hashlib
import logging
import time
from dataclasses import dataclass, field
from typing import List
from pathlib import Path
import serial
from hardware_ctl.relay_controller import RelayController
BAUDRATE_DEFAULT = 115200
BYTESIZE_DEFAULT = 8
PARITY_DEFAULT = serial.PARITY_NONE
CMD_TIMEOUT = 10
PARITY_DEFAULT = serial.PARITY_NONE
CMD_TIMEOUT = 10
@dataclass
class DutReportData:
@ -30,6 +30,7 @@ class DutReportData:
wlc_die_temp: float = 0.0
system_voltage: float = 0.0
@dataclass
class DutProdtestResponse:
timestamp: float | None = None
@ -39,9 +40,17 @@ class DutProdtestResponse:
OK: bool = False
class Dut():
class Dut:
def __init__(self, name, cpu_id=None, usb_port=None, relay_port=None, relay_ctl=None, verbose=False):
def __init__(
self,
name,
cpu_id=None,
usb_port=None,
relay_port=None,
relay_ctl: RelayController | None = None,
verbose=False,
):
self.name = name
self.relay_ctl = relay_ctl
@ -54,13 +63,15 @@ class Dut():
# Wait for device to boot up
time.sleep(3)
self.vcp = serial.Serial(port=usb_port,
baudrate=BAUDRATE_DEFAULT,
bytesize=BYTESIZE_DEFAULT,
parity=PARITY_DEFAULT)
self.vcp = serial.Serial(
port=usb_port,
baudrate=BAUDRATE_DEFAULT,
bytesize=BYTESIZE_DEFAULT,
parity=PARITY_DEFAULT,
)
# Connect serial port
if(not self.vcp.is_open):
if not self.vcp.is_open:
self.init_error()
raise RuntimeError(f"Failed to open serial port {usb_port} for DUT {name}")
@ -68,7 +79,7 @@ class Dut():
self.enable_charging()
self.set_backlight(100)
time.sleep(2) # Give some time to process te commands
time.sleep(2) # Give some time to process te commands
if not self.ping():
self.init_error()
@ -84,18 +95,22 @@ class Dut():
self.cpu_id_hash = self.generate_id_hash(self.cpu_id)
# cpu_id check
if not cpu_id is None:
if(self.cpu_id != cpu_id):
if cpu_id is not None:
if self.cpu_id != cpu_id:
self.init_error()
raise RuntimeError(f"DUT {self.name} CPU ID mismatch: expected {cpu_id}, got {self.cpu_id}")
raise RuntimeError(
f"DUT {self.name} CPU ID mismatch: expected {cpu_id}, got {self.cpu_id}"
)
logging.debug(f"DUT {self.name} ID hash: {self.cpu_id_hash}")
# device should start charging
report = self.read_report()
if not (report.usb == "USB_connected"):
if not report.usb == "USB_connected":
self.init_error()
raise RuntimeError(f"{self.name} USB not connected. Check VCP and relay ports")
raise RuntimeError(
f"{self.name} USB not connected. Check VCP and relay ports"
)
self.display_ok()
self.disable_charging()
@ -114,10 +129,6 @@ class Dut():
self.display_bars("G")
time.sleep(3)
def __del__(self):
self.vcp.close()
self.vcp = None
def get_cpu_id_hash(self):
return self.cpu_id_hash
@ -193,13 +204,12 @@ class Dut():
return response.OK
def set_soc_limit(self, soc_limit: int):
"""
Set the state of charge (SoC) limit for the DUT.
:param soc_limit: The SoC limit to set (0-100).
:return: True if the command was successful, False otherwise.
"""
if not (0 <= soc_limit <= 100):
if not 0 <= soc_limit <= 100:
raise ValueError("SoC limit must be between 0 and 100.")
response = self.send_command("pm-set-soc-limit", soc_limit)
@ -207,7 +217,7 @@ class Dut():
def set_backlight(self, value: int):
if not (0 <= value <= 255):
if not 0 <= value <= 255:
raise ValueError("Backlight value must be between 0 and 255.")
response = self.send_command("display-set-backlight", value)
@ -247,7 +257,6 @@ class Dut():
return data
def read_report(self) -> DutReportData:
"""
Read the PM report from the DUT.
Returns a ProdtestResponse object containing the report data.
@ -259,10 +268,9 @@ class Dut():
return self.parse_report(response)
def send_command(self, cmd, *args, skip_response=False):
if(self.vcp is None):
if self.vcp is None:
raise "VPC not initalized"
response = DutProdtestResponse()
@ -270,37 +278,37 @@ class Dut():
# Assamble command
response.cmd = cmd
if(args):
response.cmd = response.cmd + ' ' + ' '.join(str(k) for k in args)
response.cmd = response.cmd + '\n'
if args:
response.cmd = response.cmd + " " + " ".join(str(k) for k in args)
response.cmd = response.cmd + "\n"
response.timestamp = time.time()
# Flush serial
self.vcp.flush()
self._log_output(response.cmd.rstrip('\r\n'))
self._log_output(response.cmd.rstrip("\r\n"))
self.vcp.write(response.cmd.encode())
if(skip_response):
if skip_response:
return response
while(True):
while True:
line = self.vcp.readline().decode()
self._log_input(line.strip('\r\n'))
self._log_input(line.strip("\r\n"))
# Capture traces
if(line[:1] == "#"):
if line[:1] == "#":
response.trace.append(line[2:])
# Capture data
if(line[:8] == "PROGRESS"):
if line[:8] == "PROGRESS":
line = line.replace("\r\n", "")
response.data_entries.append((line[9:].split(" ")))
# Terminate
if("OK" in line):
if "OK" in line:
response.OK = True
@ -310,19 +318,29 @@ class Dut():
break
if("ERROR" in line):
if "ERROR" in line:
break
return response
def log_data(self, output_directory: Path, test_time_id, test_scenario,
test_phase, temp, verbose=False):
def log_data(
self,
output_directory: Path,
test_time_id,
test_scenario,
test_phase,
temp,
verbose=False,
):
# Log file name format:
# > <device_id_hash>.<time_identifier>.<test_scenario>.<test><temperarture>.csv
# Example: a8bf.2506091307.linear.charge.25_deg.csv
file_path = output_directory / f"{self.cpu_id_hash}.{test_time_id}.{test_scenario}.{test_phase}.{temp}.csv"
file_path = (
output_directory
/ f"{self.cpu_id_hash}.{test_time_id}.{test_scenario}.{test_phase}.{temp}.csv"
)
report = None
try:
@ -333,24 +351,28 @@ class Dut():
if not file_path.exists():
# creat a file header
with open(file_path, 'w') as f:
f.write("time,power_state,usb,wlc,battery_voltage,battery_current,"
"battery_temp,battery_soc,battery_soc_latched,pmic_die_temp,"
"wlc_voltage,wlc_current,wlc_die_temp,system_voltage\n")
with open(file_path, "w") as f:
f.write(
"time,power_state,usb,wlc,battery_voltage,battery_current,"
"battery_temp,battery_soc,battery_soc_latched,pmic_die_temp,"
"wlc_voltage,wlc_current,wlc_die_temp,system_voltage\n"
)
with open(file_path, 'a') as f:
f.write(str(report.timestamp) + "," +",".join(report.data_entries[0]) + "\n")
with open(file_path, "a") as f:
f.write(
str(report.timestamp) + "," + ",".join(report.data_entries[0]) + "\n"
)
if verbose:
print(str(report.timestamp) + "," +",".join(report.data_entries[0]))
print(str(report.timestamp) + "," + ",".join(report.data_entries[0]))
def _log_output(self, message):
if(self.verbose):
if self.verbose:
prefix = f"\033[95m[{self.name}]\033[0m"
logging.debug(prefix + " > " + message)
def _log_input(self, message):
if(self.verbose):
if self.verbose:
prefix = f"\033[95m[{self.name}]\033[0m"
logging.debug(prefix + " < " + message)

View File

@ -1,15 +1,17 @@
import time
import logging
import sys
from pathlib import Path
from dataclasses import dataclass
from typing import TYPE_CHECKING
from .dut import Dut
from pathlib import Path
from hardware_ctl.relay_controller import RelayController
from .dut import Dut
@dataclass
class ProdtestPmReport:
"""pm-report command response data structure"""
power_state: str = ""
usb: str = ""
wlc: str = ""
@ -47,37 +49,42 @@ class ProdtestPmReport:
logging.error(f"Failed to parse pm-report data: {data} ({e})")
return cls()
class DutController:
class DutController:
"""
Device-under-test (DUT) controller.
provides direct simultaneous control of configured DUTs
"""
def __init__(self, duts, relay_ctl, verbose: bool = False):
def __init__(self, duts, relay_ctl: RelayController, verbose: bool = False):
self.duts = []
self.relay_ctl = relay_ctl
# Power off all DUTs before self test
for d in duts:
self.relay_ctl.set_relay_off(d['relay_port'])
self.relay_ctl.set_relay_off(d["relay_port"])
for d in duts:
try:
dut = Dut(name=d['name'],
cpu_id=d['cpu_id'],
usb_port=d['usb_port'],
relay_port=d['relay_port'],
relay_ctl=self.relay_ctl,
verbose=verbose)
dut = Dut(
name=d["name"],
cpu_id=d["cpu_id"],
usb_port=d["usb_port"],
relay_port=d["relay_port"],
relay_ctl=self.relay_ctl,
verbose=verbose,
)
self.duts.append(dut)
logging.info(f"Initialized {d['name']} on port {d['usb_port']}")
logging.info(f" -- cpu_id hash : {dut.get_cpu_id_hash()}")
logging.info(f" -- relay port : {dut.get_relay_port()}")
except Exception as e:
logging.critical(f"Failed to initialize DUT {d['name']} on port {d['usb_port']}: {e}")
logging.critical(
f"Failed to initialize DUT {d['name']} on port {d['usb_port']}: {e}"
)
sys.exit(1)
if len(self.duts) == 0:
@ -204,20 +211,16 @@ class DutController:
except Exception as e:
logging.error(f"Failed to set backlight on {d.name}: {e}")
def log_data(self, output_directory: Path, test_time_id, test_scenario,
test_phase, temp):
def log_data(
self, output_directory: Path, test_time_id, test_scenario, test_phase, temp
):
# Log file name format:
# > <device_id_hash>.<time_identifier>.<test_scenario>.<temperarture>.csv
# Example: a8bf.2506091307.linear.charge.25_deg.csv
for d in self.duts:
d.log_data(output_directory,
test_time_id,
test_scenario,
test_phase,
temp)
d.log_data(output_directory, test_time_id, test_scenario, test_phase, temp)
def close(self):
for d in self.duts:

View File

@ -1 +1,3 @@
from .relay_controller import RelayController
__all__ = ["RelayController"]

View File

@ -1 +1,3 @@
from .bs_weu_16 import DeditecBsWeu16
__all__ = ["DeditecBsWeu16"]

View File

@ -1,14 +1,15 @@
import logging
import signal
import socket
from typing import Any, List
from typing_extensions import Self
IP = "192.168.1.10" # default static IP address
PORT = 9912
PIN_COUNT = 16 # Total number of pins on the Deditec BS-WEU-16 board
class DeditecBsWeu16:
def __init__(self, ip: str = IP, port: int = PORT, timeout_seconds: int = 3):
@ -23,10 +24,14 @@ class DeditecBsWeu16:
def connect(self) -> bool:
if self.socket is not None:
logging.warning("DeditecBsWeu16: connect called, but socket already exists. Closing first.")
self.close_connection()
logging.warning(
"DeditecBsWeu16: connect called, but socket already exists. Closing first."
)
self.close_connection()
logging.debug(f"DeditecBsWeu16: connecting to device at {self.ip}:{self.port}...")
logging.debug(
f"DeditecBsWeu16: connecting to device at {self.ip}:{self.port}..."
)
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout_seconds)
@ -34,7 +39,9 @@ class DeditecBsWeu16:
logging.debug("DeditecBsWeu16: connection established")
return True
except socket.timeout:
logging.error(f"DeditecBsWeu16: connection timed out ({self.timeout_seconds}s)")
logging.error(
f"DeditecBsWeu16: connection timed out ({self.timeout_seconds}s)"
)
self.socket = None
return False
except Exception as e:
@ -52,19 +59,24 @@ class DeditecBsWeu16:
try:
self.socket.sendall(command)
data = self.socket.recv(64)
logging.debug(f"DeditecBsWeu16: received confirmation data (len={len(data)}): {data!r}")
logging.debug(
f"DeditecBsWeu16: received confirmation data (len={len(data)}): {data!r}"
)
return True
except socket.timeout:
logging.error(f"DeditecBsWeu16: socket timeout during send/recv ({self.timeout_seconds}s)")
return False
logging.error(
f"DeditecBsWeu16: socket timeout during send/recv ({self.timeout_seconds}s)"
)
return False
except Exception as e:
logging.exception(f"DeditecBsWeu16: error sending command or receiving confirmation: {e}")
logging.exception(
f"DeditecBsWeu16: error sending command or receiving confirmation: {e}"
)
return False
def control_relay(self, pins_on: List[int], pins_off: List[int]) -> bool:
""" Turns on all pins specified in pins_on list and turns off all pins specified in pins_off list.
Returns True if successful, False otherwise.
"""Turns on all pins specified in pins_on list and turns off all pins specified in pins_off list.
Returns True if successful, False otherwise.
"""
if not self.socket:
@ -72,12 +84,12 @@ class DeditecBsWeu16:
return False
for pin in pins_on:
if not (1 <= pin <= PIN_COUNT):
if not 1 <= pin <= PIN_COUNT:
logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_on.")
return False
for pin in pins_off:
if not (1 <= pin <= PIN_COUNT):
if not 1 <= pin <= PIN_COUNT:
logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_off.")
return False
@ -89,11 +101,12 @@ class DeditecBsWeu16:
logging.error("DeditecBsWeu16: Failed to send command to Deditec device.")
return False
logging.info(f"DeditecBsWeu16: Changed relay setup. Pins ON: {self.pins_on_latched}")
logging.info(
f"DeditecBsWeu16: Changed relay setup. Pins ON: {self.pins_on_latched}"
)
return True
def assabmle_command(self, pins: List[int]) -> bytes:
"""Assembles the command to turn on specified pins on the Deditec BS-WEU-16 board."""
command_prefix = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00"
@ -103,47 +116,53 @@ class DeditecBsWeu16:
if isinstance(pin, int) and 1 <= pin <= PIN_COUNT:
pin_mask_value += 2 ** (pin - 1)
else:
logging.warning(f"DeditecBsWeu16: Invalid pin number provided to assabmle_command: {pin}. Ignoring.")
logging.warning(
f"DeditecBsWeu16: Invalid pin number provided to assabmle_command: {pin}. Ignoring."
)
command = command_prefix + pin_mask_value.to_bytes(2, byteorder="big")
return command
def close_connection(self) -> None:
if self.socket:
logging.debug("DeditecBsWeu16: closing connection")
try:
self.socket.close()
except Exception as e:
logging.error(f"DeditecBsWeu16: error closing socket: {e}")
logging.error(f"DeditecBsWeu16: error closing socket: {e}")
finally:
self.socket = None
self.socket = None
else:
logging.debug("Deditec:: close_connection called but already closed.")
logging.debug("Deditec:: close_connection called but already closed.")
def __enter__(self) -> Self:
try:
signal.alarm(self.timeout_seconds + 1)
except ValueError:
logging.warning("Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout.")
pass
logging.warning(
"Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout."
)
pass
if not self.connect():
signal.alarm(0)
raise ConnectionError(f"Failed to connect to Deditec device at {self.ip}:{self.port}")
raise ConnectionError(
f"Failed to connect to Deditec device at {self.ip}:{self.port}"
)
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
try:
signal.alarm(0)
signal.alarm(0)
except ValueError:
pass
pass
self.close_connection()
if exc_type:
logging.error(f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}")
logging.error(
f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}"
)
return False

View File

@ -1,3 +1,3 @@
from .gdm8351 import GDM8351
__all__ = ["GDM8351"]

View File

@ -1,6 +1,7 @@
import time
import pyvisa
import time
class GDM8351:
@ -14,17 +15,18 @@ class GDM8351:
print("Available devices:")
for r_name in self.rm.list_resources():
if("/dev/ttyACM" in r_name):
if "/dev/ttyACM" in r_name:
device_count += 1
available_devices[device_count] = r_name
print(f" [{device_count}]: {r_name}")
self.device_connected = False
while not self.device_connected:
input_device_id = input("Digital multimeter GDM8351: Select VCP port number (or Q to quit the selection): ")
input_device_id = input(
"Digital multimeter GDM8351: Select VCP port number (or Q to quit the selection): "
)
if(input_device_id.lower() == 'q'):
if input_device_id.lower() == "q":
print("Exiting device selection.")
return
@ -35,11 +37,13 @@ class GDM8351:
try:
self.device = self.rm.open_resource(device_name)
self.device_id = self.device.query("*IDN?")
if("GDM8351" in self.device_id):
if "GDM8351" in self.device_id:
print("Device connected successfully.")
else:
self.device.close()
print("Connected device is not a GDM8351. Please check the device ID.")
print(
"Connected device is not a GDM8351. Please check the device ID."
)
continue
self.device_connected = True
@ -60,7 +64,9 @@ class GDM8351:
raise ValueError("Invalid sensor type. Use 'K', 'J', or 'T'.")
if junction_temp_deg < 0 or junction_temp_deg > 50:
raise ValueError("Junction temperature must be between 0 and 50 degrees Celsius.")
raise ValueError(
"Junction temperature must be between 0 and 50 degrees Celsius."
)
try:
junction_temp_deg = float(junction_temp_deg)
@ -101,16 +107,15 @@ class GDM8351:
if not file_path.exists():
# creat a file header
with open(file_path, 'w') as f:
with open(file_path, "w") as f:
f.write("time,temperature\n")
with open(file_path, 'a') as f:
with open(file_path, "a") as f:
f.write(str(time.time()) + "," + str(temp) + "\n")
if verbose:
print(f"GDM8351 temperature: {temp}°C")
def close(self):
if self.device is not None and self.device_connected:
try:
@ -118,5 +123,3 @@ class GDM8351:
print("GDM8351 connection closed.")
except Exception as e:
print(f"GDM8351 Failed to close connection: {e}")

View File

@ -1,20 +1,18 @@
# hardware_ctl/relay_controller.py
import logging
import time
import platform
import subprocess
import sys
from typing import List, Optional, Set
from pathlib import Path
# Import Deditec drives
from .deditec import DeditecBsWeu16
class RelayController:
DEDITEC_PORT = 9912
MAX_PIN = 16 # Max PIN index (1-16)
MAX_PIN = 16 # Max PIN index (1-16)
def __init__(self, ip_address: str):
"""
@ -29,40 +27,52 @@ class RelayController:
# Ping the device to check connectivity
if not self.check_ping(self.ip_address):
logging.warning("Ping to Deditec relay board failed. Network issue possible, but attempting TCP check.")
logging.warning(
"Ping to Deditec relay board failed. Network issue possible, but attempting TCP check."
)
self.deditec = DeditecBsWeu16(ip=self.ip_address, port=self.port)
# Connect to deditec relay board
if not self.deditec.connect():
logging.error(f"Failed to connect to Deditec relay board at {self.ip_address}:{self.DEDITEC_PORT}.")
logging.error(
f"Failed to connect to Deditec relay board at {self.ip_address}:{self.DEDITEC_PORT}."
)
sys.exit(1)
def check_ping(self, ip: str) -> bool:
""" Ping the given IP address """
"""Ping the given IP address"""
logging.info(f"Pinging {ip}...")
system = platform.system().lower()
if system == "windows":
command = ["ping", "-n", "1", "-w", "1000", ip]
else: # Linux, macOS
else: # Linux, macOS
command = ["ping", "-c", "1", "-W", "1", ip]
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout, stderr = process.communicate(timeout=3)
return_code = process.returncode
logging.debug(f"Ping stdout:\n{stdout}")
if stderr: logging.debug(f"Ping stderr:\n{stderr}")
if stderr:
logging.debug(f"Ping stderr:\n{stderr}")
if return_code == 0:
if "unreachable" in stdout.lower() or "timed out" in stdout.lower() or "ttl expired" in stdout.lower():
logging.error(f"Ping to {ip} technically succeeded (code 0) but output indicates failure.")
return False
if (
"unreachable" in stdout.lower()
or "timed out" in stdout.lower()
or "ttl expired" in stdout.lower()
):
logging.error(
f"Ping to {ip} technically succeeded (code 0) but output indicates failure."
)
return False
logging.info(f"Ping to {ip} successful.")
return True
@ -72,7 +82,9 @@ class RelayController:
return False
except FileNotFoundError:
logging.error("Ping command not found. Install ping or check PATH. Skipping ping check.")
logging.error(
"Ping command not found. Install ping or check PATH. Skipping ping check."
)
return True
except subprocess.TimeoutExpired:
@ -100,4 +112,4 @@ class RelayController:
return self.deditec.control_relay(pins_on=[pin], pins_off=[])
def close(self):
pass
pass

View File

@ -1,36 +1,38 @@
# main_tester.py
import time
import toml
import sys
import logging
import sys
import time
from pathlib import Path
import subprocess
import socket
from typing import Optional, Dict, Any
from test_logic import LinearScenario, SwitchingScenario, RandomWonderScenario
from notifications import send_slack_message
from hardware_ctl import RelayController
from typing import Any, Dict, Optional
import toml
from dut import DutController
from hardware_ctl import RelayController
from notifications import send_slack_message
from test_logic import LinearScenario, RandomWonderScenario, SwitchingScenario
# Configure logging
log_formatter = log_formatter = logging.Formatter('[%(levelname).1s %(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log_formatter = log_formatter = logging.Formatter(
"[%(levelname).1s %(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
log_file = "test.log"
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Clear all existing handlers to avoid duplicates
if logger.hasHandlers(): logger.handlers.clear()
if logger.hasHandlers():
logger.handlers.clear()
# File handler
try:
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
file_handler = logging.FileHandler(log_file, mode="w", encoding="utf-8")
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)
except Exception as log_e:
print(f"WARNING: Failed to create file log handler for {log_file}: {log_e}")
print(f"WARNING: Failed to create file log handler for {log_file}: {log_e}")
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
@ -38,8 +40,8 @@ console_handler.setFormatter(log_formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]:
def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]:
"""Load test configuration from TOML config file ."""
config_file = config_path
logging.info(f"Loading configuration file: {config_file}")
@ -47,12 +49,13 @@ def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]:
config = toml.load(config_file)
# Validate required sections in the config file
required_sections = ["general", "relay", "duts", "test_plan",
"notifications"]
required_sections = ["general", "relay", "duts", "test_plan", "notifications"]
for section in required_sections:
if section not in config:
raise ValueError(f"Missing required section '{section}' in config file.")
raise ValueError(
f"Missing required section '{section}' in config file."
)
logging.info("Config file loading successfully.")
return config
@ -69,29 +72,42 @@ def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]:
logging.error(f"Error loading configuration file: {e}")
return None
def run_test_cycle(config: dict, temp_c: float, cycle_num: int, test_mode: str,
relay_ctl: RelayController, dut_ctl: DutController) -> bool:
""" Run single test cycle for given test mode on all DUTs at specified temperature"""
def run_test_cycle(
config: dict,
temp_c: float,
cycle_num: int,
test_mode: str,
relay_ctl: RelayController,
dut_ctl: DutController,
) -> bool:
"""Run single test cycle for given test mode on all DUTs at specified temperature"""
# Select test scenario
if test_mode == "linear":
test_scenario = LinearScenario(
discharge_load=config['test_plan']['linear_discharge_load'],
relaxation_time_min=config['test_plan']['linear_relaxation_time_min'])
discharge_load=config["test_plan"]["linear_discharge_load"],
relaxation_time_min=config["test_plan"]["linear_relaxation_time_min"],
)
elif test_mode == "switching":
test_scenario = SwitchingScenario(
discharge_switch_cycle_min = config['test_plan']['switching_discharge_switch_cycle_min'],
relaxation_time_min = config['test_plan']['switching_relaxation_time_min'])
discharge_switch_cycle_min=config["test_plan"][
"switching_discharge_switch_cycle_min"
],
relaxation_time_min=config["test_plan"]["switching_relaxation_time_min"],
)
elif test_mode == "random_wonder":
test_scenario = RandomWonderScenario(
core_test_time=config['test_plan']['random_wonder_core_test_time_min'],
relaxation_time_min=config['test_plan']['random_wonder_relaxation_time_min'])
core_test_time=config["test_plan"]["random_wonder_core_test_time_min"],
relaxation_time_min=config["test_plan"][
"random_wonder_relaxation_time_min"
],
)
else:
@ -102,18 +118,20 @@ def run_test_cycle(config: dict, temp_c: float, cycle_num: int, test_mode: str,
test_scenario.setup(dut_controller=dut_ctl)
# Test loop
while(True):
while True:
# Call run function in loop to execute the test scenario.
finished = test_scenario.run(dut_controller=dut_ctl)
# Read power manager report and log them to file
test_scenario.log_data(dut_controller=dut_ctl,
output_directory = Path(config['general']['output_directory']),
temp=temp_c)
test_scenario.log_data(
dut_controller=dut_ctl,
output_directory=Path(config["general"]["output_directory"]),
temp=temp_c,
)
if finished:
break # Exit test loop
break # Exit test loop
time.sleep(1)
@ -122,6 +140,7 @@ def run_test_cycle(config: dict, temp_c: float, cycle_num: int, test_mode: str,
return True
def main():
logging.info("==============================================")
@ -137,7 +156,6 @@ def main():
logging.info("Initializing hardware controllers...")
relay_ctl = None
dut_ctl = None
init_ok = True
logging.info("==============================================")
logging.info(" Initializing Peripherals ")
@ -146,30 +164,31 @@ def main():
try:
# Initialize relay controller (Deditec board)
relay_ctl = RelayController(
ip_address=config['relay']['ip_address']
)
relay_ctl = RelayController(ip_address=config["relay"]["ip_address"])
# Initialize DUTs
from dut.dut_controller import DutController
dut_ctl = DutController(config['duts'],
relay_ctl=relay_ctl,
verbose=False)
dut_ctl = DutController(config["duts"], relay_ctl=relay_ctl, verbose=False)
except Exception as e:
logging.exception(f"Failed to intialize peripherals: {e}")
exit(1)
# Create output data directory
output_data_dir = Path(config['general']['output_directory'])
output_data_dir = Path(config["general"]["output_directory"])
try:
output_data_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Test results will be saved in: {output_data_dir.resolve()}")
except OSError as e:
logging.critical(f"Failed to create output directory {output_data_dir}: {e}. Exiting.")
if relay_ctl: relay_ctl.close();
if dut_ctl: dut_ctl.close();
sys.exit(1)
logging.critical(
f"Failed to create output directory {output_data_dir}: {e}. Exiting."
)
if relay_ctl:
relay_ctl.close()
if dut_ctl:
dut_ctl.close()
sys.exit(1)
############################################################################
# LOAD TEST PLAN
@ -179,16 +198,16 @@ def main():
logging.info(" TEST PLAN LOADING ")
logging.info("==============================================")
temperatures = config['test_plan'].get('temperatures_celsius', [25])
test_modes_to_run = config['test_plan'].get('test_modes', ['linear'])
cycles_per_temp = config['test_plan'].get('cycles_per_temperature', 1)
temperatures = config["test_plan"].get("temperatures_celsius", [25])
test_modes_to_run = config["test_plan"].get("test_modes", ["linear"])
cycles_per_temp = config["test_plan"].get("cycles_per_temperature", 1)
total_runs = len(temperatures) * cycles_per_temp * len(test_modes_to_run)
completed_runs = 0
logger.info(f' + Tested temperatures : {temperatures}')
logger.info(f' + Cycles per temperature : {cycles_per_temp}')
logger.info(f' + Test modes in every cycle : {test_modes_to_run}')
logger.info(f' + Total runs planned : {total_runs}')
logger.info(f" + Tested temperatures : {temperatures}")
logger.info(f" + Cycles per temperature : {cycles_per_temp}")
logger.info(f" + Test modes in every cycle : {test_modes_to_run}")
logger.info(f" + Total runs planned : {total_runs}")
start_time = time.time()
test_aborted = False
@ -208,12 +227,16 @@ def main():
logging.info("==============================================")
logging.info(f" {temp_c} °C TEMP TEST ")
logging.info("==============================================")
logging.info(f"Set the temperature chamber to {temp_c} °C and wait for stabilization.")
logging.info(
f"Set the temperature chamber to {temp_c} °C and wait for stabilization."
)
try:
if config["notifications"]["notification_channel"] == "slack":
send_slack_message(config["notifications"]["slack_webhook_url"],
f"""Set the temperature chamber to {temp_c} °C and confirm to continue with the test.""")
send_slack_message(
config["notifications"]["slack_webhook_url"],
f"""Set the temperature chamber to {temp_c} °C and confirm to continue with the test.""",
)
except Exception as e:
logging.error(f"Failed to send Slack notification: {e}")
@ -229,30 +252,42 @@ def main():
run_start_time = time.time()
logging.info(f"Running Test Mode: '{test_mode}'")
success = run_test_cycle(config, temp_c, cycle_num, test_mode,
relay_ctl,
dut_ctl)
success = run_test_cycle(
config, temp_c, cycle_num, test_mode, relay_ctl, dut_ctl
)
run_end_time = time.time()
run_duration_m = (run_end_time - run_start_time) / 60
if success:
completed_runs += 1
logging.info(f"Test Mode '{test_mode}' finished successfully in {run_duration_m:.1f} minutes.")
logging.info(
f"Test Mode '{test_mode}' finished successfully in {run_duration_m:.1f} minutes."
)
else:
logging.error(f"Test Mode '{test_mode}' failed after {run_duration_m:.1f} minutes.")
if config.get('general', {}).get('fail_fast', False):
logging.warning("Fail fast enabled. Aborting entire test plan.")
test_aborted = True
break
logging.error(
f"Test Mode '{test_mode}' failed after {run_duration_m:.1f} minutes."
)
if config.get("general", {}).get("fail_fast", False):
logging.warning(
"Fail fast enabled. Aborting entire test plan."
)
test_aborted = True
break
else:
logging.info("Continuing with the next mode/cycle/temperature.")
logging.info(
"Continuing with the next mode/cycle/temperature."
)
logging.info(f"Progress: {completed_runs}/{total_runs} total runs completed.")
logging.info(
f"Progress: {completed_runs}/{total_runs} total runs completed."
)
if test_aborted: break
if test_aborted:
break
if test_aborted: break
if test_aborted:
break
logging.info("==============================================")
logging.info(" TEST FINISHED ")
@ -272,25 +307,37 @@ def main():
logging.info("Ensuring all relays are OFF...")
dut_ctl.power_down_all() # Power down all DUTs
relay_ctl.close()
except Exception as e_relay: logging.error(f"Error during relay cleanup: {e_relay}")
except Exception as e_relay:
logging.error(f"Error during relay cleanup: {e_relay}")
if dut_ctl:
try: dut_ctl.close()
except Exception as e_dut: logging.error(f"Error during DUT controller cleanup: {e_dut}")
try:
dut_ctl.close()
except Exception as e_dut:
logging.error(f"Error during DUT controller cleanup: {e_dut}")
logging.info("==================== TEST SUMMARY ====================")
end_time = time.time()
total_duration_s = end_time - start_time
total_duration_h = total_duration_s / 3600
status = "ABORTED" if test_aborted else ("COMPLETED" if completed_runs == total_runs else "PARTIALLY COMPLETED")
status = (
"ABORTED"
if test_aborted
else (
"COMPLETED" if completed_runs == total_runs else "PARTIALLY COMPLETED"
)
)
logging.info("-" * 60)
logging.info(f"Test execution {status}.")
logging.info(f"Total runs completed: {completed_runs}/{total_runs}")
logging.info(f"Total duration: {total_duration_s:.0f} seconds ({total_duration_h:.2f} hours).")
logging.info(
f"Total duration: {total_duration_s:.0f} seconds ({total_duration_h:.2f} hours)."
)
logging.info("==================== TEST END ====================")
if __name__ == "__main__":
# Ensure at least one handler is set up
if not logger.hasHandlers():
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.addHandler(logging.StreamHandler(sys.stdout))
main()

View File

@ -1,13 +1,19 @@
# notifications.py
import logging
import requests
import json
import logging
from typing import Optional
import requests
logger = logging.getLogger(__name__)
def send_slack_message(webhook_url: Optional[str], message: str, fallback_text: str = "Notification from Battery Tester") -> bool:
def send_slack_message(
webhook_url: Optional[str],
message: str,
fallback_text: str = "Notification from Battery Tester",
) -> bool:
"""
Send slack message using Incoming Webhook URL
@ -27,24 +33,18 @@ def send_slack_message(webhook_url: Optional[str], message: str, fallback_text:
slack_data = {
"text": fallback_text,
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": message
}
}
]
"blocks": [{"type": "section", "text": {"type": "mrkdwn", "text": message}}],
}
try:
payload_json_string = json.dumps(slack_data)
post_data = {'payload': payload_json_string}
post_data = {"payload": payload_json_string}
logger.info(f"Sending Slack notification via webhook (using payload parameter)...")
logger.info(
"Sending Slack notification via webhook (using payload parameter)..."
)
logger.debug(f"Slack Webhook URL: {webhook_url}")
logger.debug(f"Slack Payload (JSON String): {payload_json_string}")
@ -53,21 +53,29 @@ def send_slack_message(webhook_url: Optional[str], message: str, fallback_text:
webhook_url,
data=post_data,
# headers={'Content-Type': 'application/x-www-form-urlencoded'}
timeout=timeout_seconds
timeout=timeout_seconds,
)
if response.status_code == 200 and response.text.lower() == "ok":
logger.info("Slack notification request sent successfully.")
return True
else:
error_detail = f"Status: {response.status_code}, Response: '{response.text[:500]}...'"
error_detail = (
f"Status: {response.status_code}, Response: '{response.text[:500]}...'"
)
logger.error(f"Slack request failed. {error_detail}")
if response.status_code == 400 and "invalid_payload" in response.text:
logger.error("Slack Error Detail: The JSON payload structure might be incorrect.")
logger.error(
"Slack Error Detail: The JSON payload structure might be incorrect."
)
elif response.status_code == 403:
logger.error("Slack Error Detail: Forbidden - Check webhook URL validity or permissions.")
logger.error(
"Slack Error Detail: Forbidden - Check webhook URL validity or permissions."
)
elif response.status_code == 404:
logger.error("Slack Error Detail: Not Found - The webhook URL might be incorrect or deactivated.")
logger.error(
"Slack Error Detail: Not Found - The webhook URL might be incorrect or deactivated."
)
return False
except requests.exceptions.RequestException as e:

View File

@ -1,11 +1,10 @@
import sys
import time
from pathlib import Path
from serial.tools import list_ports
from hardware_ctl.gdm8351 import GDM8351
from dut import Dut
from dut import Dut
from hardware_ctl.gdm8351 import GDM8351
from serial.tools import list_ports
output_directory = Path("single_capture_test_results")
test_description = "non_specified_test"
@ -19,6 +18,7 @@ log file. User can also select to log the temepertature readings from an
external thermocouple sensor connected to the GDM8351 multimeter.
"""
def main():
print("**********************************************************")
@ -42,7 +42,7 @@ def main():
dut_port_selection = input("Select VCP port number (or Q to quit the selection): ")
if dut_port_selection.lower() == 'q':
if dut_port_selection.lower() == "q":
print("Exiting script.")
sys.exit(0)
@ -59,7 +59,6 @@ def main():
sys.exit(1)
# Initialize DUT
print("**********************************************************")
print(" GDM8351 port selection (temp measurement) ")
print("**********************************************************")
@ -84,7 +83,6 @@ def main():
except Exception as e:
print(f"Error configuring temperature sensing: {e}")
# Creat test time ID
test_time_id = f"{time.strftime('%y%m%d%H%M')}"
@ -95,7 +93,6 @@ def main():
print("Failed to create output directory:", e)
sys.exit(1)
#########################################################################
# Test setup section
#########################################################################
@ -111,21 +108,24 @@ def main():
while True:
dut.log_data(output_directory=output_directory,
test_time_id=test_time_id,
test_scenario="single_capture",
test_phase=test_description,
temp=temp_description,
verbose=True)
dut.log_data(
output_directory=output_directory,
test_time_id=test_time_id,
test_scenario="single_capture",
test_phase=test_description,
temp=temp_description,
verbose=True,
)
# Read temperature from GDM8351
gdm8351.log_temperature(output_directory=output_directory,
test_time_id=test_time_id,
verbose=True)
gdm8351.log_temperature(
output_directory=output_directory,
test_time_id=test_time_id,
verbose=True,
)
time.sleep(1)
except KeyboardInterrupt:
print("Test execution interrupted by user (Ctrl+C)")
except Exception as e:
@ -138,12 +138,3 @@ def main():
if __name__ == "__main__":
main()

View File

@ -12,13 +12,13 @@ log_interval_seconds = 1
name = "DUT1"
cpu_id = "05001D000A50325557323120"
usb_port = "/dev/ttyACM0"
relay_port = 6
relay_port = 1
[[duts]]
name = "DUT2"
cpu_id = "1C0023000A50325557323120"
usb_port = "/dev/ttyACM1"
relay_port = 8
#[[duts]]
#name = "DUT2"
#cpu_id = "1C0023000A50325557323120"
#usb_port = "/dev/ttyACM0"
#relay_port = 1
# Uncomment and edit to add more DUTs
#[[duts]]

View File

@ -1,4 +1,5 @@
from .test_scenario import TestScenario
from .switching_scenario import SwitchingScenario
from .linear_scenario import LinearScenario
from .random_wonder_scenario import RandomWonderScenario
from .switching_scenario import SwitchingScenario
__all__ = ["LinearScenario", "RandomWonderScenario", "SwitchingScenario"]

View File

@ -1,18 +1,17 @@
from .test_scenario import TestScenario
import enum
import logging
import time
import sys
import enum
from pathlib import Path
from dut.dut_controller import DutController
from hardware_ctl.relay_controller import RelayController
from .test_scenario import TestScenario
SKIP_CHARGING = False
SKIP_RELAXING = False
SKIP_DISCHARGING = False
class ScenarioPhase(enum.Enum):
NOT_STARTED = 0
CHARGING = 1
@ -21,11 +20,10 @@ class ScenarioPhase(enum.Enum):
DISCHARGED_RELAXING = 4
DONE = 5
class LinearScenario(TestScenario):
def __init__(self,
discharge_load = 100,
relaxation_time_min=60):
def __init__(self, discharge_load=100, relaxation_time_min=60):
# DUT use display backlight intensity to change its load (discharge
# current). Backlight intensity could be set in range of 0-255, but
@ -40,15 +38,14 @@ class LinearScenario(TestScenario):
self.time_id = "0000000000"
self.previous_phase = ScenarioPhase.NOT_STARTED
def setup(self,
dut_controller : DutController):
def setup(self, dut_controller: DutController):
# Start with charging phase first, so connect the charger with relay
# and enable the charging.
self.scenario_phase = ScenarioPhase.CHARGING
dut_controller.power_up_all() # Power up all DUTs
## Wait for DUTs to power up
# Wait for DUTs to power up
time.sleep(3)
dut_controller.set_backlight(150)
@ -61,7 +58,7 @@ class LinearScenario(TestScenario):
def run(self, dut_controller):
if(self.previous_phase != self.scenario_phase):
if self.previous_phase != self.scenario_phase:
logging.info(f"Linear scenario entered {self.scenario_phase} phase.")
self.previous_phase = self.scenario_phase
@ -70,7 +67,7 @@ class LinearScenario(TestScenario):
# Move to next phase when all DUTs are charged.
if self.scenario_phase == ScenarioPhase.CHARGING:
if(dut_controller.all_duts_charged() or SKIP_CHARGING):
if dut_controller.all_duts_charged() or SKIP_CHARGING:
self.scenario_phase = ScenarioPhase.CHARGED_RELAXING
self.phase_start = time.time()
@ -78,7 +75,9 @@ class LinearScenario(TestScenario):
elif self.scenario_phase == ScenarioPhase.CHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
dut_controller.power_down_all()
dut_controller.set_backlight(self.discharge_load)
@ -89,15 +88,16 @@ class LinearScenario(TestScenario):
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
elif self.scenario_phase == ScenarioPhase.DISCHARGING:
if(dut_controller.all_duts_discharged() or
SKIP_DISCHARGING):
if dut_controller.all_duts_discharged() or SKIP_DISCHARGING:
self.scenario_phase = ScenarioPhase.DISCHARGED_RELAXING
self.phase_start = time.time()
@ -105,7 +105,9 @@ class LinearScenario(TestScenario):
elif self.scenario_phase == ScenarioPhase.DISCHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
self.scenario_phase = ScenarioPhase.DONE
logging.info("Scenario completed successfully.")
@ -114,27 +116,25 @@ class LinearScenario(TestScenario):
else:
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
# Relax
return False
def log_data(self, dut_controller, output_directory: Path, temp):
dut_controller.log_data(output_directory, self.test_time_id, "linear",
self.scenario_phase.name.lower(), temp)
dut_controller.log_data(
output_directory,
self.test_time_id,
"linear",
self.scenario_phase.name.lower(),
temp,
)
def teardown(self, dut_controller):
pass

View File

@ -1,20 +1,19 @@
import enum
import logging
import random
import time
from pathlib import Path
from dut.dut_controller import DutController
from .test_scenario import TestScenario
import logging
import time
import sys
import enum
import random
from pathlib import Path
from dut.dut_controller import DutController
from hardware_ctl.relay_controller import RelayController
SKIP_CHARGING = False
SKIP_RELAXING = False
SKIP_RANDOM_WONDER = False
SKIP_DISCHARGING = False
class ScenarioPhase(enum.Enum):
NOT_STARTED = 0
CHARGING = 1
@ -24,11 +23,10 @@ class ScenarioPhase(enum.Enum):
DISCHARGED_RELAXING = 5
DONE = 6
class RandomWonderScenario(TestScenario):
def __init__(self,
core_test_time = 60,
relaxation_time_min = 60):
def __init__(self, core_test_time=60, relaxation_time_min=60):
self.relaxation_time_min = relaxation_time_min
self.phase_start = time.time()
@ -42,15 +40,14 @@ class RandomWonderScenario(TestScenario):
self.time_id = "0000000000"
self.previous_phase = ScenarioPhase.NOT_STARTED
def setup(self,
dut_controller : DutController):
def setup(self, dut_controller: DutController):
# Start with charging phase first, so connect the charger with relay
# and enable the charging.
self.scenario_phase = ScenarioPhase.CHARGING
dut_controller.power_up_all() # Power up all DUTs
## Wait for DUTs to power up
# Wait for DUTs to power up
time.sleep(3)
dut_controller.set_backlight(150)
@ -64,7 +61,7 @@ class RandomWonderScenario(TestScenario):
def run(self, dut_controller):
if(self.previous_phase != self.scenario_phase):
if self.previous_phase != self.scenario_phase:
logging.info(f"Random Wonder scenario entered {self.scenario_phase} phase.")
self.previous_phase = self.scenario_phase
@ -73,7 +70,7 @@ class RandomWonderScenario(TestScenario):
# Move to next phase when all DUTs are charged.
if self.scenario_phase == ScenarioPhase.CHARGING:
if(dut_controller.all_duts_charged() or SKIP_CHARGING):
if dut_controller.all_duts_charged() or SKIP_CHARGING:
self.scenario_phase = ScenarioPhase.CHARGED_RELAXING
self.phase_start = time.time()
@ -81,7 +78,9 @@ class RandomWonderScenario(TestScenario):
elif self.scenario_phase == ScenarioPhase.CHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
dut_controller.power_down_all()
self.scenario_phase = ScenarioPhase.RANDOM_WONDER
@ -90,16 +89,20 @@ class RandomWonderScenario(TestScenario):
else:
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
elif self.scenario_phase == ScenarioPhase.RANDOM_WONDER:
# Random Wonder phase, switch the backlight intensity to change
# the discharge current.
if (time.time() - self.phase_start) >= (self.core_test_time * 60) or SKIP_RANDOM_WONDER:
if (time.time() - self.phase_start) >= (
self.core_test_time * 60
) or SKIP_RANDOM_WONDER:
# Random wonder test over
dut_controller.power_down_all()
@ -118,9 +121,15 @@ class RandomWonderScenario(TestScenario):
dut_controller.power_down_all()
dut_controller.disable_charging()
self.random_cycle_time_min = random.randint(5, 15) # Random cycle time between 5 and 15 minutes
self.random_discharge_load = random.randint(0, 225) # Random discharge load between 0 and 225
dut_controller.set_backlight(self.random_discharge_load) # Set backlight to 0 to stop discharge
self.random_cycle_time_min = random.randint(
5, 15
) # Random cycle time between 5 and 15 minutes
self.random_discharge_load = random.randint(
0, 225
) # Random discharge load between 0 and 225
dut_controller.set_backlight(
self.random_discharge_load
) # Set backlight to 0 to stop discharge
self.random_cycle_start = time.time()
@ -128,22 +137,30 @@ class RandomWonderScenario(TestScenario):
# One of the DUTs got discharged, bounce back to charging.
logging.info("One of the DUTs got discharged, switching back to charging.")
logging.info(
"One of the DUTs got discharged, switching back to charging."
)
self.random_direction_up = True
dut_controller.power_up_all() # Power up all DUTs
dut_controller.enable_charging() # Enable charging
dut_controller.power_up_all() # Power up all DUTs
dut_controller.enable_charging() # Enable charging
self.random_cycle_time_min = random.randint(5, 15) # Random cycle time between 5 and 15 minutes
self.random_cycle_time_min = random.randint(
5, 15
) # Random cycle time between 5 and 15 minutes
self.random_cycle_start = time.time()
else:
if (time.time() - self.random_cycle_start) >= (self.random_cycle_time_min * 60):
if (time.time() - self.random_cycle_start) >= (
self.random_cycle_time_min * 60
):
# Randomize following section
self.random_direction_up = random.choice([True, False])
self.random_cycle_time_min = random.randint(5, 15) # Random cycle time between 5 and 15 minutes
self.random_cycle_time_min = random.randint(
5, 15
) # Random cycle time between 5 and 15 minutes
if self.random_direction_up:
@ -153,33 +170,40 @@ class RandomWonderScenario(TestScenario):
else:
self.random_discharge_load = random.randint(0, 225) # Random discharge load between 0 and 225
self.random_discharge_load = random.randint(
0, 225
) # Random discharge load between 0 and 225
dut_controller.disable_charging() # Disable charging
dut_controller.power_down_all() # Power down all DUTs
dut_controller.set_backlight(self.random_discharge_load) # Set backlight to 0 to stop discharge
dut_controller.set_backlight(
self.random_discharge_load
) # Set backlight to 0 to stop discharge
# Reset phase start time
self.random_cycle_start = time.time()
logging.info(f"Random Wonder cycle changed: "
f"direction={'up' if self.random_direction_up else 'down'}, "
f"cycle_time={self.random_cycle_time_min}min, "
f"discharge_load={self.random_discharge_load}")
logging.info(
f"Random Wonder cycle changed: "
f"direction={'up' if self.random_direction_up else 'down'}, "
f"cycle_time={self.random_cycle_time_min}min, "
f"discharge_load={self.random_discharge_load}"
)
# Continue in random wonder phase
elif self.scenario_phase == ScenarioPhase.DISCHARGING:
if(dut_controller.all_duts_discharged() or
SKIP_DISCHARGING):
if dut_controller.all_duts_discharged() or SKIP_DISCHARGING:
self.scenario_phase = ScenarioPhase.DISCHARGED_RELAXING
self.phase_start = time.time()
elif self.scenario_phase == ScenarioPhase.DISCHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
self.scenario_phase = ScenarioPhase.DONE
logging.info("Scenario completed successfully.")
@ -188,27 +212,25 @@ class RandomWonderScenario(TestScenario):
else:
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
# Relax
return False
def log_data(self, dut_controller, output_directory: Path, temp):
dut_controller.log_data(output_directory, self.test_time_id, "random_wonder",
self.scenario_phase.name.lower(), temp)
dut_controller.log_data(
output_directory,
self.test_time_id,
"random_wonder",
self.scenario_phase.name.lower(),
temp,
)
def teardown(self, dut_controller):
pass

View File

@ -1,18 +1,17 @@
from .test_scenario import TestScenario
import enum
import logging
import time
import sys
import enum
from pathlib import Path
from dut.dut_controller import DutController
from hardware_ctl.relay_controller import RelayController
from .test_scenario import TestScenario
SKIP_CHARGING = False
SKIP_RELAXING = False
SKIP_DISCHARGING = False
class ScenarioPhase(enum.Enum):
NOT_STARTED = 0
CHARGING = 1
@ -24,9 +23,7 @@ class ScenarioPhase(enum.Enum):
class SwitchingScenario(TestScenario):
def __init__(self,
discharge_switch_cycle_min = 5,
relaxation_time_min = 60):
def __init__(self, discharge_switch_cycle_min=5, relaxation_time_min=60):
# DUT use display backlight intensity to change its load (discharge
# current). Backlight intensity could be set in range of 0-255, but
@ -35,22 +32,21 @@ class SwitchingScenario(TestScenario):
# and 220mA when set to max 225.
self.discharge_switch_cycle_min = discharge_switch_cycle_min
self.relaxation_time_min = relaxation_time_min
self.discharge_load = 100 # initial discharge load
self.discharge_load = 100 # initial discharge load
self.phase_start = time.time()
self.remaining_time = time.time()
self.scenario_phase = ScenarioPhase.NOT_STARTED
self.test_time_id = "0000000000"
self.previous_phase = ScenarioPhase.NOT_STARTED
def setup(self,
dut_controller : DutController):
def setup(self, dut_controller: DutController):
# Start with charging phase first, so connect the charger with relay
# and enable the charging.
self.scenario_phase = ScenarioPhase.CHARGING
dut_controller.power_up_all() # Power up all DUTs
## Wait for DUTs to power up
# Wait for DUTs to power up
time.sleep(3)
dut_controller.set_backlight(150)
@ -64,7 +60,7 @@ class SwitchingScenario(TestScenario):
def run(self, dut_controller):
if(self.previous_phase != self.scenario_phase):
if self.previous_phase != self.scenario_phase:
logging.info(f"Switching scenario entered {self.scenario_phase} phase.")
self.previous_phase = self.scenario_phase
@ -73,7 +69,7 @@ class SwitchingScenario(TestScenario):
# Move to next phase when all DUTs are charged.
if self.scenario_phase == ScenarioPhase.CHARGING:
if(dut_controller.all_duts_charged() or SKIP_CHARGING):
if dut_controller.all_duts_charged() or SKIP_CHARGING:
self.scenario_phase = ScenarioPhase.CHARGED_RELAXING
self.phase_start = time.time()
@ -81,7 +77,9 @@ class SwitchingScenario(TestScenario):
elif self.scenario_phase == ScenarioPhase.CHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
dut_controller.power_down_all()
self.scenario_phase = ScenarioPhase.DISCHARGING
@ -90,18 +88,21 @@ class SwitchingScenario(TestScenario):
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
elif self.scenario_phase == ScenarioPhase.DISCHARGING:
if(dut_controller.all_duts_discharged() or
SKIP_DISCHARGING):
if dut_controller.all_duts_discharged() or SKIP_DISCHARGING:
self.scenario_phase = ScenarioPhase.DISCHARGED_RELAXING
self.phase_start = time.time()
elif (time.time() - self.phase_start) >= (self.discharge_switch_cycle_min * 60):
elif (time.time() - self.phase_start) >= (
self.discharge_switch_cycle_min * 60
):
# Change discharge load
self.discharge_load += 50
@ -111,13 +112,16 @@ class SwitchingScenario(TestScenario):
dut_controller.set_backlight(self.discharge_load)
self.phase_start = time.time()
logging.info(f"Switched discharge cycle to {self.discharge_load}, next change in {self.discharge_switch_cycle_min} minutes.")
logging.info(
f"Switched discharge cycle to {self.discharge_load}, next change in {self.discharge_switch_cycle_min} minutes."
)
elif self.scenario_phase == ScenarioPhase.DISCHARGED_RELAXING:
# Relaxation time is set in minutes, so convert to seconds
if (time.time() - self.phase_start) >= (self.relaxation_time_min * 60) or SKIP_RELAXING:
if (time.time() - self.phase_start) >= (
self.relaxation_time_min * 60
) or SKIP_RELAXING:
self.scenario_phase = ScenarioPhase.DONE
logging.info("Scenario completed successfully.")
@ -126,27 +130,25 @@ class SwitchingScenario(TestScenario):
else:
elapsed_min = int((time.time() - self.phase_start) / 60)
if(self.remaining_time != self.relaxation_time_min - elapsed_min):
if self.remaining_time != self.relaxation_time_min - elapsed_min:
# Update remaining time only if it changed
self.remaining_time = self.relaxation_time_min - elapsed_min
logging.info(f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes")
logging.info(
f"Relaxing for {self.relaxation_time_min} minutes, remaining: {self.remaining_time} minutes"
)
# Relax
return False
def log_data(self, dut_controller, output_directory: Path, temp):
dut_controller.log_data(output_directory, self.test_time_id, "switching",
self.scenario_phase.name.lower(), temp)
dut_controller.log_data(
output_directory,
self.test_time_id,
"switching",
self.scenario_phase.name.lower(),
temp,
)
def teardown(self, dut_controller):
pass

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
class TestScenario(ABC):
"""Parent class for test scenarios."""
@ -18,53 +19,3 @@ class TestScenario(ABC):
@abstractmethod
def log_data(self):
pass