1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-07-22 22:48:20 +00:00

feat(core): rationalized package imports, reworked/simplify deditec library, purge Czech comments.

[no changelog]
This commit is contained in:
kopecdav 2025-06-20 12:57:26 +02:00 committed by kopecdav
parent 9f74704e89
commit 6837f3ab5f
17 changed files with 220 additions and 513 deletions

View File

@ -2,4 +2,4 @@
venv/ venv/
test_results/ test_results/
test.log test.log

View File

@ -83,7 +83,7 @@ test_modes = ["linear", "switching", "random_wonder"]
```sh ```sh
python main_tester.py python main_tester.py
``` ```
4. Follow console and log (`test.log`) instructions. 4. Follow console instructions or check log in "test.log" file.
--- ---

View File

@ -0,0 +1,2 @@
from .dut_controller import DutController
from .dut import Dut

View File

@ -1,5 +1,3 @@
import serial import serial
import time import time
import logging import logging
@ -40,9 +38,6 @@ class DutProdtestResponse:
data_entries: list = field(default_factory=list) data_entries: list = field(default_factory=list)
OK: bool = False OK: bool = False
# def __init__(self):
# self.trace = []
# self.data_entries = []
class Dut(): class Dut():
@ -72,7 +67,7 @@ class Dut():
self.entry_interactive_mode() self.entry_interactive_mode()
self.enable_charging() self.enable_charging()
time.sleep(2) # Give some time for the command to be processed time.sleep(2) # Give some time to process te commands
if not self.ping(): if not self.ping():
self.init_error() self.init_error()
@ -351,12 +346,23 @@ class Dut():
prefix = f"\033[95m[{self.name}]\033[0m" prefix = f"\033[95m[{self.name}]\033[0m"
logging.debug(prefix + " < " + message) logging.debug(prefix + " < " + message)
def close(self):
"""
Close the DUT's serial port and clean up resources.
"""
if self.vcp is not None and self.vcp.is_open:
try:
self.vcp.close()
except Exception as e:
logging.warning(f"Failed to close VCP for {self.name}: {e}")
self.vcp = None
self.name = None
self.relay_ctl = None
self.relay_port = None
def __del__(self): def __del__(self):
try: try:
if hasattr(self, "vcp") and self.vcp is not None and self.vcp.is_open: if hasattr(self, "vcp") and self.vcp is not None and self.vcp.is_open:
self.vcp.close() self.close()
except Exception as e: except Exception as e:
logging.warning(f"Failed to close VCP for {self.name}: {e}") logging.warning(f"Error during DUT cleanup: {e}")
self.vcp = None
self.name = None
self.verbose = None

View File

@ -1,34 +1,15 @@
import time import time
import logging import logging
import sys
from pathlib import Path
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .dut import Dut from .dut import Dut
from hardware_ctl.relay_controller import RelayController from hardware_ctl.relay_controller import RelayController
# Podmíněný import pro type hinting
if TYPE_CHECKING:
from libs.prodtest_cli import prodtest_cli, ProdtestResponse
# Přidání cesty k libs pro import prodtest_cli
import sys
from pathlib import Path
libs_path_ctrl = Path(__file__).parent.parent / "libs"
if str(libs_path_ctrl) not in sys.path:
sys.path.insert(0, str(libs_path_ctrl))
# Import prodtest_cli po úpravě sys.path
try:
from prodtest_cli import prodtest_cli, ProdtestResponse
except ImportError as e:
logging.error(f"FATAL: Could not import prodtest_cli from {libs_path_ctrl}. Check file presence and structure.")
# Definujeme dummy třídy, aby zbytek mohl selhat při inicializaci
class ProdtestResponse: pass
class prodtest_cli:
def __init__(self, *args, **kwargs): raise ImportError("prodtest_cli not found")
@dataclass @dataclass
class ProdtestPmReport: class ProdtestPmReport:
"""Dataclass pro uchování výsledků pm-report.""" """pm-report command response data structure"""
power_state: str = "" power_state: str = ""
usb: str = "" usb: str = ""
wlc: str = "" wlc: str = ""
@ -70,7 +51,7 @@ class DutController:
""" """
Device-under-test (DUT) controller. Device-under-test (DUT) controller.
provides direct simulataneous control of configured DUTs provides direct simultaneous control of configured DUTs
""" """
def __init__(self, duts, relay_ctl, verbose: bool = False): def __init__(self, duts, relay_ctl, verbose: bool = False):
@ -79,19 +60,19 @@ class DutController:
# Power off all DUTs before self test # Power off all DUTs before self test
for d in duts: for d in duts:
self.relay_ctl.set_relay_off(d["relay_port"]) self.relay_ctl.set_relay_off(d['relay_port'])
for d in duts: for d in duts:
try: try:
dut = Dut(name=d["name"], dut = Dut(name=d['name'],
cpu_id=d["cpu_id"], cpu_id=d['cpu_id'],
usb_port=d["usb_port"], usb_port=d['usb_port'],
relay_port=d["relay_port"], relay_port=d['relay_port'],
relay_ctl=self.relay_ctl, relay_ctl=self.relay_ctl,
verbose=True) verbose=verbose)
self.duts.append(dut) self.duts.append(dut)
logging.info(f"Initialized {d["name"]} on port {d['usb_port']}") logging.info(f"Initialized {d['name']} on port {d['usb_port']}")
logging.info(f" -- cpu_id hash : {dut.get_cpu_id_hash()}") logging.info(f" -- cpu_id hash : {dut.get_cpu_id_hash()}")
logging.info(f" -- relay port : {dut.get_relay_port()}") logging.info(f" -- relay port : {dut.get_relay_port()}")
@ -113,7 +94,6 @@ class DutController:
for d in self.duts: for d in self.duts:
d.power_down() d.power_down()
def enable_charging(self): def enable_charging(self):
""" """
Enable charging on all DUTs. Enable charging on all DUTs.
@ -239,24 +219,12 @@ class DutController:
test_phase, test_phase,
temp) temp)
# --- Cleanup ---
def close(self): def close(self):
pass for d in self.duts:
# """Ukončí sériovou komunikaci.""" try:
# if self.cli and hasattr(self.cli, 'vcp') and self.cli.vcp: d.close()
# logging.info("Closing DUT serial connection...") except Exception as e:
# try: logging.error(f"Failed to close DUT {d.name}: {e}")
# self.cli.vcp.close()
# except Exception as e:
# logging.error(f"Error closing serial port: {e}")
# self.cli = None
# elif self.cli:
# logging.debug("DUT controller had cli object, but no active vcp to close.")
# self.cli = None
# else:
# logging.debug("DUT controller already closed or not initialized.")
def __del__(self): def __del__(self):
# Zajistí zavření portu i při neočekávaném ukončení objektu
self.close() self.close()

