support multiple outer optims for diloco #230

Merged · 2 commits · Jul 12, 2025
54 changes: 45 additions & 9 deletions torchft/local_sgd.py
@@ -213,8 +213,14 @@ def __init__(
self.should_quantize = should_quantize

self._grads: Dict[str, torch.Tensor] = {}

# Used to save global parameters so that they can be restored in case
# commit fails
self.original_parameters: Dict[str, torch.Tensor] = {}

# Used to mix the local and global parameters
self._local_parameters: Dict[str, torch.Tensor] = {}

for name, p in self._model_fragment.named_parameters():
if isinstance(p, DTensor):
p = extract_local_tensor(p.data)
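The fragment now tracks three per-parameter dicts: `_grads` (the averaged pseudo-gradients that the outer optimizer steps on), `original_parameters` (the last committed global copy, restored by `restore_parameters()` and used to roll back if a commit fails), and the new `_local_parameters` (a transient copy of the locally optimized weights, held only between `_save_local_parameters()` and `_clear_local_parameters()` so that `_merge_parameters()` can blend them back in).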
@@ -237,6 +243,14 @@ def save_parameters(self) -> None:
param_to_local = extract_local_tensor(p.data)
self.original_parameters[name].copy_(param_to_local, non_blocking=True)

def _save_local_parameters(self) -> None:
"""
Saves a copy of the model's parameters.
"""
with torch.no_grad():
for name, p in self._model_fragment.named_parameters():
self._local_parameters[name] = extract_local_tensor(p.data)

@torch.profiler.record_function("torchft::local_sgd::restore_parameters")
def restore_parameters(self) -> None:
with torch.no_grad():
@@ -293,6 +307,19 @@ def _set_grads(self) -> None:
# No longer needed
del self._grads[name]

def _clear_local_parameters(self) -> None:
"""
Clears the saved copy of the model's parameters
"""
self._local_parameters = {}

def _merge_parameters(self) -> None:
"""
Merges the local and global parameters.
"""
for name, p in self._model_fragment.named_parameters():
p.data.lerp_(self._local_parameters[name], 1 - self._fragment_update_alpha)

@torch.profiler.record_function("torchft::local_sgd::wait")
def wait(self) -> None:
"""
@@ -382,6 +409,8 @@ def perform_sync(self) -> bool:

self.wait()

# Save the local parameters so they can be used for merging
self._save_local_parameters()
# Restore the parameters back to the previous state
self.restore_parameters()

@@ -404,8 +433,12 @@ def perform_sync(self) -> bool:
self._set_grads()
self._outer_optimizer.step()
self.save_parameters()
self._merge_parameters()
self._outer_optimizer.zero_grad()

# free up memory
self._clear_local_parameters()

return should_commit

def _average_grads(self) -> None:
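The ordering in `perform_sync` matters: the live local weights have to be copied out before `restore_parameters()` overwrites them with the last global snapshot, and the merge runs only after the outer optimizer has stepped and the new snapshot has been saved. A toy sketch with plain tensors (not the torchft API; the values and the `+= 0.1` stand-in for the outer step are made up) shows the sequence and the resulting blend:

```python
import torch

alpha = 0.5                                  # stand-in for fragment_update_alpha
p = torch.tensor([1.0, 2.0])                 # live weights after the local inner steps
global_snapshot = torch.tensor([0.0, 0.0])   # original_parameters from the last sync

local_copy = p.clone()                       # _save_local_parameters()
p.copy_(global_snapshot)                     # restore_parameters()
p += 0.1                                     # stand-in for the outer optimizer step
p.lerp_(local_copy, 1 - alpha)               # _merge_parameters()
print(p)                                     # tensor([0.5500, 1.0500]) == 0.5*global + 0.5*local
```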
@@ -515,7 +548,8 @@ def __init__(
manager: Manager,
model_fragments: List[nn.Module],
inner_optimizer: optim.Optimizer,
outer_optimizer: optim.Optimizer,
# TODO: accepting a single optimizer is kept only for backward compatibility
outer_optimizer: optim.Optimizer | list[optim.Optimizer],
sync_every: int,
backup_device: Optional[torch.device] = None,
pin_memory: bool = True,
@@ -540,6 +574,11 @@ def __init__(
fragment_update_alpha: Determines how to mix the local and global optimized parameters
"""

if isinstance(outer_optimizer, list):
assert len(outer_optimizer) == len(
model_fragments
), "The number of outer optimizers must match the number of model fragments"

if manager._use_async_quorum:
raise ValueError(
"Using DiLoCo require synchronous quorum to be enabled. "
Expand All @@ -557,12 +596,6 @@ def __init__(
if fragment_update_alpha < 0 or fragment_update_alpha > 1:
raise ValueError("fragment_update_alpha must be between 0 and 1")

Copilot AI · Jul 11, 2025

The fragment_update_alpha value is validated but never stored on the instance. Add self._fragment_update_alpha = fragment_update_alpha after the validation so _merge_parameters can access it.

Suggested change:

    self._fragment_update_alpha = fragment_update_alpha

# TODO: Support `fragment_update_alpha`
if fragment_update_alpha != 0.0:
raise ValueError(
"Merging local parameters with global parameters is not supported yet"
)

super().__init__()
self._manager = manager

@@ -594,8 +627,11 @@ def __init__(
model_fragment,
math.floor((sync_every / len(model_fragments)) * (i + 1)),
inner_optimizer,
# TODO: Support different outer optimizers for each fragment
outer_optimizer,
(
outer_optimizer[i]
if isinstance(outer_optimizer, list)
else outer_optimizer
),
sync_every,
backup_device,
pin_memory,
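With this change, `outer_optimizer` may be either a single optimizer (kept for backward compatibility and shared across all fragments, as before) or a list with exactly one optimizer per model fragment. A minimal construction sketch, assuming the class is `DiLoCo` from `torchft.local_sgd` and using made-up fragments, learning rates, and `sync_every`:

```python
import torch.nn as nn
import torch.optim as optim

from torchft import Manager
from torchft.local_sgd import DiLoCo  # class name assumed; signature as in the diff above


def build_diloco(manager: Manager, sync_every: int = 100) -> DiLoCo:
    # Two model fragments, synchronized on a staggered schedule.
    fragments = [nn.Linear(128, 128), nn.Linear(128, 10)]
    model = nn.Sequential(*fragments)

    inner_optimizer = optim.AdamW(model.parameters(), lr=4e-4)

    # One outer optimizer per fragment; __init__ asserts len(optimizers) == len(fragments).
    outer_optimizers = [
        optim.SGD(f.parameters(), lr=0.7, momentum=0.9, nesterov=True)
        for f in fragments
    ]

    return DiLoCo(
        manager,
        fragments,
        inner_optimizer,
        outer_optimizers,  # a single optimizer is still accepted
        sync_every=sync_every,
        fragment_update_alpha=0.0,
    )
```

Passing a plain optimizer instead of the list preserves the old behavior of one shared outer optimizer for every fragment.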
11 changes: 7 additions & 4 deletions torchft/local_sgd_integ_test.py
@@ -589,18 +589,19 @@ def test_streaming_diloco_recovery(self, use_cuda: bool) -> None:

self.assertEqual(event_injectors[1].count[EventInjectorEvent.Failure], 1)

CONFIG: list[tuple[bool, int, int]] = [
(use_cuda, n_fragments, fragment_sync_delay)
CONFIG: list[tuple[bool, int, int, float]] = [
(use_cuda, n_fragments, fragment_sync_delay, alpha)
for use_cuda in [False]
for n_fragments in [1, 2]
for fragment_sync_delay in [0, 1]
for alpha in [0.0, 0.5, 1.0]
]

# pyre-fixme[56]: Pyre was not able to infer the type of argument
@skipIf(sys.platform == "darwin", "not reliable on mac")
@parameterized.expand(CONFIG)
def test_streaming_diloco_upscale(
self, use_cuda: bool, n_fragments: int, fragment_sync_delay: int
self, use_cuda: bool, n_fragments: int, fragment_sync_delay: int, alpha: float
) -> None:
# Skip the test if use_cuda is True and there are not enough GPUs
if use_cuda and torch.cuda.device_count() < 2:
@@ -642,6 +643,7 @@ def test_streaming_diloco_upscale(
"diloco_args": {
"fragment_sync_delay": fragment_sync_delay,
"sync_every": 4,
"fragment_update_alpha": alpha,
},
},
)
@@ -681,7 +683,7 @@ def test_streaming_diloco_upscale(
@skipIf(sys.platform == "darwin", "not reliable on mac")
@parameterized.expand(CONFIG)
def test_streaming_diloco_commit_failure(
self, use_cuda: bool, n_fragments: int, fragment_sync_delay: int
self, use_cuda: bool, n_fragments: int, fragment_sync_delay: int, alpha: float
) -> None:
# Skip the test if use_cuda is True and there are not enough GPUs
if use_cuda and torch.cuda.device_count() < 2:
@@ -719,6 +721,7 @@ def test_streaming_diloco_commit_failure(
"diloco_args": {
"fragment_sync_delay": fragment_sync_delay,
"sync_every": 4,
"fragment_update_alpha": alpha,
},
},
)
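With `alpha` added to `CONFIG`, each parameterized test now expands to 1 × 2 × 2 × 3 = 12 cases (use_cuda × n_fragments × fragment_sync_delay × alpha), so the upscale and commit-failure scenarios are exercised with `fragment_update_alpha` values of 0.0, 0.5, and 1.0 rather than only the default.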