diff --git a/flow/test/test_autotuner.sh b/flow/test/test_autotuner.sh
index 03e9ddd005..a1198176e2 100755
--- a/flow/test/test_autotuner.sh
+++ b/flow/test/test_autotuner.sh
@@ -30,8 +30,7 @@ if [ "$PLATFORM_WITHOUT_DASHES" == "asap7" ] && [ "$DESIGN_NAME" == "gcd" ]; the
   python3 -m unittest tools.AutoTuner.test.ref_file_check.RefFileCheck
 
   echo "Running AutoTuner resume test (only once)"
-  # Temporarily disable resume check test due to flakiness
-  #python3 -m unittest tools.AutoTuner.test.resume_check.ResumeCheck.test_tune_resume
+  python3 -m unittest tools.AutoTuner.test.resume_check.ResumeCheck.test_tune_resume
 
   echo "Running AutoTuner binary check (only once)"
   openroad_autotuner -h
diff --git a/tools/AutoTuner/test/resume_check.py b/tools/AutoTuner/test/resume_check.py
index 69eaec1f24..a0a72a3ff8 100644
--- a/tools/AutoTuner/test/resume_check.py
+++ b/tools/AutoTuner/test/resume_check.py
@@ -39,11 +39,17 @@ from .autotuner_test_utils import AutoTunerTestUtils, accepted_rc
 
 
 from contextlib import contextmanager
+from ray.tune import ExperimentAnalysis
 
 cur_dir = os.path.dirname(os.path.abspath(__file__))
-src_dir = os.path.join(cur_dir, "../src")
-orfs_dir = os.path.join(cur_dir, "../../../flow")
-os.chdir(src_dir)
+orfs_flow_dir = os.path.join(cur_dir, "../../../flow")
+
+# Maximum time (seconds) to wait for trials to start producing results.
+POLL_TIMEOUT = 300
+# Interval (seconds) between status polls.
+POLL_INTERVAL = 15
+# Maximum time (seconds) to wait for Ray cluster to shut down.
+RAY_SHUTDOWN_TIMEOUT = 120
 
 
 @contextmanager
@@ -55,70 +61,174 @@ def managed_process(*args, **kwargs):
     try:
         yield proc
     finally:
-        if proc.poll() is None:  # If the process is still running
-            proc.kill()  # Forcefully kill it
+        if proc.poll() is None:
+            proc.kill()
+            proc.wait()
+
+
+def get_experiment_status(experiment_dir):
+    """
+    Check the status of a Ray Tune experiment by reading its directory.
+
+    Returns a dict with:
+      - state: "not_started", "running", "finished"
+      - num_trials: number of trials found
+      - num_completed: number of trials that reported results
+    """
+    status = {
+        "state": "not_started",
+        "num_trials": 0,
+        "num_completed": 0,
+    }
+
+    if not os.path.isdir(experiment_dir):
+        return status
+
+    # Check for experiment state file (created by Ray Tune)
+    state_files = [
+        f
+        for f in os.listdir(experiment_dir)
+        if f.startswith("experiment_state") and f.endswith(".json")
+    ]
+    if not state_files:
+        return status
+
+    try:
+        analysis = ExperimentAnalysis(experiment_dir)
+        results = analysis.results
+        status["num_trials"] = len(results)
+        # NOTE(review): Ray Tune seems to hand back a dict (never None) as a
+        # trial's last result, so count only results that carry the
+        # auto-filled "training_iteration" -- confirm against the Ray docs.
+        status["num_completed"] = sum(
+            1 for r in results.values() if r and r.get("training_iteration")
+        )
+
+        all_reported = status["num_trials"] > 0 and (
+            status["num_completed"] == status["num_trials"]
+        )
+        status["state"] = "finished" if all_reported else "running"
+    except Exception:
+        # Experiment directory exists but state is not yet readable.
+        status["state"] = "running"
+
+    return status
+
+
+def stop_ray_cluster(timeout=RAY_SHUTDOWN_TIMEOUT):
+    """
+    Stop the Ray cluster, retrying until no nodes remain or timeout is reached.
+
+    Returns True once `ray status` fails (no cluster) and `ray stop` exits
+    with an accepted return code; raises RuntimeError after `timeout` seconds.
+    """
+    start = time.time()
+    while time.time() - start < timeout:
+        status_proc = subprocess.run(
+            "ray status", shell=True, capture_output=True, text=True
+        )
+        no_nodes = status_proc.returncode != 0
+
+        stop_proc = subprocess.run(
+            "ray stop", shell=True, capture_output=True, text=True
+        )
+        stop_ok = stop_proc.returncode in accepted_rc
+
+        if no_nodes and stop_ok:
+            return True
+        time.sleep(5)
+
+    raise RuntimeError(f"Failed to stop Ray cluster within {timeout} seconds")
 
 
 class ResumeCheck(unittest.TestCase):