View File

@ -0,0 +1 @@
from .relay_controller import RelayController

View File

@ -0,0 +1 @@
from .bs_weu_16 import DeditecBsWeu16

View File

@ -0,0 +1,149 @@
import logging
import signal
import socket
from typing import Any, List
from typing_extensions import Self
IP = "192.168.1.10" # default static IP address
PORT = 9912
PIN_COUNT = 16 # Total number of pins on the Deditec BS-WEU-16 board
class DeditecBsWeu16:
def __init__(self, ip: str = IP, port: int = PORT, timeout_seconds: int = 3):
self.ip = ip
self.port = port
self.timeout_seconds = max(1, timeout_seconds)
self.socket: socket.socket | None = None
self.pins_on_latched = []
logging.debug(f"DeditecBsWeu16: instance created on {self.ip}:{self.port}")
def connect(self) -> bool:
if self.socket is not None:
logging.warning("DeditecBsWeu16: connect called, but socket already exists. Closing first.")
self.close_connection()
logging.debug(f"DeditecBsWeu16: connecting to device at {self.ip}:{self.port}...")
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout_seconds)
self.socket.connect((self.ip, self.port))
logging.debug("DeditecBsWeu16: connection established")
return True
except socket.timeout:
logging.error(f"DeditecBsWeu16: connection timed out ({self.timeout_seconds}s)")
self.socket = None
return False
except Exception as e:
logging.exception(f"DeditecBsWeu16: connection error > {e}")
self.socket = None
return False
def send_command(self, command: bytes):
if self.socket is None:
logging.error("DeditecBsWeu16: send_command called but not connected.")
return False
logging.debug(f"DeditecBsWeu16: sending command: {command!r}")
try:
self.socket.sendall(command)
data = self.socket.recv(64)
logging.debug(f"DeditecBsWeu16: received confirmation data (len={len(data)}): {data!r}")
return True
except socket.timeout:
logging.error(f"DeditecBsWeu16: socket timeout during send/recv ({self.timeout_seconds}s)")
return False
except Exception as e:
logging.exception(f"DeditecBsWeu16: error sending command or receiving confirmation: {e}")
return False
def control_relay(self, pins_on: List[int], pins_off: List[int]) -> bool:
""" Turns on all pins specified in pins_on list and turns off all pins specified in pins_off list.
Returns True if successful, False otherwise.
"""
if not self.socket:
logging.error("DeditecBsWeu16: Relay not connected.")
return False
for pin in pins_on:
if not (1 <= pin <= PIN_COUNT):
logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_on.")
return False
for pin in pins_off:
if not (1 <= pin <= PIN_COUNT):
logging.error(f"DeditecBsWeu16: Invalid pin number {pin} in pins_off.")
return False
# Update the list of
self.pins_on_latched = list(set(self.pins_on_latched + pins_on) - set(pins_off))
command = self.assabmle_command(self.pins_on_latched)
if not self.send_command(command):
logging.error("DeditecBsWeu16: Failed to send command to Deditec device.")
return False
logging.info(f"DeditecBsWeu16: Changed relay setup. Pins ON: {self.pins_on_latched}")
return True
def assabmle_command(self, pins: List[int]) -> bytes:
"""Assembles the command to turn on specified pins on the Deditec BS-WEU-16 board."""
command_prefix = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00"
pin_mask_value = 0
for pin in set(pins): # Ensure uniqueness
if isinstance(pin, int) and 1 <= pin <= PIN_COUNT:
pin_mask_value += 2 ** (pin - 1)
else:
logging.warning(f"DeditecBsWeu16: Invalid pin number provided to assabmle_command: {pin}. Ignoring.")
command = command_prefix + pin_mask_value.to_bytes(2, byteorder="big")
return command
def close_connection(self) -> None:
if self.socket:
logging.debug("DeditecBsWeu16: closing connection")
try:
self.socket.close()
except Exception as e:
logging.error(f"DeditecBsWeu16: error closing socket: {e}")
finally:
self.socket = None
else:
logging.debug("Deditec:: close_connection called but already closed.")
def __enter__(self) -> Self:
try:
signal.alarm(self.timeout_seconds + 1)
except ValueError:
logging.warning("Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout.")
pass
if not self.connect():
signal.alarm(0)
raise ConnectionError(f"Failed to connect to Deditec device at {self.ip}:{self.port}")
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
try:
signal.alarm(0)
except ValueError:
pass
self.close_connection()
if exc_type:
logging.error(f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}")
return False

