From 6837f3ab5fc97e2a7c1669f67f7c2b9893679bcc Mon Sep 17 00:00:00 2001 From: kopecdav Date: Fri, 20 Jun 2025 12:57:26 +0200 Subject: [PATCH] feat(core): rationalized package imports, reworked/simplify deditec library, purge Czech comments. [no changelog] --- tools/automatic_battery_tester/.gitignore | 2 +- tools/automatic_battery_tester/README.md | 2 +- .../automatic_battery_tester/dut/__init__.py | 2 + tools/automatic_battery_tester/dut/dut.py | 28 ++- .../dut/dut_controller.py | 64 ++---- .../hardware_ctl/__init__.py | 1 + .../hardware_ctl/deditec/__init__.py | 1 + .../hardware_ctl/deditec/bs_weu_16.py | 149 +++++++++++++ .../hardware_ctl/deditec_driver/__init__.py | 0 .../deditec_driver/deditec_1_16_on.py | 127 ----------- .../hardware_ctl/deditec_driver/helpers.py | 210 ------------------ .../deditec_driver/pin_cache.json | 4 - .../hardware_ctl/relay_controller.py | 92 ++------ tools/automatic_battery_tester/main_tester.py | 28 +-- .../automatic_battery_tester/notifications.py | 16 -- .../automatic_battery_tester/test_config.toml | 3 +- .../test_logic/__init__.py | 4 + 17 files changed, 220 insertions(+), 513 deletions(-) create mode 100644 tools/automatic_battery_tester/dut/__init__.py create mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec/__init__.py create mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec/bs_weu_16.py delete mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec_driver/__init__.py delete mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec_driver/deditec_1_16_on.py delete mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec_driver/helpers.py delete mode 100644 tools/automatic_battery_tester/hardware_ctl/deditec_driver/pin_cache.json diff --git a/tools/automatic_battery_tester/.gitignore b/tools/automatic_battery_tester/.gitignore index babadb1a97..9b31665a83 100644 --- a/tools/automatic_battery_tester/.gitignore +++ b/tools/automatic_battery_tester/.gitignore @@ -2,4 +2,4 @@ venv/ test_results/ -test.log \ No newline at end of file +test.log diff --git a/tools/automatic_battery_tester/README.md b/tools/automatic_battery_tester/README.md index 92ee9f5c46..c37633cd05 100644 --- a/tools/automatic_battery_tester/README.md +++ b/tools/automatic_battery_tester/README.md @@ -83,7 +83,7 @@ test_modes = ["linear", "switching", "random_wonder"] ```sh python main_tester.py ``` -4. Follow console and log (`test.log`) instructions. +4. Follow console instructions or check log in "test.log" file. --- diff --git a/tools/automatic_battery_tester/dut/__init__.py b/tools/automatic_battery_tester/dut/__init__.py new file mode 100644 index 0000000000..01fcca616c --- /dev/null +++ b/tools/automatic_battery_tester/dut/__init__.py @@ -0,0 +1,2 @@ +from .dut_controller import DutController +from .dut import Dut diff --git a/tools/automatic_battery_tester/dut/dut.py b/tools/automatic_battery_tester/dut/dut.py index 2c980d9805..7361590bbb 100644 --- a/tools/automatic_battery_tester/dut/dut.py +++ b/tools/automatic_battery_tester/dut/dut.py @@ -1,5 +1,3 @@ - - import serial import time import logging @@ -40,9 +38,6 @@ class DutProdtestResponse: data_entries: list = field(default_factory=list) OK: bool = False - # def __init__(self): - # self.trace = [] - # self.data_entries = [] class Dut(): @@ -72,7 +67,7 @@ class Dut(): self.entry_interactive_mode() self.enable_charging() - time.sleep(2) # Give some time for the command to be processed + time.sleep(2) # Give some time to process te commands if not self.ping(): self.init_error() @@ -351,12 +346,23 @@ class Dut(): prefix = f"\033[95m[{self.name}]\033[0m" logging.debug(prefix + " < " + message) + def close(self): + """ + Close the DUT's serial port and clean up resources. + """ + if self.vcp is not None and self.vcp.is_open: + try: + self.vcp.close() + except Exception as e: + logging.warning(f"Failed to close VCP for {self.name}: {e}") + self.vcp = None + self.name = None + self.relay_ctl = None + self.relay_port = None + def __del__(self): try: if hasattr(self, "vcp") and self.vcp is not None and self.vcp.is_open: - self.vcp.close() + self.close() except Exception as e: - logging.warning(f"Failed to close VCP for {self.name}: {e}") - self.vcp = None - self.name = None - self.verbose = None + logging.warning(f"Error during DUT cleanup: {e}") diff --git a/tools/automatic_battery_tester/dut/dut_controller.py b/tools/automatic_battery_tester/dut/dut_controller.py index b623880f21..5fcbe0e416 100644 --- a/tools/automatic_battery_tester/dut/dut_controller.py +++ b/tools/automatic_battery_tester/dut/dut_controller.py @@ -1,34 +1,15 @@ import time import logging +import sys +from pathlib import Path from dataclasses import dataclass from typing import TYPE_CHECKING from .dut import Dut from hardware_ctl.relay_controller import RelayController -# Podmíněný import pro type hinting -if TYPE_CHECKING: - from libs.prodtest_cli import prodtest_cli, ProdtestResponse - -# Přidání cesty k libs pro import prodtest_cli -import sys -from pathlib import Path -libs_path_ctrl = Path(__file__).parent.parent / "libs" -if str(libs_path_ctrl) not in sys.path: - sys.path.insert(0, str(libs_path_ctrl)) - -# Import prodtest_cli po úpravě sys.path -try: - from prodtest_cli import prodtest_cli, ProdtestResponse -except ImportError as e: - logging.error(f"FATAL: Could not import prodtest_cli from {libs_path_ctrl}. Check file presence and structure.") - # Definujeme dummy třídy, aby zbytek mohl selhat při inicializaci - class ProdtestResponse: pass - class prodtest_cli: - def __init__(self, *args, **kwargs): raise ImportError("prodtest_cli not found") - @dataclass class ProdtestPmReport: - """Dataclass pro uchování výsledků pm-report.""" + """pm-report command response data structure""" power_state: str = "" usb: str = "" wlc: str = "" @@ -70,7 +51,7 @@ class DutController: """ Device-under-test (DUT) controller. - provides direct simulataneous control of configured DUTs + provides direct simultaneous control of configured DUTs """ def __init__(self, duts, relay_ctl, verbose: bool = False): @@ -79,19 +60,19 @@ class DutController: # Power off all DUTs before self test for d in duts: - self.relay_ctl.set_relay_off(d["relay_port"]) + self.relay_ctl.set_relay_off(d['relay_port']) for d in duts: try: - dut = Dut(name=d["name"], - cpu_id=d["cpu_id"], - usb_port=d["usb_port"], - relay_port=d["relay_port"], + dut = Dut(name=d['name'], + cpu_id=d['cpu_id'], + usb_port=d['usb_port'], + relay_port=d['relay_port'], relay_ctl=self.relay_ctl, - verbose=True) + verbose=verbose) self.duts.append(dut) - logging.info(f"Initialized {d["name"]} on port {d['usb_port']}") + logging.info(f"Initialized {d['name']} on port {d['usb_port']}") logging.info(f" -- cpu_id hash : {dut.get_cpu_id_hash()}") logging.info(f" -- relay port : {dut.get_relay_port()}") @@ -113,7 +94,6 @@ class DutController: for d in self.duts: d.power_down() - def enable_charging(self): """ Enable charging on all DUTs. @@ -239,24 +219,12 @@ class DutController: test_phase, temp) - - # --- Cleanup --- def close(self): - pass - # """Ukončí sériovou komunikaci.""" - # if self.cli and hasattr(self.cli, 'vcp') and self.cli.vcp: - # logging.info("Closing DUT serial connection...") - # try: - # self.cli.vcp.close() - # except Exception as e: - # logging.error(f"Error closing serial port: {e}") - # self.cli = None - # elif self.cli: - # logging.debug("DUT controller had cli object, but no active vcp to close.") - # self.cli = None - # else: - # logging.debug("DUT controller already closed or not initialized.") + for d in self.duts: + try: + d.close() + except Exception as e: + logging.error(f"Failed to close DUT {d.name}: {e}") def __del__(self): - # Zajistí zavření portu i při neočekávaném ukončení objektu self.close() diff --git a/tools/automatic_battery_tester/hardware_ctl/__init__.py b/tools/automatic_battery_tester/hardware_ctl/__init__.py index e69de29bb2..4960686a6b 100644 --- a/tools/automatic_battery_tester/hardware_ctl/__init__.py +++ b/tools/automatic_battery_tester/hardware_ctl/__init__.py @@ -0,0 +1 @@ +from .relay_controller import RelayController diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec/__init__.py b/tools/automatic_battery_tester/hardware_ctl/deditec/__init__.py new file mode 100644 index 0000000000..bcc9f587b7 --- /dev/null +++ b/tools/automatic_battery_tester/hardware_ctl/deditec/__init__.py @@ -0,0 +1 @@ +from .bs_weu_16 import DeditecBsWeu16 diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec/bs_weu_16.py b/tools/automatic_battery_tester/hardware_ctl/deditec/bs_weu_16.py new file mode 100644 index 0000000000..781ab1d5da --- /dev/null +++ b/tools/automatic_battery_tester/hardware_ctl/deditec/bs_weu_16.py @@ -0,0 +1,149 @@ + +import logging +import signal +import socket +from typing import Any, List +from typing_extensions import Self + +IP = "192.168.1.10" # default static IP address +PORT = 9912 +PIN_COUNT = 16 # Total number of pins on the Deditec BS-WEU-16 board + +class DeditecBsWeu16: + + def __init__(self, ip: str = IP, port: int = PORT, timeout_seconds: int = 3): + + self.ip = ip + self.port = port + self.timeout_seconds = max(1, timeout_seconds) + self.socket: socket.socket | None = None + self.pins_on_latched = [] + + logging.debug(f"DeditecBsWeu16: instance created on {self.ip}:{self.port}") + + def connect(self) -> bool: + if self.socket is not None: + logging.warning("DeditecBsWeu16: connect called, but socket already exists. Closing first.") + self.close_connection() + + logging.debug(f"DeditecBsWeu16: connecting to device at {self.ip}:{self.port}...") + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout_seconds) + self.socket.connect((self.ip, self.port)) + logging.debug("DeditecBsWeu16: connection established") + return True + except socket.timeout: + logging.error(f"DeditecBsWeu16: connection timed out ({self.timeout_seconds}s)") + self.socket = None + return False + except Exception as e: + logging.exception(f"DeditecBsWeu16: connection error > {e}") + self.socket = None + return False + + def send_command(self, command: bytes): + + if self.socket is None: + logging.error("DeditecBsWeu16: send_command called but not connected.") + return False + + logging.debug(f"DeditecBsWeu16: sending command: {command!r}") + try: + self.socket.sendall(command) + data = self.socket.recv(64) + logging.debug(f"DeditecBsWeu16: received confirmation data (len={len(data)}): {data!r}") + return True + except socket.timeout: + logging.error(f"DeditecBsWeu16: socket timeout during send/recv ({self.timeout_seconds}s)") + return False + except Exception as e: + logging.exception(f"DeditecBsWeu16: error sending command or receiving confirmation: {e}") + return False + + + def control_relay(self, pins_on: List[int], pins_off: List[int]) -> bool: + """ Turns on all pins specified in pins_on list and turns off all pins specified in pins_off list. + Returns True if successful, False otherwise. + """ + + if not self.socket: + logging.error("DeditecBsWeu16: Relay not connected.") + return False + + for pin in pins_on: + if not (1 <= pin <= PIN_COUNT): + logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_on.") + return False + + for pin in pins_off: + if not (1 <= pin <= PIN_COUNT): + logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_off.") + return False + + # Update the list of + self.pins_on_latched = list(set(self.pins_on_latched + pins_on) - set(pins_off)) + command = self.assabmle_command(self.pins_on_latched) + + if not self.send_command(command): + logging.error("DeditecBsWeu16: Failed to send command to Deditec device.") + return False + + logging.info(f"DeditecBsWeu16: Changed relay setup. Pins ON: {self.pins_on_latched}") + + return True + + + def assabmle_command(self, pins: List[int]) -> bytes: + """Assembles the command to turn on specified pins on the Deditec BS-WEU-16 board.""" + command_prefix = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00" + + pin_mask_value = 0 + for pin in set(pins): # Ensure uniqueness + if isinstance(pin, int) and 1 <= pin <= PIN_COUNT: + pin_mask_value += 2 ** (pin - 1) + else: + logging.warning(f"DeditecBsWeu16: Invalid pin number provided to assabmle_command: {pin}. Ignoring.") + + command = command_prefix + pin_mask_value.to_bytes(2, byteorder="big") + return command + + + def close_connection(self) -> None: + if self.socket: + logging.debug("DeditecBsWeu16: closing connection") + try: + self.socket.close() + except Exception as e: + logging.error(f"DeditecBsWeu16: error closing socket: {e}") + finally: + self.socket = None + else: + logging.debug("Deditec:: close_connection called but already closed.") + + + def __enter__(self) -> Self: + + try: + signal.alarm(self.timeout_seconds + 1) + except ValueError: + logging.warning("Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout.") + pass + + if not self.connect(): + signal.alarm(0) + raise ConnectionError(f"Failed to connect to Deditec device at {self.ip}:{self.port}") + + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: + + try: + signal.alarm(0) + except ValueError: + pass + self.close_connection() + + if exc_type: + logging.error(f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}") + return False diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/__init__.py b/tools/automatic_battery_tester/hardware_ctl/deditec_driver/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/deditec_1_16_on.py b/tools/automatic_battery_tester/hardware_ctl/deditec_driver/deditec_1_16_on.py deleted file mode 100644 index c1a29f910d..0000000000 --- a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/deditec_1_16_on.py +++ /dev/null @@ -1,127 +0,0 @@ -from __future__ import annotations - -import signal -import socket -import sys -from typing import Any -import logging - -from typing_extensions import Self # Potřebuje pip install typing-extensions - - -# !!! ZKONTROLUJ A PŘÍPADNĚ UPRAV VÝCHOZÍ IP !!! -# Tato IP se použije, pokud RelayController nedostane jinou, -# a hlavně se použije v test_deditec_control.py -IP = "192.168.1.10" # <--- VÝCHOZÍ IP PRO TESTY Z TERMINÁLU -PORT = 9912 - - -class Deditec_1_16_on: - def __init__(self, ip: str = IP, port: int = PORT, timeout_seconds: int = 3): - # Použije předanou IP/port, nebo výchozí hodnoty - self.ip = ip - self.port = port - self.timeout_seconds = max(1, timeout_seconds) # Min 1s timeout - self.socket: socket.socket | None = None # Inicializace na None - logging.debug(f"Deditec communication object created for {self.ip}:{self.port}") - - def connect(self) -> bool: # Vrací bool pro úspěch/neúspěch - if self.socket is not None: - logging.warning("Deditec:: connect called, but socket already exists. Closing first.") - self.close_connection() - - logging.debug(f"Deditec:: connecting to device at {self.ip}:{self.port}...") - try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # Nastavení timeoutu pro operace socketu (connect, send, recv) - self.socket.settimeout(self.timeout_seconds) - self.socket.connect((self.ip, self.port)) - logging.debug("Deditec:: connection established") - return True - except socket.timeout: - logging.error(f"Deditec:: connection timed out ({self.timeout_seconds}s)") - self.socket = None # Zajistit, že socket je None při chybě - return False - except Exception as e: - logging.exception(f"Deditec:: error when connecting: {e}") - self.socket = None - return False - - def send_command(self, command: bytes) -> int: # Vrací 0 pro úspěch, >0 pro chybu - if self.socket is None: - logging.error("Deditec:: send_command called but not connected.") - return 1 # Chyba - není připojeno - - logging.debug(f"Deditec:: sending command: {command!r}") - try: - self.socket.sendall(command) - # Čekání na odpověď (očekává se nějaká?) - # Původní kód četl data, i když je nepoužíval. Zkusíme to také. - # Velikost bufferu může být malá, pokud neočekáváme velkou odpověď. - data = self.socket.recv(64) # Přečíst malou odpověď - logging.debug(f"Deditec:: received confirmation data (len={len(data)}): {data!r}") - return 0 # Předpokládáme úspěch, pokud sendall a recv nehodily výjimku - except socket.timeout: - logging.error(f"Deditec:: socket timeout during send/recv ({self.timeout_seconds}s)") - return 2 # Chyba - timeout - except Exception as e: - logging.exception(f"Deditec:: error sending command or receiving confirmation: {e}") - return 3 # Jiná chyba - - def close_connection(self) -> None: - if self.socket: - logging.debug("Deditec:: closing connection") - try: - self.socket.close() - except Exception as e: - logging.error(f"Deditec:: error closing socket: {e}") - finally: - self.socket = None # Vždy nastavit na None - else: - logging.debug("Deditec:: close_connection called but already closed.") - - - def __enter__(self) -> Self: - # Nastavení SIGALRM pro celkový timeout operace (funguje jen v hlavním vlákně na Unixu) - # V jiných případech spoléháme na socket timeout - try: - signal.alarm(self.timeout_seconds + 1) # Dáme o 1s víc než socket timeout - except ValueError: # Stává se ve Windows nebo mimo hlavní vlákno - logging.warning("Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout.") - pass # Pokračujeme bez alarmu - - if not self.connect(): - signal.alarm(0) # Zrušit alarm, pokud byl nastaven - # Vyvolat specifickou výjimku pro připojení - raise ConnectionError(f"Failed to connect to Deditec device at {self.ip}:{self.port}") - # Nepotřebujeme rušit alarm zde, udělá se v __exit__ - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: - # Vždy zrušit alarm a zavřít spojení - try: - signal.alarm(0) - except ValueError: - pass - self.close_connection() - - if exc_type: # Pokud došlo k výjimce uvnitř 'with' bloku - logging.error(f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}") - # return False # Předáme výjimku dál (standardní chování) - return False # Vždy předat výjimku dál, pokud nastala - - -# --- Kód pro přímé spuštění (test připojení) --- -if __name__ == "__main__": - # Tento test se spustí jen při `python libs/backend/deditec_driver/deditec_1_16_on.py` - print("Running direct connection test...") - # Použijeme try-except místo __enter__/__exit__ pro jednodušší test - tester = Deditec_1_16_on(timeout_seconds=2) # Krátký timeout pro test - if tester.connect(): - print(f"SUCCESS: Connected to Deditec device at {tester.ip}:{tester.port}") - print("Closing connection.") - tester.close_connection() - sys.exit(0) # Úspěch - else: - print(f"ERROR: Failed to connect to Deditec device at {tester.ip}:{tester.port}") - sys.exit(1) # Neúspěch diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/helpers.py b/tools/automatic_battery_tester/hardware_ctl/deditec_driver/helpers.py deleted file mode 100644 index 4ce30c10d8..0000000000 --- a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/helpers.py +++ /dev/null @@ -1,210 +0,0 @@ -import json -import sys -import logging -from datetime import datetime -from pathlib import Path -from typing import TypedDict, List, Dict - -# --- Relativní a absolutní importy --- -try: - # Relativní import v rámci stejného balíčku - from .deditec_1_16_on import Deditec_1_16_on - # Import z nadřazeného balíčku (backend) - from backend.common import get_logger - imports_ok = True -except ImportError as e: - imports_ok = False - # Fallback logger, pokud selže import common - logging.basicConfig() - logger = logging.getLogger(__name__) - logger.error(f"Failed to import dependencies: {e}. Using fallback logger.") - # Dummy třída, aby zbytek kódu mohl selhat později - class Deditec_1_16_on: pass -else: - # Pokud importy prošly, nastavíme logger - logger = get_logger(__name__) -# ------------------------------------ - - -class Cache(TypedDict): - last_run: str - pins_on: List[int] - -# Cesta k cache souboru relativně k tomuto souboru -HERE = Path(__file__).parent -CACHE_FILE = HERE / "pin_cache.json" - -PIN_COUNT = 16 # Počet relé na desce - -def _ensure_cache_file_exists(): - """Zajistí existenci cache souboru s výchozí strukturou.""" - if not CACHE_FILE.exists(): - logger.warning(f"Cache file {CACHE_FILE} not found, creating default.") - try: - with open(CACHE_FILE, "w") as f: - empty_cache: Cache = {"last_run": datetime.now().isoformat(), "pins_on": []} - json.dump(empty_cache, f, indent=2) # Použít json.dump - except IOError as e: - logger.error(f"Failed to create cache file {CACHE_FILE}: {e}") - -# Zajistíme existenci cache při načtení modulu -_ensure_cache_file_exists() - -# Prefix příkazu pro nastavení všech výstupů najednou -# Zdroj: Dokumentace Deditec nebo reverzní inženýrství? -# DELIBOX-OPTO16-RELAIS16 - Protokollbeschreibung v1.0.pdf (pokud existuje) -# Tento prefix se zdá být specifický pro určitý příkaz. -PREFIX_CMD = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00" - -# --- Funkce pro práci s cache a příkazy --- - -def get_pins_on() -> List[int]: - """Načte seznam aktuálně zapnutých pinů z cache souboru.""" - try: - with open(CACHE_FILE) as f: - data: Cache = json.load(f) - # Validace načtených dat? - if isinstance(data.get("pins_on"), List): - # Odstranit duplicity a zajistit čísla - pins = sorted(list(set(int(p) for p in data["pins_on"] if isinstance(p, (int, str)) and str(p).isdigit()))) - # Omezit na platný rozsah - valid_pins = [p for p in pins if 1 <= p <= PIN_COUNT] - if len(valid_pins) != len(data["pins_on"]): - logger.warning(f"Invalid pins found in cache, cleaned up: {data['pins_on']} -> {valid_pins}") - return valid_pins - else: - logger.error(f"Invalid format in cache file {CACHE_FILE}: 'pins_on' is not a List.") - return [] # Vrátit prázdný seznam při chybě formátu - except FileNotFoundError: - logger.error(f"Cache file {CACHE_FILE} not found during read. Returning empty state.") - _ensure_cache_file_exists() # Zkusit znovu vytvořit - return [] - except (json.JSONDecodeError, Exception) as e: - logger.exception(f"Deditec:: failed reading/parsing cache file {CACHE_FILE}: {e}") - return [] - - -def save_new_pins_on(pins_on: List[int]) -> None: - """Uloží nový seznam zapnutých pinů do cache souboru.""" - # Odstranit duplicity a seřadit pro konzistenci - unique_sorted_pins = sorted(list(set(pins_on))) - logger.debug(f"Saving new pins state to cache: {unique_sorted_pins}") - try: - with open(CACHE_FILE, "w") as f: - cache: Cache = {"last_run": datetime.now().isoformat(), "pins_on": unique_sorted_pins} - json.dump(cache, f, indent=2) - except IOError as e: - logger.error(f"Failed to write cache file {CACHE_FILE}: {e}") - - -def get_new_pins_on(on: List[int], off: List[int], all_off: bool) -> List[int]: - """Vypočítá nový seznam zapnutých pinů.""" - if all_off: - return [] # Všechno vypnout - - # Získáme předchozí stav z cache - previous_on_set = set(get_pins_on()) - logger.debug(f"Calculating new state: Previous ON={previous_on_set}, Requested ON={on}, Requested OFF={off}") - - # Aplikujeme změny - current_on_set = (previous_on_set.union(set(on))) - set(off) - - # Vrátíme jako seřazený seznam - new_pins = sorted(list(current_on_set)) - logger.debug(f"Resulting new ON state: {new_pins}") - return new_pins - -def turn_on_pins_command(pins: List[int]) -> bytes: - """Vytvoří bajtový příkaz pro zapnutí daných pinů.""" - # Hodnota reprezentuje bitovou masku zapnutých pinů - # Pin 1 = bit 0 (2^0), Pin 2 = bit 1 (2^1), ..., Pin 16 = bit 15 (2^15) - pin_mask_value = 0 - valid_pins = set() # Sledujeme validní piny pro logování - for pin in set(pins): # Zajistíme unikátnost - if isinstance(pin, int) and 1 <= pin <= PIN_COUNT: - pin_mask_value += 2 ** (pin - 1) - valid_pins.add(pin) - else: - logger.warning(f"Invalid pin number provided to turn_on_pins_command: {pin}. Ignoring.") - - logger.debug(f"Generating command for pins: {sorted(list(valid_pins))}. Mask value: {pin_mask_value}") - # Hodnota masky se přidá jako 2 bajty (big-endian) za prefix - command = PREFIX_CMD + pin_mask_value.to_bytes(2, byteorder="big") - return command - - -# --- Funkce pro přímé ovládání (použité v test_deditec_control) --- -# Tuto funkci náš RelayController nepoužívá, ale test ano -def run_pins_on_off_command_save(ip: str, port: int, pins_on: List[int], pins_off: List[int]) -> bool: - """Kompletní sekvence: výpočet, připojení, odeslání, uložení.""" - logger.info(f"Executing direct command: ON={pins_on}, OFF={pins_off} to {ip}:{port}") - if not imports_ok: - logger.error("Cannot run direct command, imports failed.") - return False - - try: - with Deditec_1_16_on(ip=ip, port=port) as deditec_controller: - new_pins_on = get_new_pins_on(pins_on, pins_off, False) - command = turn_on_pins_command(new_pins_on) - response = deditec_controller.send_command(command) - success = response == 0 - if success: - save_new_pins_on(new_pins_on) - logger.info(f"Direct command successful. New state ON: {new_pins_on}") - else: - logger.error(f"Direct command failed. Deditec response: {response}") - return success - except Exception as e: - logger.exception(f"Error during direct command execution: {e}") - return False - -# --- Funkce pro čtení stavu (použité v test_deditec_control) --- -def get_pins_status_dict() -> Dict[str, bool]: - """Vrátí slovník se stavem všech pinů na základě cache.""" - pins_on = get_pins_on() - pin_status = {str(pin): (pin in pins_on) for pin in range(1, 1 + PIN_COUNT)} - return pin_status - - -# --- Kód pro přímé spuštění (testování funkcí z helpers) --- -if __name__ == "__main__": - print("Running helper tests...") - # Test get_pins_on / save_new_pins_on - print("Current pins ON (from cache):", get_pins_on()) - save_new_pins_on([1, 5, 15]) - print("Set pins [1, 5, 15]. New state from cache:", get_pins_on()) - save_new_pins_on([]) - print("Set pins []. New state from cache:", get_pins_on()) - - # Test get_new_pins_on - save_new_pins_on([2, 4]) - print("Cache state: [2, 4]") - print("get_new_pins_on(on=[1], off=[2], all_off=False) ->", get_new_pins_on(on=[1], off=[2], all_off=False)) # Očekává [1, 4] - print("get_new_pins_on(on=[6], off=[7], all_off=False) ->", get_new_pins_on(on=[6], off=[7], all_off=False)) # Očekává [2, 4, 6] - print("get_new_pins_on(on=[], off=[], all_off=True) ->", get_new_pins_on(on=[], off=[], all_off=True)) # Očekává [] - - # Test turn_on_pins_command - res = turn_on_pins_command([1, 2]) - expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x00\x03" - print(f"Cmd for [1, 2]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}") - res = turn_on_pins_command([1, 1, 2]) # Test duplicity - print(f"Cmd for [1, 1, 2]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}") - res = turn_on_pins_command([11, 12]) # Piny 11 a 12 - expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x0c\x00" # 2^10 + 2^11 = 1024 + 2048 = 3072 = 0x0C00 - print(f"Cmd for [11, 12]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}") - res = turn_on_pins_command([]) # Vše vypnuto - expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x00\x00" - print(f"Cmd for []: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}") - res = turn_on_pins_command([1, 16]) # První a poslední - expected_val = 1 + 2**15 # 1 + 32768 = 32769 = 0x8001 - expected = PREFIX_CMD + expected_val.to_bytes(2, 'big') - print(f"Cmd for [1, 16]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}") - - # Test get_pins_status_dict - save_new_pins_on([3, 8, 10]) - status = get_pins_status_dict() - print("Status dict for [3, 8, 10]:", status) - print(" Pin 3 status:", status.get('3')) - print(" Pin 4 status:", status.get('4')) - print(" Pin 8 status:", status.get('8')) - print(" Pin 10 status:", status.get('10')) diff --git a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/pin_cache.json b/tools/automatic_battery_tester/hardware_ctl/deditec_driver/pin_cache.json deleted file mode 100644 index 92c46ff2a6..0000000000 --- a/tools/automatic_battery_tester/hardware_ctl/deditec_driver/pin_cache.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "last_run": "2025-06-17T11:07:53.816919", - "pins_on": [] -} diff --git a/tools/automatic_battery_tester/hardware_ctl/relay_controller.py b/tools/automatic_battery_tester/hardware_ctl/relay_controller.py index e154ca1bdc..ce4400563f 100644 --- a/tools/automatic_battery_tester/hardware_ctl/relay_controller.py +++ b/tools/automatic_battery_tester/hardware_ctl/relay_controller.py @@ -8,19 +8,8 @@ import sys from typing import List, Optional, Set from pathlib import Path -libs_path = Path(__file__).parent.parent / "libs" -if str(libs_path) not in sys.path: - sys.path.insert(0, str(libs_path)) - # Import Deditec drives -try: - - from .deditec_driver.deditec_1_16_on import Deditec_1_16_on - from .deditec_driver.helpers import (get_pins_on, save_new_pins_on, - get_new_pins_on, turn_on_pins_command) - -except ImportError as e: - logging.error(f"ERROR: Failed to import Deditec modules") +from .deditec import DeditecBsWeu16 class RelayController: @@ -38,27 +27,16 @@ class RelayController: self.ip_address = ip_address self.port = self.DEDITEC_PORT - logging.info(f"Initializing Relay Controller for Deditec at {self.ip_address}:{self.DEDITEC_PORT}") - # Ping the device to check connectivity if not self.check_ping(self.ip_address): logging.warning("Ping to Deditec relay board failed. Network issue possible, but attempting TCP check.") - if not self._check_deditec_connection(self.ip_address, self.port): - logging.error("CRITICAL: Failed to establish TCP connection with Deditec relay board.") - - def _check_deditec_connection(self, ip: str, port: int, timeout: int = 2) -> bool: - - logging.info(f"Checking Deditec connection to {ip}:{port} (timeout={timeout}s)...") - - try: - with Deditec_1_16_on(ip=ip, port=port, timeout_seconds=timeout) as tester: - logging.info("Deditec connection established successfuly.") - return True - except ConnectionError as e: logging.error(f"Deditec connection failed: {e}"); return False - except TimeoutError: logging.error(f"Deditec connection timed out."); return False - except Exception as e: logging.error(f"Unexpected error during Deditec connection check: {e}"); return False + self.deditec = DeditecBsWeu16(ip=self.ip_address, port=self.port) + # Connect to deditec relay board + if not self.deditec.connect(): + logging.error(f"Failed to connect to Deditec relay board at {self.ip_address}:{self.DEDITEC_PORT}.") + sys.exit(1) def check_ping(self, ip: str) -> bool: @@ -66,14 +44,14 @@ class RelayController: logging.info(f"Pinging {ip}...") system = platform.system().lower() if system == "windows": - command = ["ping", "-n", "1", "-w", "1000", ip] # Timeout 1000ms + command = ["ping", "-n", "1", "-w", "1000", ip] else: # Linux, macOS - command = ["ping", "-c", "1", "-W", "1", ip] # Timeout 1s + command = ["ping", "-c", "1", "-W", "1", ip] try: process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - stdout, stderr = process.communicate(timeout=3) # Celkový timeout procesu + stdout, stderr = process.communicate(timeout=3) return_code = process.returncode logging.debug(f"Ping stdout:\n{stdout}") @@ -82,7 +60,6 @@ class RelayController: if return_code == 0: - # Doplňková kontrola (může být závislá na jazyku systému) if "unreachable" in stdout.lower() or "timed out" in stdout.lower() or "ttl expired" in stdout.lower(): logging.error(f"Ping to {ip} technically succeeded (code 0) but output indicates failure.") return False @@ -106,50 +83,21 @@ class RelayController: logging.error(f"Unknown error during ping check: {e}") return False - def _execute_relay_command(self, pins_to_turn_on: List[int], pins_to_turn_off: List[int]) -> bool: - - try: - # 1. Zjistit nový celkový stav - new_pins_state = get_new_pins_on(pins_to_turn_on, pins_to_turn_off, all_off=False) - logging.debug(f"Calculating new relay state: Request ON={pins_to_turn_on}, Request OFF={pins_to_turn_off} -> New total ON state: {new_pins_state}") - - # 2. Připravit command - command_bytes = turn_on_pins_command(new_pins_state) - logging.debug(f"Generated command bytes: {command_bytes!r}") - - # 3. Odeslat command - logging.debug(f"Connecting to Deditec at {self.ip_address}:{self.port} to send command...") - with Deditec_1_16_on(ip=self.ip_address, port=self.port, timeout_seconds=3) as controller: # Timeout pro spojení - response = controller.send_command(command_bytes) # Použije timeout socketu definovaný v Deditec_1_16_on - - # 4. Zkontrolovat a uložit - if response == 0: - logging.debug("Command sent successfully. Saving new state to cache.") - save_new_pins_on(new_pins_state) - logging.debug(f"Relay state updated. Currently ON: {get_pins_on()}") # Zobrazit aktuální stav z cache - return True - else: - logging.error(f"Deditec device returned unexpected response code: {response}") - return False - - except ConnectionError as e: logging.error(f"Failed to connect to Deditec at {self.ip_address}: {e}"); return False - except TimeoutError: logging.error(f"Timeout communicating with Deditec at {self.ip_address}"); return False - except Exception as e: logging.exception(f"An unexpected error occurred during relay operation: {e}"); return False - def set_relay_off(self, pin: int) -> bool: - return self._execute_relay_command(pins_to_turn_on=[], - pins_to_turn_off=[pin]) + + if not self.deditec: + logging.error("RelayController: Deditec not initialized.") + return False + + return self.deditec.control_relay(pins_on=[], pins_off=[pin]) def set_relay_on(self, pin: int) -> bool: - return self._execute_relay_command(pins_to_turn_on=[pin], - pins_to_turn_off=[]) - def turn_all_relays_off(self) -> bool: - """Turn off all relays.""" - logging.info("Turning all relays OFF.") - # Vytvoříme seznam všech pinů (1 až MAX_PIN) - all_pins = list(range(1, self.MAX_PIN + 1)) - return self._execute_relay_command(pins_to_turn_on=[], pins_to_turn_off=all_pins) + if not self.deditec: + logging.error("RelayController: Deditec not initialized.") + return False + + return self.deditec.control_relay(pins_on=[pin], pins_off=[]) def close(self): pass diff --git a/tools/automatic_battery_tester/main_tester.py b/tools/automatic_battery_tester/main_tester.py index c78e28767c..7920d15c3b 100644 --- a/tools/automatic_battery_tester/main_tester.py +++ b/tools/automatic_battery_tester/main_tester.py @@ -8,26 +8,14 @@ from pathlib import Path import subprocess import socket from typing import Optional, Dict, Any - -from test_logic.linear_scenario import LinearScenario -from test_logic.switching_scenario import SwitchingScenario -from test_logic.random_wonder_scenario import RandomWonderScenario +from test_logic import LinearScenario, SwitchingScenario, RandomWonderScenario from notifications import send_slack_message - -project_root = Path(__file__).parent -hw_ctl_path = project_root / "hardware_ctl" -logic_path = project_root / "test_logic" - -sys.path.append(str(project_root)) -if str(hw_ctl_path) not in sys.path: sys.path.insert(0, str(hw_ctl_path)) -if str(logic_path) not in sys.path: sys.path.insert(0, str(logic_path)) - -from hardware_ctl.relay_controller import RelayController -from dut.dut_controller import DutController +from hardware_ctl import RelayController +from dut import DutController # Configure logging log_formatter = log_formatter = logging.Formatter('[%(levelname).1s %(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') -log_file = project_root / "test.log" +log_file = "test.log" logger = logging.getLogger() logger.setLevel(logging.DEBUG) @@ -53,7 +41,7 @@ logger.addHandler(console_handler) def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]: """Load test configuration from TOML config file .""" - config_file = project_root / config_path + config_file = config_path logging.info(f"Loading configuration file: {config_file}") try: config = toml.load(config_file) @@ -145,7 +133,7 @@ def main(): logging.critical("Failed to load configuration. Exiting.") sys.exit(1) - # Inicializace ovladačů HW + # Initialize hardware controllers logging.info("Initializing hardware controllers...") relay_ctl = None dut_ctl = None @@ -278,16 +266,14 @@ def main(): test_aborted = True finally: - # --- Cleanup --- logging.info("Performing final cleanup...") if relay_ctl: try: logging.info("Ensuring all relays are OFF...") - relay_ctl.turn_all_relays_off() + dut_ctl.power_down_all() # Power down all DUTs relay_ctl.close() except Exception as e_relay: logging.error(f"Error during relay cleanup: {e_relay}") - # ... (cleanup dut_ctl a temp_ctl) ... if dut_ctl: try: dut_ctl.close() except Exception as e_dut: logging.error(f"Error during DUT controller cleanup: {e_dut}") diff --git a/tools/automatic_battery_tester/notifications.py b/tools/automatic_battery_tester/notifications.py index 1b0313f5f2..fb55250b91 100644 --- a/tools/automatic_battery_tester/notifications.py +++ b/tools/automatic_battery_tester/notifications.py @@ -76,19 +76,3 @@ def send_slack_message(webhook_url: Optional[str], message: str, fallback_text: except Exception as e: logger.exception(f"An unexpected error occurred during Slack notification: {e}") return False - -if __name__ == '__main__': - print("Testing Slack notification module (sends as payload parameter)...") - test_webhook_url = "YOUR_SLACK_WEBHOOK_URL" # Replace with your actual Slack Webhook URL - test_msg = "Test z Pythonu :wave: (posláno jako payload parametr).\n*Formátování* by mělo _fungovat_." - fallback = "Test from Python (payload)" - - if "YOUR_SLACK_WEBHOOK_URL" == test_webhook_url or not test_webhook_url: - print("\nPlease replace YOUR_SLACK_WEBHOOK_URL with your actual Slack Webhook URL.") - else: - print(f"Sending test message via webhook...") - success = send_slack_message(test_webhook_url, test_msg, fallback_text=fallback) - if success: - print("\nTest request sent successfully (check your Slack channel).") - else: - print("\nTest request failed.") diff --git a/tools/automatic_battery_tester/test_config.toml b/tools/automatic_battery_tester/test_config.toml index 1199e975c1..41a4b0539e 100644 --- a/tools/automatic_battery_tester/test_config.toml +++ b/tools/automatic_battery_tester/test_config.toml @@ -62,7 +62,6 @@ random_wonder_relaxation_time_min = 30 [notifications] notification_channel = "slack" -slack_webhook_url = "https://hooks.slack.com/services/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" -#slack_webhook_url = "https://hooks.slack.com/services/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" +slack_webhook_url = "" diff --git a/tools/automatic_battery_tester/test_logic/__init__.py b/tools/automatic_battery_tester/test_logic/__init__.py index e69de29bb2..2b357a0123 100644 --- a/tools/automatic_battery_tester/test_logic/__init__.py +++ b/tools/automatic_battery_tester/test_logic/__init__.py @@ -0,0 +1,4 @@ +from .test_scenario import TestScenario +from .switching_scenario import SwitchingScenario +from .linear_scenario import LinearScenario +from .random_wonder_scenario import RandomWonderScenario