-    # only test 1 platform/design.
+    # Only test 1 platform/design.
     platform = "asap7"
     design = "gcd"
     samples = 5
     iterations = 2
+    experiment_name = "test-resume"
 
     def setUp(self):
         self.config = os.path.join(
-            orfs_dir, "designs", self.platform, self.design, "autotuner.json"
+            orfs_flow_dir,
+            "designs",
+            self.platform,
+            self.design,
+            "autotuner.json",
+        )
+        self.experiment_dir = os.path.join(
+            orfs_flow_dir,
+            "logs",
+            self.platform,
+            self.design,
+            self.experiment_name,
         )
         self.jobs = self.samples
         self.num_cpus = os.cpu_count()
 
-        # How it works: Say we have 5 samples and 5 iterations.
-        # If we want to limit to only 5 trials (and avoid any parallelism magic by Ray)
-        # We can set resources_per_trial = NUM_CORES/5 = 3.2 (fractional resources_per_trial are allowed!)
-
-        # Cast to 1 decimal place
+        # Fractional resources_per_trial avoids parallelism issues with Ray.
         res_per_trial = float("{:.1f}".format(self.num_cpus / self.samples))
         options = ["", "--resume"]
-        self.exec = AutoTunerTestUtils.get_exec_cmd()
+        self.executable = AutoTunerTestUtils.get_exec_cmd()
         self.commands = [
-            f"{self.exec}"
+            f"{self.executable}"
             f" --design {self.design}"
             f" --platform {self.platform}"
             f" --config {self.config}"
             f" --jobs {self.jobs}"
-            f" --experiment test-resume"
-            f" tune --iterations {self.iterations} --samples {self.samples}"
+            f" --experiment {self.experiment_name}"
+            f" tune --iterations {self.iterations}"
+            f" --samples {self.samples}"
             f" --resources_per_trial {res_per_trial}"
             f" {c}"
             for c in options
         ]
 
     def test_tune_resume(self):
-        # Goal is to first run the first config (without resume) and then run the second config (with resume)
-        # and check if the run is able to complete.
-
-        # Run the first config asynchronously.
-        print("Running the first config")
-        with managed_process(self.commands[0], shell=True) as proc:
-            time.sleep(120)
-
-        # Keep trying to stop the ray cluster until it is stopped
-        while 1:
-            proc = subprocess.run("ray status", shell=True)
-            no_nodes = proc.returncode != 0
-            proc = subprocess.run("ray stop", shell=True)
-            successful = proc.returncode in accepted_rc
-
-            if no_nodes and successful:
-                break
-            time.sleep(10)
-
-        # Run the second config to completion
-        print("Running the second config")
-        proc = subprocess.run(self.commands[1], shell=True)
+        # Step 1: Run the first config (without --resume) asynchronously.
+        # Wait until at least one trial has completed, then kill it.
+        print("Step 1: Starting initial tuning run")
+        with managed_process(self.commands[0].split()) as proc:
+            start = time.time()
+            while time.time() - start < POLL_TIMEOUT:
+                # Sample liveness before reading status so that a run which
+                # exits right after finishing a trial is not misreported.
+                exited = proc.poll() is not None
+                status = get_experiment_status(self.experiment_dir)
+                print(
+                    f" Status: {status['state']}, "
+                    f"trials: {status['num_trials']}, "
+                    f"completed: {status['num_completed']}"
+                )
+                if status["num_completed"] > 0:
+                    print(
+                        f" {status['num_completed']} trial(s) completed, "
+                        f"stopping initial run"
+                    )
+                    break
+                if exited:
+                    self.fail(
+                        "Initial run exited with return code "
+                        f"{proc.returncode} before any trial completed"
+                    )
+                time.sleep(POLL_INTERVAL)
+            else:
+                self.fail(f"No trials completed within {POLL_TIMEOUT} seconds")
+
+        # Step 2: Stop the Ray cluster cleanly.
+        print("Step 2: Stopping Ray cluster")
+        stop_ray_cluster()
+        print(" Ray cluster stopped")
+
+        # Step 3: Run the second config (with --resume) to completion.
+        print("Step 3: Resuming tuning run")
+        proc = subprocess.run(self.commands[1].split())
         successful = proc.returncode in accepted_rc
-        self.assertTrue(successful)
+        self.assertTrue(
+            successful,
+            f"Resume run failed with return code {proc.returncode}",
+        )
 
 
 if __name__ == "__main__":