View File

@ -1,127 +0,0 @@
from __future__ import annotations
import signal
import socket
import sys
from typing import Any
import logging
from typing_extensions import Self # Potřebuje pip install typing-extensions
# !!! ZKONTROLUJ A PŘÍPADNĚ UPRAV VÝCHOZÍ IP !!!
# Tato IP se použije, pokud RelayController nedostane jinou,
# a hlavně se použije v test_deditec_control.py
IP = "192.168.1.10" # <--- VÝCHOZÍ IP PRO TESTY Z TERMINÁLU
PORT = 9912
class Deditec_1_16_on:
def __init__(self, ip: str = IP, port: int = PORT, timeout_seconds: int = 3):
# Použije předanou IP/port, nebo výchozí hodnoty
self.ip = ip
self.port = port
self.timeout_seconds = max(1, timeout_seconds) # Min 1s timeout
self.socket: socket.socket | None = None # Inicializace na None
logging.debug(f"Deditec communication object created for {self.ip}:{self.port}")
def connect(self) -> bool: # Vrací bool pro úspěch/neúspěch
if self.socket is not None:
logging.warning("Deditec:: connect called, but socket already exists. Closing first.")
self.close_connection()
logging.debug(f"Deditec:: connecting to device at {self.ip}:{self.port}...")
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Nastavení timeoutu pro operace socketu (connect, send, recv)
self.socket.settimeout(self.timeout_seconds)
self.socket.connect((self.ip, self.port))
logging.debug("Deditec:: connection established")
return True
except socket.timeout:
logging.error(f"Deditec:: connection timed out ({self.timeout_seconds}s)")
self.socket = None # Zajistit, že socket je None při chybě
return False
except Exception as e:
logging.exception(f"Deditec:: error when connecting: {e}")
self.socket = None
return False
def send_command(self, command: bytes) -> int: # Vrací 0 pro úspěch, >0 pro chybu
if self.socket is None:
logging.error("Deditec:: send_command called but not connected.")
return 1 # Chyba - není připojeno
logging.debug(f"Deditec:: sending command: {command!r}")
try:
self.socket.sendall(command)
# Čekání na odpověď (očekává se nějaká?)
# Původní kód četl data, i když je nepoužíval. Zkusíme to také.
# Velikost bufferu může být malá, pokud neočekáváme velkou odpověď.
data = self.socket.recv(64) # Přečíst malou odpověď
logging.debug(f"Deditec:: received confirmation data (len={len(data)}): {data!r}")
return 0 # Předpokládáme úspěch, pokud sendall a recv nehodily výjimku
except socket.timeout:
logging.error(f"Deditec:: socket timeout during send/recv ({self.timeout_seconds}s)")
return 2 # Chyba - timeout
except Exception as e:
logging.exception(f"Deditec:: error sending command or receiving confirmation: {e}")
return 3 # Jiná chyba
def close_connection(self) -> None:
if self.socket:
logging.debug("Deditec:: closing connection")
try:
self.socket.close()
except Exception as e:
logging.error(f"Deditec:: error closing socket: {e}")
finally:
self.socket = None # Vždy nastavit na None
else:
logging.debug("Deditec:: close_connection called but already closed.")
def __enter__(self) -> Self:
# Nastavení SIGALRM pro celkový timeout operace (funguje jen v hlavním vlákně na Unixu)
# V jiných případech spoléháme na socket timeout
try:
signal.alarm(self.timeout_seconds + 1) # Dáme o 1s víc než socket timeout
except ValueError: # Stává se ve Windows nebo mimo hlavní vlákno
logging.warning("Cannot set SIGALRM handler (not on Unix main thread?), relying on socket timeout.")
pass # Pokračujeme bez alarmu
if not self.connect():
signal.alarm(0) # Zrušit alarm, pokud byl nastaven
# Vyvolat specifickou výjimku pro připojení
raise ConnectionError(f"Failed to connect to Deditec device at {self.ip}:{self.port}")
# Nepotřebujeme rušit alarm zde, udělá se v __exit__
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
# Vždy zrušit alarm a zavřít spojení
try:
signal.alarm(0)
except ValueError:
pass
self.close_connection()
if exc_type: # Pokud došlo k výjimce uvnitř 'with' bloku
logging.error(f"Deditec:: An error occurred during 'with' block: {exc_type.__name__}: {exc_val}")
# return False # Předáme výjimku dál (standardní chování)
return False # Vždy předat výjimku dál, pokud nastala
# --- Kód pro přímé spuštění (test připojení) ---
if __name__ == "__main__":
# Tento test se spustí jen při `python libs/backend/deditec_driver/deditec_1_16_on.py`
print("Running direct connection test...")
# Použijeme try-except místo __enter__/__exit__ pro jednodušší test
tester = Deditec_1_16_on(timeout_seconds=2) # Krátký timeout pro test
if tester.connect():
print(f"SUCCESS: Connected to Deditec device at {tester.ip}:{tester.port}")
print("Closing connection.")
tester.close_connection()
sys.exit(0) # Úspěch
else:
print(f"ERROR: Failed to connect to Deditec device at {tester.ip}:{tester.port}")
sys.exit(1) # Neúspěch

