feat: added various training loggers using confit.Draft
percevalw committed Feb 18, 2025
1 parent 30ec72a commit 76ebae2
Showing 4 changed files with 155 additions and 98 deletions.
189 changes: 110 additions & 79 deletions edsnlp/training/loggers.py
@@ -10,6 +10,22 @@
import edsnlp


def flatten_dict(d, path=""):
if not isinstance(d, (list, dict)):
return {path: d}

if isinstance(d, list):
items = enumerate(d)
else:
items = d.items()

return {
k: v
for key, val in items
for k, v in flatten_dict(val, f"{path}/{key}" if path else key).items()
}
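
For reference, the new flatten_dict helper (shared by the loggers below, and moved out of trainer.py) turns arbitrarily nested dicts and lists into a flat dict keyed by slash-separated paths, with list indices used as path segments. A quick illustration of its behaviour (the metric names are made up):

metrics = {"ner": {"micro": {"f": 0.91, "p": 0.89}}, "losses": [0.7, 0.5]}

flatten_dict(metrics)
# -> {"ner/micro/f": 0.91, "ner/micro/p": 0.89, "losses/0": 0.7, "losses/1": 0.5}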


@edsnlp.registry.loggers.register("csv", auto_draft_in_config=True)
class CSVLogger(accelerate.tracking.GeneralTracker):
name = "csv"
@@ -53,7 +69,7 @@ def __init__(
self._has_header = False

@property
def tracker(self):
def tracker(self): # pragma: no cover
return None

@accelerate.tracking.on_main_process
@@ -70,11 +86,9 @@ def log(self, values: Dict[str, Any], step: Optional[int] = None):
- All subsequent calls must use the same columns. Any missing columns get
written as empty, any new columns generate a warning.
"""
# Ensure we have columns set
print("LOGGING TO CSV", self.file_path)
values = flatten_dict(values)

if self._columns is None:
# Create the list of columns. We'll always reserve "step" as first if step
# is provided.
self._columns = list({**{"step": None}, **values}.keys())
self._writer.writerow(self._columns)
self._has_header = True
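
A rough sketch of the resulting CSV behaviour (constructor arguments and metric names are assumptions, since the full __init__ is collapsed above): nested values are flattened, the first log call fixes the header with step as the first column, and later calls must reuse the same columns.

logger = CSVLogger(project_name="my-run", logging_dir="./logs")  # hypothetical arguments
logger.log({"loss": 0.7, "ner": {"f": 0.85}}, step=10)
logger.log({"loss": 0.6, "ner": {"f": 0.88}}, step=20)

# The written file would then look roughly like:
# step,loss,ner/f
# 10,0.7,0.85
# 20,0.6,0.88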
@@ -142,7 +156,7 @@ def __init__(
self._logs = []

@property
def tracker(self):
def tracker(self): # pragma: no cover
return None

@accelerate.tracking.on_main_process
@@ -267,8 +281,7 @@ def log(self, values: Dict[str, Any], step: Optional[int] = None):
Logs values in the Rich table. If `step` is provided, we include it in the
logged data.
"""
print("LOGGING WITH RICH")
combined = {"step": step, **values}
combined = {"step": step, **flatten_dict(values)}
self.printer.log_metrics(combined)

@accelerate.tracking.on_main_process
@@ -280,36 +293,40 @@ def finish(self):


@edsnlp.registry.loggers.register("tensorboard", auto_draft_in_config=True)
def TensorBoardLogger(
project_name: str,
logging_dir: Optional[Union[str, os.PathLike]] = None,
**kwargs,
) -> "accelerate.tracking.TensorBoardTracker": # pragma: no cover
"""
Logger for [TensorBoard](https://github.com/tensorflow/tensorboard).
This logger is also available via the loggers registry as `tensorboard`.
class TensorboardLogger(accelerate.tracking.TensorBoardTracker):
def __init__(
self,
project_name: str,
logging_dir: Optional[Union[str, os.PathLike]] = None,
):
"""
Logger for [TensorBoard](https://github.com/tensorflow/tensorboard).
This logger is also available via the loggers registry as `tensorboard`.
Parameters
----------
project_name: str
Name of the project.
logging_dir: Union[str, os.PathLike]
Directory in which to store the TensorBoard logs. Logs of different runs
will be stored in `logging_dir/project_name`. If not provided, the
environment variable `TENSORBOARD_LOGGING_DIR` will be used.
kwargs: Dict
Additional keyword arguments to pass to `tensorboard.SummaryWriter`.
Parameters
----------
project_name: str
Name of the project.
logging_dir: Union[str, os.PathLike]
Directory in which to store the TensorBoard logs. Logs of different runs
will be stored in `logging_dir/project_name`. If not provided, the
environment variable `TENSORBOARD_LOGGING_DIR` will be used.
kwargs: Dict
Additional keyword arguments to pass to `tensorboard.SummaryWriter`.
"""
logging_dir = logging_dir or os.environ.get("TENSORBOARD_LOGGING_DIR", None)
assert logging_dir is not None, (
"Please provide a logging directory or set TENSORBOARD_LOGGING_DIR"
)
super().__init__(project_name, logging_dir)

Returns
-------
accelerate.tracking.TensorBoardTracker
"""
logging_dir = logging_dir or os.environ.get("TENSORBOARD_LOGGING_DIR", None)
assert logging_dir is not None, (
"Please provide a logging directory or set TENSORBOARD_LOGGING_DIR"
)
def store_init_configuration(self, values: Dict[str, Any]):
values = json.loads(json.dumps(flatten_dict(values), default=str))
return super().store_init_configuration(values)

return accelerate.tracking.TensorBoardTracker(project_name, logging_dir, **kwargs)
def log(self, values: dict, step: Optional[int] = None, **kwargs):
values = flatten_dict(values)
return super().log(values, step, **kwargs)
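
The rewritten TensorBoard logger is now a subclass of accelerate's TensorBoardTracker: it resolves the logging directory from the TENSORBOARD_LOGGING_DIR environment variable when none is given, and flattens nested values before delegating to the parent class. A minimal usage sketch, assuming tensorboard is installed (directory and metric names are illustrative):

import os

os.environ.setdefault("TENSORBOARD_LOGGING_DIR", "./tb_logs")

logger = TensorboardLogger(project_name="my-experiment")
logger.store_init_configuration({"optimizer": {"lr": 3e-4}})  # stored as {"optimizer/lr": ...}
logger.log({"train": {"loss": 0.42}}, step=100)               # logged as {"train/loss": 0.42}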


@edsnlp.registry.loggers.register("aim", auto_draft_in_config=True)
@@ -365,31 +382,6 @@ def WandBLogger(
return accelerate.tracking.WandBTracker(project_name, **kwargs)


@edsnlp.registry.loggers.register("clearml", auto_draft_in_config=True)
def ClearMLLogger(
project_name: str,
**kwargs,
) -> "accelerate.tracking.ClearMLTracker": # pragma: no cover
"""
Logger for
[ClearML](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps/).
This logger is also available via the loggers registry as `clearml`.
Parameters
----------
project_name: str
Name of the experiment. Environment variables `CLEARML_PROJECT` and
`CLEARML_TASK` have priority over this argument.
kwargs: Dict
Additional keyword arguments to pass to the ClearML Task object.
Returns
-------
accelerate.tracking.ClearMLTracker
"""
return accelerate.tracking.ClearMLTracker(project_name, **kwargs)


@edsnlp.registry.loggers.register("mlflow", auto_draft_in_config=True)
def MLflowLogger(
project_name: str,
@@ -471,24 +463,63 @@ def CometMLLogger(
return accelerate.tracking.CometMLTracker(project_name, **kwargs)


@edsnlp.registry.loggers.register("dvclive", auto_draft_in_config=True)
def DVCLiveLogger(
live: Any = None,
**kwargs,
) -> "accelerate.tracking.DVCLiveTracker":
"""
Logger for [DVC Live](https://dvc.org/doc/dvclive).
This logger is also available via the loggers registry as `dvclive`.
try:
from accelerate.tracking import ClearMLTracker as _ClearMLTracker

Parameters
----------
live: dvclive.Live
An instance of `dvclive.Live` to use for logging.
kwargs: Dict
Additional keyword arguments to pass to the `dvclive.Live` constructor.
@edsnlp.registry.loggers.register("clearml", auto_draft_in_config=True)
def ClearMLLogger(
project_name: str,
**kwargs,
) -> "accelerate.tracking.ClearMLTracker": # pragma: no cover
"""
Logger for
[ClearML](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps/).
This logger is also available via the loggers registry as `clearml`.
Returns
-------
accelerate.tracking.DVCLiveTracker
"""
return accelerate.tracking.DVCLiveTracker(None, live=live, **kwargs)
Parameters
----------
project_name: str
Name of the experiment. Environment variables `CLEARML_PROJECT` and
`CLEARML_TASK` have priority over this argument.
kwargs: Dict
Additional keyword arguments to pass to the ClearML Task object.
Returns
-------
accelerate.tracking.ClearMLTracker
"""
return _ClearMLTracker(project_name, **kwargs)
except ImportError: # pragma: no cover

def ClearMLLogger(*args, **kwargs):
raise ImportError("ClearMLLogger is not available.")


try:
from accelerate.tracking import DVCLiveTracker as _DVCLiveTracker

@edsnlp.registry.loggers.register("dvclive", auto_draft_in_config=True)
def DVCLiveLogger(
live: Any = None,
**kwargs,
) -> "accelerate.tracking.DVCLiveTracker": # pragma: no cover
"""
Logger for [DVC Live](https://dvc.org/doc/dvclive).
This logger is also available via the loggers registry as `dvclive`.
Parameters
----------
live: dvclive.Live
An instance of `dvclive.Live` to use for logging.
kwargs: Dict
Additional keyword arguments to pass to the `dvclive.Live` constructor.
Returns
-------
accelerate.tracking.DVCLiveTracker
"""
return _DVCLiveTracker(None, live=live, **kwargs)
except ImportError: # pragma: no cover

def DVCLiveLogger(*args, **kwargs):
raise ImportError("DVCLiveLogger is not available.")
24 changes: 9 additions & 15 deletions edsnlp/training/trainer.py
@@ -75,17 +75,6 @@
}


def flatten_dict(d, path=""):
if not isinstance(d, dict):
return {path: d}

return {
k: v
for key, val in d.items()
for k, v in flatten_dict(val, f"{path}/{key}" if path else key).items()
}


def fill_flat_stats(x, result, path=()):
if result is None:
result = {}
@@ -580,7 +569,6 @@ def train(
logging_dir=output_dir,
),
)
accelerator.init_trackers(project_name) # in theory project name shouldn't be used
# accelerator.register_for_checkpointing(dataset)
is_main_process = accelerator.is_main_process
device = accelerator.device
@@ -593,15 +581,18 @@ def train(

output_dir = Path(output_dir or Path.cwd() / "artifacts")
output_model_dir = Path(output_model_dir or output_dir / "model-last")
unresolved_config = None
if is_main_process:
os.makedirs(output_dir, exist_ok=True)
os.makedirs(output_model_dir, exist_ok=True)
unresolved_config = {}
if config_meta is not None: # pragma: no cover
unresolved_config = config_meta["unresolved_config"]
print(unresolved_config.to_yaml_str())
unresolved_config.to_disk(output_dir / "train_config.yml")
# TODO: handle config_meta is None
accelerator.init_trackers(
project_name, config=unresolved_config
) # in theory project name shouldn't be used

validation_interval = validation_interval or max_steps // 10
checkpoint_interval = checkpoint_interval or validation_interval
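
Delaying accelerator.init_trackers until after the output directory is set up lets the unresolved training config be passed as the run configuration, which accelerate forwards to every tracker's store_init_configuration. A minimal sketch of that flow with a custom tracker instance (tracker arguments are assumptions):

from accelerate import Accelerator

tracker = CSVLogger(project_name="my-run", logging_dir="./logs")  # any GeneralTracker subclass
accelerator = Accelerator(log_with=[tracker])

# config is handed to each tracker's store_init_configuration()
accelerator.init_trackers("my-run", config={"optimizer": {"lr": 3e-4}})
accelerator.log({"train": {"loss": 0.5}}, step=1)
accelerator.end_training()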
@@ -726,7 +717,7 @@ def train(
**scores,
}
cumulated_data = defaultdict(lambda: 0, **default_metrics)
accelerator.log(flatten_dict(metrics), step=step)
accelerator.log(metrics, step=step)

if on_validation_callback:
on_validation_callback(metrics)
@@ -787,7 +778,7 @@ def train(
del all_res
if isinstance(loss, torch.Tensor) and loss.requires_grad:
accelerator.backward(loss)
except torch.cuda.OutOfMemoryError:
except torch.cuda.OutOfMemoryError: # pragma: no cover
print(
"Out of memory error encountered when processing a "
"batch with the following statistics:"
@@ -858,4 +849,7 @@ def train(

del iterator

# Should we put this in a finally block?
accelerator.end_training()

return nlp
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -296,7 +296,7 @@ where = ["."]
"csv" = "edsnlp.training.loggers:CSVLogger"
"json" = "edsnlp.training.loggers:JSONLogger"
"rich" = "edsnlp.training.loggers:RichLogger"
"tensorboard" = "edsnlp.training.loggers:TensorboardLogger"
"tensorboard" = "edsnlp.training.loggers:TensorBoardLogger"
"aim" = "edsnlp.training.loggers:AimLogger"
"wandb" = "edsnlp.training.loggers:WandBLogger"
"clearml" = "edsnlp.training.loggers:ClearMLLogger"
38 changes: 35 additions & 3 deletions tests/training/test_train.py
@@ -99,7 +99,16 @@ def test_ner_qualif_train_diff_bert(run_in_test_dir, tmp_path):
config = Config.from_disk("ner_qlf_diff_bert_config.yml")
shutil.rmtree(tmp_path, ignore_errors=True)
kwargs = Config.resolve(config["train"], registry=registry, root=config)
nlp = train(**kwargs, output_dir=tmp_path, cpu=True)
nlp = train(
**kwargs,
output_dir=tmp_path,
cpu=True,
config_meta={
"config_path": "dep_parser_config.yml",
"resolved_config": kwargs,
"unresolved_config": config,
},
)
scorer = GenericScorer(**kwargs["scorer"])
val_data = kwargs["val_data"]
last_scores = scorer(nlp, val_data)
@@ -120,7 +129,16 @@ def test_ner_qualif_train_same_bert(run_in_test_dir, tmp_path):
config = Config.from_disk("ner_qlf_same_bert_config.yml")
shutil.rmtree(tmp_path, ignore_errors=True)
kwargs = Config.resolve(config["train"], registry=registry, root=config)
nlp = train(**kwargs, output_dir=tmp_path, cpu=True)
nlp = train(
**kwargs,
output_dir=tmp_path,
cpu=True,
config_meta={
"config_path": "dep_parser_config.yml",
"resolved_config": kwargs,
"unresolved_config": config,
},
)
scorer = GenericScorer(**kwargs["scorer"])
val_data = kwargs["val_data"]
last_scores = scorer(nlp, val_data)
@@ -137,7 +155,16 @@ def test_qualif_train(run_in_test_dir, tmp_path):
config = Config.from_disk("qlf_config.yml")
shutil.rmtree(tmp_path, ignore_errors=True)
kwargs = Config.resolve(config["train"], registry=registry, root=config)
nlp = train(**kwargs, output_dir=tmp_path, cpu=True)
nlp = train(
**kwargs,
output_dir=tmp_path,
cpu=True,
config_meta={
"config_path": "dep_parser_config.yml",
"resolved_config": kwargs,
"unresolved_config": config,
},
)
scorer = GenericScorer(**kwargs["scorer"])
val_data = kwargs["val_data"]
last_scores = scorer(nlp, val_data)
@@ -158,6 +185,11 @@ def test_dep_parser_train(run_in_test_dir, tmp_path):
logger=CSVLogger.draft(),
output_dir=tmp_path,
cpu=True,
config_meta={
"config_path": "dep_parser_config.yml",
"resolved_config": kwargs,
"unresolved_config": config,
},
)
scorer = GenericScorer(**kwargs["scorer"])
val_data = list(kwargs["val_data"])
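
The training tests now pass a config_meta mapping to train; the trainer writes its unresolved_config entry to train_config.yml and seeds the trackers' run configuration with it. The shape used in these tests:

config_meta = {
    "config_path": "dep_parser_config.yml",  # config file path recorded in the run metadata
    "resolved_config": kwargs,               # output of Config.resolve(...)
    "unresolved_config": config,             # raw Config, dumped to train_config.yml
}
nlp = train(**kwargs, output_dir=tmp_path, cpu=True, config_meta=config_meta)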
