evaluate.py
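"""Entry point: spawn one worker per configured GPU and run TrainerPretrain.test()."""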
from __future__ import annotations

import os
import sys

import hydra
import torch
import torch.distributed
import torch.multiprocessing as mp
import wandb
from hydra.core.hydra_config import HydraConfig
from loguru import logger
from omegaconf import DictConfig

from tactic.config.config_pretrain import ConfigPretrain
from tactic.core.trainer_pretrain import TrainerPretrain
from tactic.utils.set_seed import set_seed


@hydra.main(version_base=None, config_path="config", config_name="pretrain")
def main(cfg_hydra: DictConfig):
    cfg = ConfigPretrain.from_hydra(cfg_hydra)
    barrier = setup_multiprocessing(cfg)
    setup_logger(cfg)

    hydra_cfg = HydraConfig.get()
    # check_existence_of_benchmark_results_csv(cfg)
    cfg.save(path=cfg.output_dir / "config" / f"{hydra_cfg.job.config_name}.yaml")

    setup_gpus(cfg)
    set_seed(cfg.seed)
    logger.info(f"Evaluating with {len(cfg.devices)} GPU(s)")
    mp.spawn(main_experiment, nprocs=len(cfg.devices), args=(cfg, barrier))


def main_experiment(gpu_process_index: int, cfg: ConfigPretrain, barrier: mp.Barrier) -> None:
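    # Spawned workers do not inherit the parent's loguru sinks, so each one
    # re-adds the shared file sink here.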
    logger.add(cfg.output_dir / "log.log", enqueue=True)
    setup_gpus_of_experiment(cfg, gpu_process_index)

    trainer = TrainerPretrain(cfg, barrier)
    if cfg.is_main_process:
        logger.info("Trainer of TACTIC created, starting evaluation")
        wandb.init(project="tactic", config=cfg.__dict__, dir=cfg.output_dir)
    trainer.test()


def setup_multiprocessing(cfg: ConfigPretrain) -> mp.Barrier:
    mp.set_start_method('spawn')
    if debugger_is_active():
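        # Synchronous kernel launches make CUDA errors surface at the call
        # that caused them, which is far easier to inspect under a debugger.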
        os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
    return mp.Barrier(len(cfg.devices))


def setup_logger(cfg: ConfigPretrain) -> None:
    # Must be called after setup_multiprocessing: enqueue=True routes log
    # records through a multiprocessing queue, which depends on the start
    # method configured there.
    logger.add(cfg.output_dir / "log.log", enqueue=True)
    logger.info("Finished creating pretrain config")


def setup_gpus(cfg: ConfigPretrain) -> None:
    num_gpus = len(cfg.devices)
    if cfg.use_ddp:
        if num_gpus == 1:
            logger.warning("Are you sure you want distributed training with only one GPU?")
        batch_size = cfg.optim.batch_size
        assert batch_size >= num_gpus, "Batch size must be at least the number of GPUs"
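        # Under DDP the configured batch size is global, so divide it evenly
        # across the participating devices.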
        cfg.optim.batch_size = batch_size // num_gpus
        logger.info(f"Using GPUs {[d.index for d in cfg.devices]} for distributed training")
        logger.info(f"Batch size per device set to {cfg.optim.batch_size}")
        logger.info(
            f"With gradient accumulation steps {cfg.optim.gradient_accumulation_steps}, "
            f"total batch size is {cfg.optim.batch_size * cfg.optim.gradient_accumulation_steps * num_gpus}"
        )
    else:
        assert num_gpus == 1, "Cannot use more than one GPU without distributed training"
        cfg.device = cfg.devices[0]
        cfg.is_main_process = True
        logger.info(f"Using GPU {cfg.device} for training")


def setup_gpus_of_experiment(cfg: ConfigPretrain, gpu_process_index: int) -> torch.device:
    device = cfg.devices[gpu_process_index]
    torch.cuda.set_device(device)
    cfg.device = device
    cfg.is_main_process = (gpu_process_index == 0)

    if cfg.use_ddp:
        os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
        os.environ['MASTER_ADDR'] = 'localhost'
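        # Derive the rendezvous port from the first device index so that
        # concurrent runs on disjoint GPU sets pick different ports.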
        port = 5678 + cfg.devices[0].index
        os.environ['MASTER_PORT'] = str(port)
        torch.distributed.init_process_group(
            backend="nccl", world_size=len(cfg.devices), rank=gpu_process_index
        )
    return device


def debugger_is_active() -> bool:
    """Return True if a debugger is currently attached."""
    return hasattr(sys, 'gettrace') and sys.gettrace() is not None


if __name__ == "__main__":
    main()
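
# Hypothetical invocation (the exact override keys depend on config/pretrain.yaml,
# which is not shown here):
#   python evaluate.py seed=0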