# main_tester.py import time import toml import sys import logging from pathlib import Path import subprocess import socket from typing import Optional, Dict, Any from test_logic.linear_scenario import LinearScenario from test_logic.switching_scenario import SwitchingScenario from test_logic.random_wonder_scenario import RandomWonderScenario from notifications import send_slack_message project_root = Path(__file__).parent hw_ctl_path = project_root / "hardware_ctl" logic_path = project_root / "test_logic" sys.path.append(str(project_root)) if str(hw_ctl_path) not in sys.path: sys.path.insert(0, str(hw_ctl_path)) if str(logic_path) not in sys.path: sys.path.insert(0, str(logic_path)) from hardware_ctl.relay_controller import RelayController from dut.dut_controller import DutController # Configure logging log_formatter = log_formatter = logging.Formatter('[%(levelname).1s %(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') log_file = project_root / "test.log" logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Clear all existing handlers to avoid duplicates if logger.hasHandlers(): logger.handlers.clear() # File handler try: file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8') file_handler.setFormatter(log_formatter) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler) except Exception as log_e: print(f"WARNING: Failed to create file log handler for {log_file}: {log_e}") # Console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(log_formatter) console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) def load_config(config_path="test_config.toml") -> Optional[Dict[str, Any]]: """Load test configuration from TOML config file .""" config_file = project_root / config_path logging.info(f"Loading configuration file: {config_file}") try: config = toml.load(config_file) # Validate required sections in the config file required_sections = ["general", "relay", "duts", "test_plan", "notifications"] for section in required_sections: if section not in config: raise ValueError(f"Missing required section '{section}' in config file.") logging.info("Config file loading successfully.") return config except FileNotFoundError: logging.error(f"Configuration file not found at {config_file}") return None except toml.TomlDecodeError as e: logging.error(f"Error decoding TOML configuration file: {e}") return None except Exception as e: logging.error(f"Error loading configuration file: {e}") return None def run_test_cycle(config: dict, temp_c: float, cycle_num: int, test_mode: str, relay_ctl: RelayController, dut_ctl: DutController) -> bool: """ Run single test cycle for given test mode on all DUTs at specified temperature""" # Select test scenario if test_mode == "linear": test_scenario = LinearScenario( discharge_load=config['test_plan']['linear_discharge_load'], relaxation_time_min=config['test_plan']['linear_relaxation_time_min']) elif test_mode == "switching": test_scenario = SwitchingScenario( discharge_switch_cycle_min = config['test_plan']['switching_discharge_switch_cycle_min'], relaxation_time_min = config['test_plan']['switching_relaxation_time_min']) elif test_mode == "random_wonder": test_scenario = RandomWonderScenario( core_test_time=config['test_plan']['random_wonder_core_test_time_min'], relaxation_time_min=config['test_plan']['random_wonder_relaxation_time_min']) else: logging.error(f"Unknown test mode: {test_mode}. Cannot run test cycle.") return False # Setup test_scenario.setup(dut_controller=dut_ctl) # Test loop while(True): # Call run function in loop to execute the test scenario. finished = test_scenario.run(dut_controller=dut_ctl) # Read power manager report and log them to file test_scenario.log_data(dut_controller=dut_ctl, output_directory = Path(config['general']['output_directory']), temp=temp_c) if finished: break # Exit test loop time.sleep(1) # Tear down test scenario test_scenario.teardown(dut_controller=dut_ctl) return True def main(): logging.info("==============================================") logging.info(" Starting Automated Battery Cycle Tester ") logging.info("==============================================") config = load_config() if config is None: logging.critical("Failed to load configuration. Exiting.") sys.exit(1) # Inicializace ovladačů HW logging.info("Initializing hardware controllers...") relay_ctl = None dut_ctl = None init_ok = True logging.info("==============================================") logging.info(" Initializing Peripherals ") logging.info("==============================================") try: # Initialize relay controller (Deditec board) relay_ctl = RelayController( ip_address=config['relay']['ip_address'] ) # Initialize DUTs from dut.dut_controller import DutController dut_ctl = DutController(config['duts'], relay_ctl=relay_ctl, verbose=False) except Exception as e: logging.exception(f"Failed to intialize peripherals: {e}") exit(1) # Create output data directory output_data_dir = Path(config['general']['output_directory']) try: output_data_dir.mkdir(parents=True, exist_ok=True) logging.info(f"Test results will be saved in: {output_data_dir.resolve()}") except OSError as e: logging.critical(f"Failed to create output directory {output_data_dir}: {e}. Exiting.") if relay_ctl: relay_ctl.close(); if dut_ctl: dut_ctl.close(); sys.exit(1) ############################################################################ # LOAD TEST PLAN ############################################################################ logging.info("==============================================") logging.info(" TEST PLAN LOADING ") logging.info("==============================================") temperatures = config['test_plan'].get('temperatures_celsius', [25]) test_modes_to_run = config['test_plan'].get('test_modes', ['linear']) cycles_per_temp = config['test_plan'].get('cycles_per_temperature', 1) total_runs = len(temperatures) * cycles_per_temp * len(test_modes_to_run) completed_runs = 0 logger.info(f' + Tested temperatures : {temperatures}') logger.info(f' + Cycles per temperature : {cycles_per_temp}') logger.info(f' + Test modes in every cycle : {test_modes_to_run}') logger.info(f' + Total runs planned : {total_runs}') start_time = time.time() test_aborted = False ############################################################################ # MAIN TEST LOOP ############################################################################ try: # Run a full test cycle for each temperature setting. # since the temperature chamber is still controlled only manually, every # test will notify the user to set the temperature manually before it # start next iteration. for temp_c in temperatures: # Set temperature in temperature chamber logging.info("==============================================") logging.info(f" {temp_c} °C TEMP TEST ") logging.info("==============================================") logging.info(f"Set the temperature chamber to {temp_c} °C and wait for stabilization.") try: if config["notifications"]["notification_channel"] == "slack": send_slack_message(config["notifications"]["slack_webhook_url"], f"""Set the temperature chamber to {temp_c} °C and confirm to continue with the test.""") except Exception as e: logging.error(f"Failed to send Slack notification: {e}") while True: user_input = input(f"Confirm temperature is set to {temp_c} °C (Y)?") if user_input.lower() == "y": break for cycle_num in range(cycles_per_temp): for test_mode in test_modes_to_run: run_start_time = time.time() logging.info(f"Running Test Mode: '{test_mode}'") success = run_test_cycle(config, temp_c, cycle_num, test_mode, relay_ctl, dut_ctl) run_end_time = time.time() run_duration_m = (run_end_time - run_start_time) / 60 if success: completed_runs += 1 logging.info(f"Test Mode '{test_mode}' finished successfully in {run_duration_m:.1f} minutes.") else: logging.error(f"Test Mode '{test_mode}' failed after {run_duration_m:.1f} minutes.") if config.get('general', {}).get('fail_fast', False): logging.warning("Fail fast enabled. Aborting entire test plan.") test_aborted = True break else: logging.info("Continuing with the next mode/cycle/temperature.") logging.info(f"Progress: {completed_runs}/{total_runs} total runs completed.") if test_aborted: break if test_aborted: break logging.info("==============================================") logging.info(" TEST FINISHED ") logging.info("==============================================") except KeyboardInterrupt: logging.warning("Test execution interrupted by user (Ctrl+C)") test_aborted = True except Exception as e: logging.exception(f"FATAL ERROR during test execution: {e}") test_aborted = True finally: # --- Cleanup --- logging.info("Performing final cleanup...") if relay_ctl: try: logging.info("Ensuring all relays are OFF...") relay_ctl.turn_all_relays_off() relay_ctl.close() except Exception as e_relay: logging.error(f"Error during relay cleanup: {e_relay}") # ... (cleanup dut_ctl a temp_ctl) ... if dut_ctl: try: dut_ctl.close() except Exception as e_dut: logging.error(f"Error during DUT controller cleanup: {e_dut}") logging.info("==================== TEST SUMMARY ====================") end_time = time.time() total_duration_s = end_time - start_time total_duration_h = total_duration_s / 3600 status = "ABORTED" if test_aborted else ("COMPLETED" if completed_runs == total_runs else "PARTIALLY COMPLETED") logging.info("-" * 60) logging.info(f"Test execution {status}.") logging.info(f"Total runs completed: {completed_runs}/{total_runs}") logging.info(f"Total duration: {total_duration_s:.0f} seconds ({total_duration_h:.2f} hours).") logging.info("==================== TEST END ====================") if __name__ == "__main__": # Ensure at least one handler is set up if not logger.hasHandlers(): logger.addHandler(logging.StreamHandler(sys.stdout)) main()