View File

@ -1,210 +0,0 @@
import json
import sys
import logging
from datetime import datetime
from pathlib import Path
from typing import TypedDict, List, Dict
# --- Relativní a absolutní importy ---
try:
# Relativní import v rámci stejného balíčku
from .deditec_1_16_on import Deditec_1_16_on
# Import z nadřazeného balíčku (backend)
from backend.common import get_logger
imports_ok = True
except ImportError as e:
imports_ok = False
# Fallback logger, pokud selže import common
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.error(f"Failed to import dependencies: {e}. Using fallback logger.")
# Dummy třída, aby zbytek kódu mohl selhat později
class Deditec_1_16_on: pass
else:
# Pokud importy prošly, nastavíme logger
logger = get_logger(__name__)
# ------------------------------------
class Cache(TypedDict):
last_run: str
pins_on: List[int]
# Cesta k cache souboru relativně k tomuto souboru
HERE = Path(__file__).parent
CACHE_FILE = HERE / "pin_cache.json"
PIN_COUNT = 16 # Počet relé na desce
def _ensure_cache_file_exists():
"""Zajistí existenci cache souboru s výchozí strukturou."""
if not CACHE_FILE.exists():
logger.warning(f"Cache file {CACHE_FILE} not found, creating default.")
try:
with open(CACHE_FILE, "w") as f:
empty_cache: Cache = {"last_run": datetime.now().isoformat(), "pins_on": []}
json.dump(empty_cache, f, indent=2) # Použít json.dump
except IOError as e:
logger.error(f"Failed to create cache file {CACHE_FILE}: {e}")
# Zajistíme existenci cache při načtení modulu
_ensure_cache_file_exists()
# Prefix příkazu pro nastavení všech výstupů najednou
# Zdroj: Dokumentace Deditec nebo reverzní inženýrství?
# DELIBOX-OPTO16-RELAIS16 - Protokollbeschreibung v1.0.pdf (pokud existuje)
# Tento prefix se zdá být specifický pro určitý příkaz.
PREFIX_CMD = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00"
# --- Funkce pro práci s cache a příkazy ---
def get_pins_on() -> List[int]:
"""Načte seznam aktuálně zapnutých pinů z cache souboru."""
try:
with open(CACHE_FILE) as f:
data: Cache = json.load(f)
# Validace načtených dat?
if isinstance(data.get("pins_on"), List):
# Odstranit duplicity a zajistit čísla
pins = sorted(list(set(int(p) for p in data["pins_on"] if isinstance(p, (int, str)) and str(p).isdigit())))
# Omezit na platný rozsah
valid_pins = [p for p in pins if 1 <= p <= PIN_COUNT]
if len(valid_pins) != len(data["pins_on"]):
logger.warning(f"Invalid pins found in cache, cleaned up: {data['pins_on']} -> {valid_pins}")
return valid_pins
else:
logger.error(f"Invalid format in cache file {CACHE_FILE}: 'pins_on' is not a List.")
return [] # Vrátit prázdný seznam při chybě formátu
except FileNotFoundError:
logger.error(f"Cache file {CACHE_FILE} not found during read. Returning empty state.")
_ensure_cache_file_exists() # Zkusit znovu vytvořit
return []
except (json.JSONDecodeError, Exception) as e:
logger.exception(f"Deditec:: failed reading/parsing cache file {CACHE_FILE}: {e}")
return []
def save_new_pins_on(pins_on: List[int]) -> None:
"""Uloží nový seznam zapnutých pinů do cache souboru."""
# Odstranit duplicity a seřadit pro konzistenci
unique_sorted_pins = sorted(list(set(pins_on)))
logger.debug(f"Saving new pins state to cache: {unique_sorted_pins}")
try:
with open(CACHE_FILE, "w") as f:
cache: Cache = {"last_run": datetime.now().isoformat(), "pins_on": unique_sorted_pins}
json.dump(cache, f, indent=2)
except IOError as e:
logger.error(f"Failed to write cache file {CACHE_FILE}: {e}")
def get_new_pins_on(on: List[int], off: List[int], all_off: bool) -> List[int]:
"""Vypočítá nový seznam zapnutých pinů."""
if all_off:
return [] # Všechno vypnout
# Získáme předchozí stav z cache
previous_on_set = set(get_pins_on())
logger.debug(f"Calculating new state: Previous ON={previous_on_set}, Requested ON={on}, Requested OFF={off}")
# Aplikujeme změny
current_on_set = (previous_on_set.union(set(on))) - set(off)
# Vrátíme jako seřazený seznam
new_pins = sorted(list(current_on_set))
logger.debug(f"Resulting new ON state: {new_pins}")
return new_pins
def turn_on_pins_command(pins: List[int]) -> bytes:
"""Vytvoří bajtový příkaz pro zapnutí daných pinů."""
# Hodnota reprezentuje bitovou masku zapnutých pinů
# Pin 1 = bit 0 (2^0), Pin 2 = bit 1 (2^1), ..., Pin 16 = bit 15 (2^15)
pin_mask_value = 0
valid_pins = set() # Sledujeme validní piny pro logování
for pin in set(pins): # Zajistíme unikátnost
if isinstance(pin, int) and 1 <= pin <= PIN_COUNT:
pin_mask_value += 2 ** (pin - 1)
valid_pins.add(pin)
else:
logger.warning(f"Invalid pin number provided to turn_on_pins_command: {pin}. Ignoring.")
logger.debug(f"Generating command for pins: {sorted(list(valid_pins))}. Mask value: {pin_mask_value}")
# Hodnota masky se přidá jako 2 bajty (big-endian) za prefix
command = PREFIX_CMD + pin_mask_value.to_bytes(2, byteorder="big")
return command
# --- Funkce pro přímé ovládání (použité v test_deditec_control) ---
# Tuto funkci náš RelayController nepoužívá, ale test ano
def run_pins_on_off_command_save(ip: str, port: int, pins_on: List[int], pins_off: List[int]) -> bool:
"""Kompletní sekvence: výpočet, připojení, odeslání, uložení."""
logger.info(f"Executing direct command: ON={pins_on}, OFF={pins_off} to {ip}:{port}")
if not imports_ok:
logger.error("Cannot run direct command, imports failed.")
return False
try:
with Deditec_1_16_on(ip=ip, port=port) as deditec_controller:
new_pins_on = get_new_pins_on(pins_on, pins_off, False)
command = turn_on_pins_command(new_pins_on)
response = deditec_controller.send_command(command)
success = response == 0
if success:
save_new_pins_on(new_pins_on)
logger.info(f"Direct command successful. New state ON: {new_pins_on}")
else:
logger.error(f"Direct command failed. Deditec response: {response}")
return success
except Exception as e:
logger.exception(f"Error during direct command execution: {e}")
return False
# --- Funkce pro čtení stavu (použité v test_deditec_control) ---
def get_pins_status_dict() -> Dict[str, bool]:
"""Vrátí slovník se stavem všech pinů na základě cache."""
pins_on = get_pins_on()
pin_status = {str(pin): (pin in pins_on) for pin in range(1, 1 + PIN_COUNT)}
return pin_status
# --- Kód pro přímé spuštění (testování funkcí z helpers) ---
if __name__ == "__main__":
print("Running helper tests...")
# Test get_pins_on / save_new_pins_on
print("Current pins ON (from cache):", get_pins_on())
save_new_pins_on([1, 5, 15])
print("Set pins [1, 5, 15]. New state from cache:", get_pins_on())
save_new_pins_on([])
print("Set pins []. New state from cache:", get_pins_on())
# Test get_new_pins_on
save_new_pins_on([2, 4])
print("Cache state: [2, 4]")
print("get_new_pins_on(on=[1], off=[2], all_off=False) ->", get_new_pins_on(on=[1], off=[2], all_off=False)) # Očekává [1, 4]
print("get_new_pins_on(on=[6], off=[7], all_off=False) ->", get_new_pins_on(on=[6], off=[7], all_off=False)) # Očekává [2, 4, 6]
print("get_new_pins_on(on=[], off=[], all_off=True) ->", get_new_pins_on(on=[], off=[], all_off=True)) # Očekává []
# Test turn_on_pins_command
res = turn_on_pins_command([1, 2])
expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x00\x03"
print(f"Cmd for [1, 2]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}")
res = turn_on_pins_command([1, 1, 2]) # Test duplicity
print(f"Cmd for [1, 1, 2]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}")
res = turn_on_pins_command([11, 12]) # Piny 11 a 12
expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x0c\x00" # 2^10 + 2^11 = 1024 + 2048 = 3072 = 0x0C00
print(f"Cmd for [11, 12]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}")
res = turn_on_pins_command([]) # Vše vypnuto
expected = b"\x63\x9a\x01\x01\x00\x0b\x57\x57\x00\x00\x00\x00"
print(f"Cmd for []: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}")
res = turn_on_pins_command([1, 16]) # První a poslední
expected_val = 1 + 2**15 # 1 + 32768 = 32769 = 0x8001
expected = PREFIX_CMD + expected_val.to_bytes(2, 'big')
print(f"Cmd for [1, 16]: {res!r} (Expected: {expected!r}) -> {'OK' if res == expected else 'FAIL'}")
# Test get_pins_status_dict
save_new_pins_on([3, 8, 10])
status = get_pins_status_dict()
print("Status dict for [3, 8, 10]:", status)
print(" Pin 3 status:", status.get('3'))
print(" Pin 4 status:", status.get('4'))
print(" Pin 8 status:", status.get('8'))
print(" Pin 10 status:", status.get('10'))

