Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions flow/test/test_autotuner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ if [ "$PLATFORM_WITHOUT_DASHES" == "asap7" ] && [ "$DESIGN_NAME" == "gcd" ]; the
python3 -m unittest tools.AutoTuner.test.ref_file_check.RefFileCheck

echo "Running AutoTuner resume test (only once)"
# Temporarily disable resume check test due to flakiness
#python3 -m unittest tools.AutoTuner.test.resume_check.ResumeCheck.test_tune_resume
python3 -m unittest tools.AutoTuner.test.resume_check.ResumeCheck.test_tune_resume

echo "Running AutoTuner binary check (only once)"
openroad_autotuner -h
Expand Down
174 changes: 135 additions & 39 deletions tools/AutoTuner/test/resume_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@
from .autotuner_test_utils import AutoTunerTestUtils, accepted_rc

from contextlib import contextmanager
from ray.tune import ExperimentAnalysis

cur_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(cur_dir, "../src")
orfs_dir = os.path.join(cur_dir, "../../../flow")
os.chdir(src_dir)
orfs_flow_dir = os.path.join(cur_dir, "../../../flow")

# Maximum time (seconds) to wait for trials to start producing results.
POLL_TIMEOUT = 300
# Interval (seconds) between status polls.
POLL_INTERVAL = 15
# Maximum time (seconds) to wait for Ray cluster to shut down.
RAY_SHUTDOWN_TIMEOUT = 120


@contextmanager
Expand All @@ -55,70 +61,160 @@ def managed_process(*args, **kwargs):
try:
yield proc
finally:
if proc.poll() is None: # If the process is still running
proc.kill() # Forcefully kill it
if proc.poll() is None:
proc.kill()
proc.wait()


def get_experiment_status(experiment_dir):
    """
    Inspect a Ray Tune experiment directory and summarize its progress.

    Returns a dict with:
      - state: "not_started", "running", or "finished"
      - num_trials: number of trials found
      - num_completed: number of trials that reported results
    """
    status = {"state": "not_started", "num_trials": 0, "num_completed": 0}

    if not os.path.isdir(experiment_dir):
        return status

    # Ray Tune writes an experiment_state*.json file once the run has begun.
    has_state_file = any(
        name.startswith("experiment_state") and name.endswith(".json")
        for name in os.listdir(experiment_dir)
    )
    if not has_state_file:
        return status

    try:
        results = ExperimentAnalysis(experiment_dir).results
        status["num_trials"] = len(results)
        status["num_completed"] = sum(
            1 for outcome in results.values() if outcome is not None
        )
        # Finished only when every discovered trial has reported a result;
        # an empty result set still counts as running (state file exists).
        all_done = (
            status["num_trials"] > 0
            and status["num_completed"] == status["num_trials"]
        )
        status["state"] = "finished" if all_done else "running"
    except Exception:
        # Experiment directory exists but state is not yet readable.
        status["state"] = "running"

    return status


def stop_ray_cluster(timeout=RAY_SHUTDOWN_TIMEOUT):
    """
    Stop the Ray cluster, retrying until no nodes remain or timeout expires.

    Raises RuntimeError if the cluster is still up after `timeout` seconds.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        # `ray status` exits non-zero once no cluster nodes are reachable.
        cluster_empty = (
            subprocess.run(
                "ray status", shell=True, capture_output=True, text=True
            ).returncode
            != 0
        )
        stopped_cleanly = (
            subprocess.run(
                "ray stop", shell=True, capture_output=True, text=True
            ).returncode
            in accepted_rc
        )
        if cluster_empty and stopped_cleanly:
            return True
        time.sleep(5)

    raise RuntimeError(f"Failed to stop Ray cluster within {timeout} seconds")


class ResumeCheck(unittest.TestCase):
    """Verify that an interrupted AutoTuner tuning run can be resumed.

    Starts a tuning run, waits until at least one trial has completed,
    kills the run, stops the Ray cluster, then reruns with --resume and
    asserts the resumed run finishes successfully.
    """

    # Only test 1 platform/design.
    platform = "asap7"
    design = "gcd"
    samples = 5
    iterations = 2
    experiment_name = "test-resume"

    def setUp(self):
        # Design-specific AutoTuner configuration used by both runs.
        self.config = os.path.join(
            orfs_flow_dir,
            "designs",
            self.platform,
            self.design,
            "autotuner.json",
        )
        # Directory where Ray Tune writes experiment state for this run.
        self.experiment_dir = os.path.join(
            orfs_flow_dir,
            "logs",
            self.platform,
            self.design,
            self.experiment_name,
        )
        self.jobs = self.samples
        self.num_cpus = os.cpu_count()

        # Fractional resources_per_trial avoids parallelism issues with Ray.
        res_per_trial = float("{:.1f}".format(self.num_cpus / self.samples))
        options = ["", "--resume"]
        self.executable = AutoTunerTestUtils.get_exec_cmd()
        # commands[0] is the initial run; commands[1] resumes the same
        # experiment (identical name/config) with --resume appended.
        self.commands = [
            f"{self.executable}"
            f" --design {self.design}"
            f" --platform {self.platform}"
            f" --config {self.config}"
            f" --jobs {self.jobs}"
            f" --experiment {self.experiment_name}"
            f" tune --iterations {self.iterations}"
            f" --samples {self.samples}"
            f" --resources_per_trial {res_per_trial}"
            f" {c}"
            for c in options
        ]

    def test_tune_resume(self):
        # Step 1: Run the first config (without --resume) asynchronously.
        # Wait until at least one trial has completed, then kill it.
        print("Step 1: Starting initial tuning run")
        with managed_process(self.commands[0].split()) as proc:
            start = time.time()
            while time.time() - start < POLL_TIMEOUT:
                status = get_experiment_status(self.experiment_dir)
                print(
                    f"  Status: {status['state']}, "
                    f"trials: {status['num_trials']}, "
                    f"completed: {status['num_completed']}"
                )
                if status["num_completed"] > 0:
                    print(
                        f"  {status['num_completed']} trial(s) completed, "
                        f"stopping initial run"
                    )
                    break
                time.sleep(POLL_INTERVAL)
            else:
                self.fail(f"No trials completed within {POLL_TIMEOUT} seconds")

        # Step 2: Stop the Ray cluster cleanly so the resume run starts fresh.
        print("Step 2: Stopping Ray cluster")
        stop_ray_cluster()
        print("  Ray cluster stopped")

        # Step 3: Run the second config (with --resume) to completion.
        print("Step 3: Resuming tuning run")
        proc = subprocess.run(self.commands[1].split())
        successful = proc.returncode in accepted_rc
        self.assertTrue(
            successful,
            f"Resume run failed with return code {proc.returncode}",
        )


if __name__ == "__main__":
Expand Down