- 
                Notifications
    You must be signed in to change notification settings 
- Fork 37
Metric Logging updates 5/N - enable streaming #363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metric Logging updates 5/N - enable streaming #363
Conversation
…estamp_logging_diff2
…estamp_logging_diff3
…estamp_logging_diff3
…estamp_logging_diff3
…estamp_logging_diff3
…estamp_logging_diff4
| Codecov Report❌ Patch coverage is  
 Additional details and impacted files@@            Coverage Diff             @@
##             main     #363      +/-   ##
==========================================
- Coverage   73.68%   65.04%   -8.65%     
==========================================
  Files          81       82       +1     
  Lines        7729     7901     +172     
==========================================
- Hits         5695     5139     -556     
- Misses       2034     2762     +728     ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
 | 
| logging_mode: global_reduce # global_reduce, per_rank_reduce, per_rank_no_reduce | ||
| per_rank_share_run: False | ||
| console: | ||
| reduce_across_ranks: True | ||
| logging_mode: global_reduce | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to duplicate logging_mode across different configs like this? feels like clunky UX to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is per backend. You could have scuba logging on streamining mode, console logging global_reduce and wandb logging per rank. If you have a single backend, you define it only once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline. I would like to consider some refactors here, but acknowledge that it's not really in the scope of this PR
| A logger is represented by a backend, i.e. wandb backend. If reduce_across_ranks=False, | ||
| the backend is instantiated per-rank, in the MetricCollector, otherwise it is instantiated once globally, | ||
| in the GlobalLoggingActor. | ||
| Supports multiple logging backends, each with different logging modes. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I know this is the design from prior to this PR but) just wondering why we use a singleton across multiple logger backends. In my mind it's more idiomatic to instantiate different classes for different logger backends, and also that usage pattern shouldn't be super common anyways. (Personally I would prefer e.g. a simple console logger + generalized logging backend for wandb etc as two separate entities.) Basically I worry that we are putting extra onus on people who just wanna log to wandb with all these nested dict configs etc for something that is a bit of a niche use case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my mind it's more idiomatic to instantiate different classes for different logger backends
We do it. But the logic for backend in backends: backend.log(metric) has to live somewhere. MetricCollector is this wrapper with 4 methods:
class MetricCollector:
       def init_backends():
       def push()
       def flush()
       def shutdown()
each one of them is sort of just doing for backend in backends: backend.do_something()
| share_run_id (bool, default False): Only used if reduce_across_ranks=False. | ||
| True -> shared run across ranks; False -> separate runs per rank. | ||
| logging_mode (LoggingMode): Determines logging behavior | ||
| per_rank_share_run (bool, default False): For per-rank modes, whether to share run ID across ranks. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we chatted about it a bit already, but personally I still don't understand this one. It seems to me like we should always use a single run; if e.g. there are name collisions of metrics within the run we should sort that out ourselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"It seems to me like we should always use a single run", then you cannot have this:
 
Imagine that multiple run == multiple json files. e.g. rank_0.json, rank_1.json, etc.
hopefully the readme can clarify this: https://github.com/meta-pytorch/forge/pull/380/files
| # Per-rank modes based on share_run_id bool | ||
| elif role == BackendRole.GLOBAL and self.share_run_id: | ||
| # Per-rank modes based on per_rank_share_run bool | ||
| elif role == BackendRole.GLOBAL and self.per_rank_share_run: | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more general question about extensibility: say I want to use my own logger not provided by forge. Do I now have to implement my own custom logic to handle these different cases? (Basically I want to ensure we are not introducing unnecessary friction for users who want to customize beyond what we've directly provided)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have your own backend, you have to implement a LoggerBackend. The logging_mode defines where things get called.
If "GLOBAL_REDUCE", we call log_batch(reduced_metrics) once, from the controller
if "PER_RANK_REDUCE", we call log_batch(per_rank_reduced_metrics) once per rank.
if "PER_RANK_NO_REDUCE", we call log_stream(metric) on every rank as soon as the record_metric is called.
In the init the user can define how to initialize it, e.g. create a file, initialize a run, etc
class LoggerBackend(ABC):
    @abstractmethod
    async def init(...) -> None:
        pass
    @abstractmethod
    async def log_batch(
        self, metrics: list[Metric], global_step: int, *args, **kwargs
    ) -> None:
        pass
	
    @abstractmethod
    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        pass
        
          
                src/forge/observability/metrics.py
              
                Outdated
          
        
      | return | ||
|  | ||
| # Convert metrics to WandB log format | ||
| log_data = {"step": global_step} | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we change global_step -> step here?
| def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None: | ||
| """Stream single metric to backend immediately. | ||
| NOTE: This method is called synchronously. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the rationale here? In my mind the difference between this and log_batch behavior is a bit unintuitive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do record_metric(key, value, reduce) -> MetricCollector.push
if "per_rank_no_reduce", we immediately call MetricCollector.push -> backend.log_stream(single_metric)
else, we just accumulate the metric until flush.
GlobalLoggingActor.flush -> MetricCollector.flush -> backend.log_batch(reduced_metrics)
I am not 100% convinced that they should be different methods, but i also was not comfortable merging them.
| Weights & Biases logging backend for distributed training. | ||
| Weights & Biases logging backend. | ||
| For logging mode details, see `forge.observability.metrics.LoggingMode` documentation. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would also be helpful to provide some images in the documentation demonstrating the resulting figures from each of the different modes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
documentation as in readme, or in this docstring?
| if metadata_per_primary_backend: | ||
| primary_metadata = metadata_per_primary_backend.get(backend_name, {}) | ||
| # Skip local instantiation. Backend will be instantiated in GlobalLoggingActor. | ||
| if mode == LoggingMode.GLOBAL_REDUCE: | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kinda similar to my other comment about supporting multiple logging backends.. I feel like the MetricCollector class is doing a lot (imo too much). I know I wasn't around to give detailed review of the first few PRs, but I do wonder whether all these different logging modes should really all be handled in the same class. Currently the logic is a bit hard for me to understand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feel like the MetricCollector class is doing a lot (imo too much)
MetricCollector is this:
class MetricCollector:
       def init_backends():
       def push()
       def flush()
       def shutdown()
Are you saying that the methods are doing too much or we have too many methods?
Currently the logic is a bit hard for me to understand
Thats fair. Maybe i can try to make it more obvious, or maybe its a documentation issue for a global view, which i address in https://github.com/meta-pytorch/forge/pull/380/files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed a bit offline, thanks for addressing in #380 as well
…estamp_logging_diff4
| return | ||
|  | ||
| for backend_name, backend_config in config.items(): | ||
| self.config[backend_name] = self._validate_backend_config( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small thing, but i don't love _validate_backend_config as is. We are doing some sanity checks and potentially updating the config, but this requires understanding logic in one more function. Imo better to just validate and fail (rather than modify) if the arguments are no good, then the config does not get updated in a separate function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your patience on the review!
Co-authored-by: Felipe Mello <[email protected]>
Before:
After:
backend.log(metric)as soon as we get it, without any reduction.Before, MetricLogger.push(metric) would just collect the metric. Now, it also logs.
Notice how x-axis is timestamp:

async def log_batchanddef log_stream. It not totally clear to me if both should be async/sync or if i should try to unify them.