View File

@ -1,4 +0,0 @@
{
"last_run": "2025-06-17T11:07:53.816919",
"pins_on": []
}

View File

@ -8,19 +8,8 @@ import sys
from typing import List, Optional, Set from typing import List, Optional, Set
from pathlib import Path from pathlib import Path
libs_path = Path(__file__).parent.parent / "libs"
if str(libs_path) not in sys.path:
sys.path.insert(0, str(libs_path))
# Import Deditec drives # Import Deditec drives
try: from .deditec import DeditecBsWeu16
from .deditec_driver.deditec_1_16_on import Deditec_1_16_on
from .deditec_driver.helpers import (get_pins_on, save_new_pins_on,
get_new_pins_on, turn_on_pins_command)
except ImportError as e:
logging.error(f"ERROR: Failed to import Deditec modules")
class RelayController: class RelayController:
@ -38,27 +27,16 @@ class RelayController:
self.ip_address = ip_address self.ip_address = ip_address
self.port = self.DEDITEC_PORT self.port = self.DEDITEC_PORT
logging.info(f"Initializing Relay Controller for Deditec at {self.ip_address}:{self.DEDITEC_PORT}")
# Ping the device to check connectivity # Ping the device to check connectivity
if not self.check_ping(self.ip_address): if not self.check_ping(self.ip_address):
logging.warning("Ping to Deditec relay board failed. Network issue possible, but attempting TCP check.") logging.warning("Ping to Deditec relay board failed. Network issue possible, but attempting TCP check.")
if not self._check_deditec_connection(self.ip_address, self.port): self.deditec = DeditecBsWeu16(ip=self.ip_address, port=self.port)
logging.error("CRITICAL: Failed to establish TCP connection with Deditec relay board.")
def _check_deditec_connection(self, ip: str, port: int, timeout: int = 2) -> bool:
logging.info(f"Checking Deditec connection to {ip}:{port} (timeout={timeout}s)...")
try:
with Deditec_1_16_on(ip=ip, port=port, timeout_seconds=timeout) as tester:
logging.info("Deditec connection established successfuly.")
return True
except ConnectionError as e: logging.error(f"Deditec connection failed: {e}"); return False
except TimeoutError: logging.error(f"Deditec connection timed out."); return False
except Exception as e: logging.error(f"Unexpected error during Deditec connection check: {e}"); return False
# Connect to deditec relay board
if not self.deditec.connect():
logging.error(f"Failed to connect to Deditec relay board at {self.ip_address}:{self.DEDITEC_PORT}.")
sys.exit(1)
def check_ping(self, ip: str) -> bool: def check_ping(self, ip: str) -> bool:
@ -66,14 +44,14 @@ class RelayController:
logging.info(f"Pinging {ip}...") logging.info(f"Pinging {ip}...")
system = platform.system().lower() system = platform.system().lower()
if system == "windows": if system == "windows":
command = ["ping", "-n", "1", "-w", "1000", ip] # Timeout 1000ms command = ["ping", "-n", "1", "-w", "1000", ip]
else: # Linux, macOS else: # Linux, macOS
command = ["ping", "-c", "1", "-W", "1", ip] # Timeout 1s command = ["ping", "-c", "1", "-W", "1", ip]
try: try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate(timeout=3) # Celkový timeout procesu stdout, stderr = process.communicate(timeout=3)
return_code = process.returncode return_code = process.returncode
logging.debug(f"Ping stdout:\n{stdout}") logging.debug(f"Ping stdout:\n{stdout}")
@ -82,7 +60,6 @@ class RelayController:
if return_code == 0: if return_code == 0:
# Doplňková kontrola (může být závislá na jazyku systému)
if "unreachable" in stdout.lower() or "timed out" in stdout.lower() or "ttl expired" in stdout.lower(): if "unreachable" in stdout.lower() or "timed out" in stdout.lower() or "ttl expired" in stdout.lower():
logging.error(f"Ping to {ip} technically succeeded (code 0) but output indicates failure.") logging.error(f"Ping to {ip} technically succeeded (code 0) but output indicates failure.")
return False return False
@ -106,50 +83,21 @@ class RelayController:
logging.error(f"Unknown error during ping check: {e}") logging.error(f"Unknown error during ping check: {e}")
return False return False
def _execute_relay_command(self, pins_to_turn_on: List[int], pins_to_turn_off: List[int]) -> bool:
try:
# 1. Zjistit nový celkový stav
new_pins_state = get_new_pins_on(pins_to_turn_on, pins_to_turn_off, all_off=False)
logging.debug(f"Calculating new relay state: Request ON={pins_to_turn_on}, Request OFF={pins_to_turn_off} -> New total ON state: {new_pins_state}")
# 2. Připravit command
command_bytes = turn_on_pins_command(new_pins_state)
logging.debug(f"Generated command bytes: {command_bytes!r}")
# 3. Odeslat command
logging.debug(f"Connecting to Deditec at {self.ip_address}:{self.port} to send command...")
with Deditec_1_16_on(ip=self.ip_address, port=self.port, timeout_seconds=3) as controller: # Timeout pro spojení
response = controller.send_command(command_bytes) # Použije timeout socketu definovaný v Deditec_1_16_on
# 4. Zkontrolovat a uložit
if response == 0:
logging.debug("Command sent successfully. Saving new state to cache.")
save_new_pins_on(new_pins_state)
logging.debug(f"Relay state updated. Currently ON: {get_pins_on()}") # Zobrazit aktuální stav z cache
return True
else:
logging.error(f"Deditec device returned unexpected response code: {response}")
return False
except ConnectionError as e: logging.error(f"Failed to connect to Deditec at {self.ip_address}: {e}"); return False
except TimeoutError: logging.error(f"Timeout communicating with Deditec at {self.ip_address}"); return False
except Exception as e: logging.exception(f"An unexpected error occurred during relay operation: {e}"); return False
def set_relay_off(self, pin: int) -> bool: def set_relay_off(self, pin: int) -> bool:
return self._execute_relay_command(pins_to_turn_on=[],
pins_to_turn_off=[pin]) if not self.deditec:
logging.error("RelayController: Deditec not initialized.")
return False
return self.deditec.control_relay(pins_on=[], pins_off=[pin])
def set_relay_on(self, pin: int) -> bool: def set_relay_on(self, pin: int) -> bool:
return self._execute_relay_command(pins_to_turn_on=[pin],
pins_to_turn_off=[])
def turn_all_relays_off(self) -> bool: if not self.deditec:
"""Turn off all relays.""" logging.error("RelayController: Deditec not initialized.")
logging.info("Turning all relays OFF.") return False
# Vytvoříme seznam všech pinů (1 až MAX_PIN)
all_pins = list(range(1, self.MAX_PIN + 1)) return self.deditec.control_relay(pins_on=[pin], pins_off=[])
return self._execute_relay_command(pins_to_turn_on=[], pins_to_turn_off=all_pins)
def close(self): def close(self):
pass pass

View File

@ -8,26 +8,14 @@ from pathlib import Path
import subprocess import subprocess
import socket import socket
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from test_logic import LinearScenario, SwitchingScenario, RandomWonderScenario
from test_logic.linear_scenario import LinearScenario
from test_logic.switching_scenario import SwitchingScenario
from test_logic.random_wonder_scenario import RandomWonderScenario
from notifications import send_slack_message from notifications import send_slack_message
from hardware_ctl import RelayController
project_root = Path(__file__).parent from dut import DutController
hw_ctl_path = project_root / "hardware_ctl"
logic_path = project_root / "test_logic"
sys.path.append(str(project_root))
if str(hw_ctl_path) not in sys.path: sys.path.insert(0, str(hw_ctl_path))
if str(logic_path) not in sys.path: sys.path.insert(0, str(logic_path))
from hardware_ctl.relay_controller import RelayController
from dut.dut_controller import DutController
# Configure logging # Configure logging
log_formatter = log_formatter = logging.Formatter('[%(levelname).1s %(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') log_formatter = log_formatter = logging.Formatter('[%(levelname).1s %(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log_file = project_root / "test.log" log_file = "test.log"
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -53,7 +41,7 @@ logger.addHandler(console_handler)
def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]: def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]:
"""Load test configuration from TOML config file .""" """Load test configuration from TOML config file ."""
config_file = project_root / config_path config_file = config_path
logging.info(f"Loading configuration file: {config_file}") logging.info(f"Loading configuration file: {config_file}")
try: try:
config = toml.load(config_file) config = toml.load(config_file)
@ -145,7 +133,7 @@ def main():
logging.critical("Failed to load configuration. Exiting.") logging.critical("Failed to load configuration. Exiting.")
sys.exit(1) sys.exit(1)
# Inicializace ovladačů HW # Initialize hardware controllers
logging.info("Initializing hardware controllers...") logging.info("Initializing hardware controllers...")
relay_ctl = None relay_ctl = None
dut_ctl = None dut_ctl = None
@ -278,16 +266,14 @@ def main():
test_aborted = True test_aborted = True
finally: finally:
# --- Cleanup ---
logging.info("Performing final cleanup...") logging.info("Performing final cleanup...")
if relay_ctl: if relay_ctl:
try: try:
logging.info("Ensuring all relays are OFF...") logging.info("Ensuring all relays are OFF...")
relay_ctl.turn_all_relays_off() dut_ctl.power_down_all() # Power down all DUTs
relay_ctl.close() relay_ctl.close()
except Exception as e_relay: logging.error(f"Error during relay cleanup: {e_relay}") except Exception as e_relay: logging.error(f"Error during relay cleanup: {e_relay}")
# ... (cleanup dut_ctl a temp_ctl) ...
if dut_ctl: if dut_ctl:
try: dut_ctl.close() try: dut_ctl.close()
except Exception as e_dut: logging.error(f"Error during DUT controller cleanup: {e_dut}") except Exception as e_dut: logging.error(f"Error during DUT controller cleanup: {e_dut}")

View File

@ -76,19 +76,3 @@ def send_slack_message(webhook_url: Optional[str], message: str, fallback_text:
except Exception as e: except Exception as e:
logger.exception(f"An unexpected error occurred during Slack notification: {e}") logger.exception(f"An unexpected error occurred during Slack notification: {e}")
return False return False
if __name__ == '__main__':
print("Testing Slack notification module (sends as payload parameter)...")
test_webhook_url = "YOUR_SLACK_WEBHOOK_URL" # Replace with your actual Slack Webhook URL
test_msg = "Test z Pythonu :wave: (posláno jako payload parametr).\n*Formátování* by mělo _fungovat_."
fallback = "Test from Python (payload)"
if "YOUR_SLACK_WEBHOOK_URL" == test_webhook_url or not test_webhook_url:
print("\nPlease replace YOUR_SLACK_WEBHOOK_URL with your actual Slack Webhook URL.")
else:
print(f"Sending test message via webhook...")
success = send_slack_message(test_webhook_url, test_msg, fallback_text=fallback)
if success:
print("\nTest request sent successfully (check your Slack channel).")
else:
print("\nTest request failed.")

View File

@ -62,7 +62,6 @@ random_wonder_relaxation_time_min = 30
[notifications] [notifications]
notification_channel = "slack" notification_channel = "slack"
slack_webhook_url = "https://hooks.slack.com/services/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" slack_webhook_url = "<Fill your webhook URL>"
#slack_webhook_url = "https://hooks.slack.com/services/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"

View File

@ -0,0 +1,4 @@
from .test_scenario import TestScenario
from .switching_scenario import SwitchingScenario
from .linear_scenario import LinearScenario
from .random_wonder_scenario import RandomWonderScenario