Lint code with Ruff #622

Draft · wants to merge 9 commits into base: master
27 changes: 8 additions & 19 deletions .github/workflows/check-style.yml
@@ -6,25 +6,6 @@ on:
pull_request:

jobs:
black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--check --diff"
version: "22.3.0"
isort:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
with:
python-version: 3.11
- uses: isort/isort-action@master
with:
isortVersion: "5.10.1"

codespell:
runs-on: ubuntu-latest
steps:
@@ -33,3 +14,11 @@ jobs:
with:
only_warn: 1
ignore_words_list: ibrary,nd

ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
with:
version: 0.7.2
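
The new job above pins Ruff 0.7.2 through chartboost/ruff-action. A rough local equivalent, covering both the lint rules and the `ruff format` check recommended in the updated CONTRIBUTING.md below, is sketched here; the helper script is illustrative and not part of this PR, and it assumes the same release has been installed with `pip install ruff==0.7.2`:

# Illustrative helper, not part of this PR: runs the pinned Ruff checks locally from the repo root.
import subprocess
import sys


def run_ruff_checks() -> int:
    """Run `ruff check` (lint rules) and `ruff format --check` (formatting)."""
    for cmd in (["ruff", "check", "."], ["ruff", "format", "--check", "."]):
        returncode = subprocess.run(cmd).returncode
        if returncode != 0:
            return returncode
    return 0


if __name__ == "__main__":
    sys.exit(run_ruff_checks())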
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -36,8 +36,8 @@ with the following rules:

* The code must follow [PEP8](https://www.python.org/dev/peps/pep-0008/) unless absolutely necessary. Also, each line
cannot be longer than 119 characters.
* We use [black](https://github.com/psf/black) for code formatting and [isort](https://github.com/PyCQA/isort) for
import sorting. Before submitting a PR, make sure to install and run `black .` and `isort .` in the root of the
* We use [ruff](https://github.com/astral-sh/ruff) as a linter.
Before submitting a PR, make sure to install and run `ruff check` and `ruff format` in the root of the
repository. Also, you may want to check your code for typos by running `codespell --skip=".git"`, though there
might be false positives.
* We highly encourage the use of [typing](https://docs.python.org/3/library/typing.html) where applicable.
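
For context, the default `ruff check` rules that drive most of the code changes in this diff are E711 (comparison to None), E722 (bare `except`), and E731 (lambda assigned to a name). A small hypothetical illustration of these patterns (not code from the repository):

# Hypothetical examples of the Ruff rules addressed in this PR.

# E711: compare to None with `is` / `is not` instead of `==` / `!=`
result = None
if result is not None:  # was: if result != None
    print(result)

# E722: catch a specific exception instead of a bare `except`
# (or keep the bare `except` and mark it with `# noqa: E722`, as benchmark_averaging.py does)
try:
    value = int("not a number")
except ValueError:
    value = 0

# E731: define a named function instead of assigning a lambda
def double(x):  # was: double = lambda x: 2 * x
    return 2 * x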
2 changes: 1 addition & 1 deletion benchmarks/benchmark_averaging.py
@@ -77,7 +77,7 @@ def run_averager(index):
for step in range(num_rounds):
try:
success = averager.step(timeout=round_timeout) is not None
except:
except: # noqa: E722
success = False
with lock_stats:
successful_steps += int(success)
4 changes: 2 additions & 2 deletions benchmarks/benchmark_dht.py
@@ -29,7 +29,7 @@ def __init__(self, shutdown_peers: list, shutdown_timestamps: list):
async def check_and_kill(self):
async with self.lock:
if (
self.shutdown_timestamps != None
self.shutdown_timestamps is not None
and self.timestamp_iter < len(self.shutdown_timestamps)
and self.current_iter == self.shutdown_timestamps[self.timestamp_iter]
):
@@ -96,7 +96,7 @@ async def store_and_get_task(

total_gets += len(get_result)
for result in get_result:
if result != None:
if result is not None:
attendees, expiration = result
if len(attendees.keys()) == successful_stores_per_iter:
get_ok = True
14 changes: 8 additions & 6 deletions benchmarks/benchmark_throughput.py
@@ -29,7 +29,7 @@ def print_device_info(device=None):
# Additional Info when using cuda
if device.type == "cuda":
logger.info(torch.cuda.get_device_name(0))
logger.info(f"Memory Usage:")
logger.info("Memory Usage:")
logger.info(f"Allocated: {round(torch.cuda.memory_allocated(0) / 1024 ** 3, 1)} GB")
logger.info(f"Cached: {round(torch.cuda.memory_cached(0) / 1024 ** 3, 1)} GB")

@@ -161,11 +161,13 @@ def benchmark_throughput(

sys.stdout.flush()
sys.stderr.flush()
time_between = (
lambda key1, key2: abs(timestamps[key2] - timestamps[key1])
if (key1 in timestamps and key2 in timestamps)
else float("nan")
)

def time_between(key1, key2):
if key1 in timestamps and key2 in timestamps:
return abs(timestamps[key2] - timestamps[key1])
else:
return float("nan")

total_examples = batch_size * num_clients * num_batches_per_client

logger.info("Benchmark finished, status:" + ["Success", "Failure"][benchmarking_failed.is_set()])
30 changes: 16 additions & 14 deletions examples/albert/run_trainer.py
@@ -52,7 +52,7 @@ def get_model(training_args, config, tokenizer):
logger.info(f"Loading model from {latest_checkpoint_dir}")
model = AlbertForPreTraining.from_pretrained(latest_checkpoint_dir)
else:
logger.info(f"Training from scratch")
logger.info("Training from scratch")
model = AlbertForPreTraining(config)
model.resize_token_embeddings(len(tokenizer))

@@ -235,17 +235,18 @@ def main():

adjusted_target_batch_size = collaboration_args.target_batch_size - collaboration_args.batch_size_lead

# We need to make such a lambda function instead of just an optimizer instance
# We need to make such a function instead of just an optimizer instance
# to make hivemind.Optimizer(..., offload_optimizer=True) work
opt = lambda params: Lamb(
params,
lr=training_args.learning_rate,
betas=(training_args.adam_beta1, training_args.adam_beta2),
eps=training_args.adam_epsilon,
weight_decay=training_args.weight_decay,
clamp_value=training_args.clamp_value,
debias=True,
)
def opt(params):
return Lamb(
params,
lr=training_args.learning_rate,
betas=(training_args.adam_beta1, training_args.adam_beta2),
eps=training_args.adam_epsilon,
weight_decay=training_args.weight_decay,
clamp_value=training_args.clamp_value,
debias=True,
)

no_decay = ["bias", "LayerNorm.weight"]
params = [
@@ -259,9 +260,10 @@ def main():
},
]

scheduler = lambda opt: get_linear_schedule_with_warmup(
opt, num_warmup_steps=training_args.warmup_steps, num_training_steps=training_args.total_steps
)
def scheduler(opt):
return get_linear_schedule_with_warmup(
opt, num_warmup_steps=training_args.warmup_steps, num_training_steps=training_args.total_steps
)

optimizer = Optimizer(
dht=dht,
3 changes: 2 additions & 1 deletion examples/albert/tokenize_wikitext103.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
""" This script builds a pre-tokenized compressed representation of WikiText-103 using huggingface/datasets """
"""This script builds a pre-tokenized compressed representation of WikiText-103 using huggingface/datasets"""

import random
from functools import partial

2 changes: 1 addition & 1 deletion hivemind/averaging/averager.py
@@ -1,4 +1,4 @@
""" A background process that averages your tensors with peers """
"""A background process that averages your tensors with peers"""

from __future__ import annotations

@@ -79,7 +79,7 @@
local tensors for averaging
:param allow_state_sharing: if set to True, other peers can download this peer's state. Can be overwritten
with averager.allow_state_sharing = True / False
:param declare_state_period: re-declare averager as a donor for load_state_from_peers every this many seconds

[GitHub Actions / codespell: check failure on line 82 in hivemind/averaging/averager.py, re-declare ==> redeclared]
:param allreduce_timeout: spend at most this many seconds for allreduce (after group is formed)
:param next_chunk_timeout: during all-reduce and load_state_from_peers, if peer does not send next data chunk in
this number of seconds, consider it failed and proceed with remaining peers. default: no timeout
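
The keyword arguments documented in this docstring (`allow_state_sharing`, `declare_state_period`, `allreduce_timeout`, `next_chunk_timeout`) are constructor parameters of `hivemind.DecentralizedAverager`, the class defined in this file. A hedged usage sketch follows; the concrete values and the surrounding arguments (`prefix`, `target_group_size`, the tensor shape) are illustrative assumptions rather than values taken from this diff:

# Illustrative DecentralizedAverager setup using the parameters described above (values are assumed).
import torch
import hivemind

dht = hivemind.DHT(start=True)
averager = hivemind.DecentralizedAverager(
    averaged_tensors=[torch.zeros(16)],  # local tensors to average; shape is illustrative
    dht=dht,
    prefix="demo_averaging",             # assumed group key prefix
    target_group_size=4,                 # assumed
    allow_state_sharing=True,            # let other peers download this peer's state
    declare_state_period=30,             # re-declare as a load_state_from_peers donor every 30 seconds
    allreduce_timeout=60,                # spend at most 60 seconds on all-reduce after the group forms
    next_chunk_timeout=15,               # consider a peer failed if it stalls this long per data chunk
    start=True,
)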
4 changes: 2 additions & 2 deletions hivemind/averaging/matchmaking.py
@@ -1,4 +1,4 @@
""" A background process that averages your tensors with peers """
"""A background process that averages your tensors with peers"""

from __future__ import annotations

@@ -237,7 +237,7 @@
except asyncio.TimeoutError:
logger.debug(f"{self} - potential leader {leader} did not respond within {self.request_timeout}")
return None
except (P2PDaemonError, P2PHandlerError, StopAsyncIteration) as e:
except (P2PDaemonError, P2PHandlerError, StopAsyncIteration):
logger.debug(f"{self} - failed to request potential leader {leader}:", exc_info=True)
return None

@@ -490,7 +490,7 @@
self.update_finished.clear()
continue
else:
raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")

[GitHub Actions / codespell: check failure on line 493 in hivemind/averaging/matchmaking.py, re-declared ==> redeclared]

del self.leader_queue[maybe_next_leader]
self.past_attempts.add((maybe_next_leader, entry.expiration_time))
1 change: 1 addition & 0 deletions hivemind/averaging/partition.py
@@ -1,6 +1,7 @@
"""
Auxiliary data structures for AllReduceRunner
"""

import asyncio
from collections import deque
from typing import AsyncIterable, AsyncIterator, Optional, Sequence, Tuple, TypeVar
5 changes: 2 additions & 3 deletions hivemind/compression/adaptive.py
@@ -10,8 +10,7 @@

class AdaptiveCompressionBase(CompressionBase, ABC):
@abstractmethod
def choose_compression(self, info: CompressionInfo) -> CompressionBase:
...
def choose_compression(self, info: CompressionInfo) -> CompressionBase: ...

def estimate_compression_ratio(self, info: CompressionInfo) -> float:
return self.choose_compression(info).estimate_compression_ratio(info)
@@ -43,7 +42,7 @@ def __init__(
parameter: CompressionBase = None,
gradient: CompressionBase = None,
optimizer: CompressionBase = None,
default: CompressionBase = NoCompression()
default: CompressionBase = NoCompression(),
):
self.role_compressions = {
TensorRole.ACTIVATION: activation or default,
4 changes: 2 additions & 2 deletions hivemind/dht/node.py
@@ -87,13 +87,13 @@ class DHTNode:

"""

# fmt:off
# fmt: off
node_id: DHTID; is_alive: bool; peer_id: PeerID; num_replicas: int; num_workers: int; protocol: DHTProtocol
chunk_size: int; refresh_timeout: float; cache_locally: bool; cache_nearest: int; cache_refresh_before_expiry: float
cache_on_store: bool; reuse_get_requests: bool; pending_get_requests: DefaultDict[DHTID, SortedSet[_SearchState]]
cache_refresh_task: Optional[asyncio.Task]; cache_refresh_evt: asyncio.Event; cache_refresh_queue: CacheRefreshQueue
blacklist: Blacklist
# fmt:on
# fmt: on

@classmethod
async def create(
13 changes: 7 additions & 6 deletions hivemind/dht/protocol.py
@@ -1,4 +1,5 @@
""" RPC protocol that provides nodes a way to communicate with each other """
"""RPC protocol that provides nodes a way to communicate with each other"""

from __future__ import annotations

import asyncio
@@ -22,12 +23,12 @@


class DHTProtocol(ServicerBase):
# fmt:off
# fmt: off
p2p: P2P
node_id: DHTID; bucket_size: int; num_replicas: int; wait_timeout: float; node_info: dht_pb2.NodeInfo
storage: DHTLocalStorage; cache: DHTLocalStorage; routing_table: RoutingTable; rpc_semaphore: asyncio.Semaphore
record_validator: Optional[RecordValidatorBase]
# fmt:on
# fmt: on

serializer = MSGPackSerializer # used to pack/unpack DHT Values for transfer over network
RESERVED_SUBKEYS = IS_REGULAR_VALUE, IS_DICTIONARY = serializer.dumps(None), b""
@@ -109,7 +110,7 @@ async def call_ping(self, peer: PeerID, validate: bool = False, strict: bool = T
time_requested = get_dht_time()
response = await self.get_stub(peer).rpc_ping(ping_request, timeout=self.wait_timeout)
time_responded = get_dht_time()
except Exception as e:
except Exception:
logger.debug(f"DHTProtocol failed to ping {peer}", exc_info=True)
response = None
responded = bool(response and response.peer and response.peer.node_id)
@@ -224,7 +225,7 @@ async def call_store(
peer_id = DHTID.from_bytes(response.peer.node_id)
asyncio.create_task(self.update_routing_table(peer_id, peer, responded=True))
return response.store_ok
except Exception as e:
except Exception:
logger.debug(f"DHTProtocol failed to store at {peer}", exc_info=True)
asyncio.create_task(self.update_routing_table(self.routing_table.get(peer_id=peer), peer, responded=False))
return None
@@ -325,7 +326,7 @@ async def call_find(
logger.error(f"Unknown result type: {result.type}")

return output
except Exception as e:
except Exception:
logger.debug(f"DHTProtocol failed to find at {peer}", exc_info=True)
asyncio.create_task(self.update_routing_table(self.routing_table.get(peer_id=peer), peer, responded=False))

3 changes: 2 additions & 1 deletion hivemind/dht/routing.py
@@ -1,4 +1,5 @@
""" Utility data structures to represent DHT nodes (peers), data keys, and routing tables. """
"""Utility data structures to represent DHT nodes (peers), data keys, and routing tables."""

from __future__ import annotations

import hashlib
3 changes: 2 additions & 1 deletion hivemind/dht/traverse.py
@@ -1,4 +1,5 @@
""" Utility functions for crawling DHT nodes, used to get and store keys in a DHT """
"""Utility functions for crawling DHT nodes, used to get and store keys in a DHT"""

import asyncio
import heapq
from collections import Counter
4 changes: 2 additions & 2 deletions hivemind/hivemind_cli/run_server.py
@@ -17,7 +17,7 @@


def main():
# fmt:off
# fmt: off
parser = configargparse.ArgParser(default_config_files=["config.yml"])
parser.add('-c', '--config', required=False, is_config_file=True, help='config file path')

@@ -85,7 +85,7 @@ def main():
help='Path of a file with custom nn.modules, wrapped into special decorator')
parser.add_argument('--identity_path', type=str, required=False, help='Path to identity file to be used in P2P')

# fmt:on
# fmt: on
args = vars(parser.parse_args())
args.pop("config", None)
optimizer = args.pop("optimizer")
2 changes: 1 addition & 1 deletion hivemind/moe/client/beam_search.py
@@ -315,7 +315,7 @@ async def _find_best_experts(
)
beam = [(score, prefix, successors[prefix]) for score, prefix in best_active_pairs if successors[prefix]]
if not beam:
logger.warning(f"Beam search had to terminate prematurely because of empty beam (dim 0)")
logger.warning("Beam search had to terminate prematurely because of empty beam (dim 0)")
break

# add best experts from the final beam
6 changes: 3 additions & 3 deletions hivemind/moe/client/expert.py
@@ -24,7 +24,7 @@
DUMMY = torch.empty(0, requires_grad=True) # dummy tensor that triggers autograd in RemoteExpert


def get_server_stub(p2p: P2P, server_peer_id: PeerID) -> "ConnectionHandlerStub":
def get_server_stub(p2p: P2P, server_peer_id: PeerID) -> "ConnectionHandlerStub": # noqa: F821
"""Create an RPC stub that can send requests to any expert on the specified remote server"""
return moe.server.connection_handler.ConnectionHandler.get_stub(p2p, server_peer_id)

@@ -67,7 +67,7 @@ def forward(self, *args, **kwargs):
forward_inputs = (args, kwargs)

if not nested_compare(forward_inputs, self.info["forward_schema"]):
raise TypeError(f"Inputs do not match expert input schema. Did you pass the right number of parameters?")
raise TypeError("Inputs do not match expert input schema. Did you pass the right number of parameters?")

flat_outputs = _RemoteModuleCall.apply(DUMMY, self.uid, self.stub, self.info, *nested_flatten(forward_inputs))

@@ -199,7 +199,7 @@ def forward(
ctx,
dummy: torch.Tensor,
uid: str,
stub: "ConnectionHandlerStub",
stub: "ConnectionHandlerStub", # noqa: F821
info: Dict[str, Any],
*inputs: torch.Tensor,
) -> Tuple[torch.Tensor, ...]:
2 changes: 1 addition & 1 deletion hivemind/moe/client/remote_expert_worker.py
@@ -2,7 +2,7 @@
import os
from concurrent.futures import Future
from threading import Thread
from typing import Awaitable, Optional
from typing import Awaitable

from hivemind.utils import switch_to_uvloop

2 changes: 1 addition & 1 deletion hivemind/moe/client/switch_moe.py
@@ -173,7 +173,7 @@ def _compute_batch_utilization(self, batch_experts, expert_mask):
]

# out of chosen_experts, select those for which expert_mask is True
for (sample_idx, expert_idx) in expert_mask.nonzero().cpu().numpy():
for sample_idx, expert_idx in expert_mask.nonzero().cpu().numpy():
expert = batch_experts[sample_idx][expert_idx]
expert_indices = expert.uid[len(self.beam_search.uid_prefix) :]
expert_indices = list(map(int, expert_indices.split(UID_DELIMITER)))
15 changes: 9 additions & 6 deletions hivemind/moe/server/layers/common.py
@@ -12,7 +12,8 @@ def gelu_fast(x):
return 0.5 * x * (1.0 + torch.tanh(x * 0.7978845608 * (1.0 + 0.044715 * x * x)))


ffn_sample_input = lambda batch_size, hid_dim: torch.empty((batch_size, hid_dim))
def ffn_sample_input(batch_size, hid_dim):
return torch.empty((batch_size, hid_dim))


@register_expert_class("ffn", ffn_sample_input)
@@ -66,10 +67,11 @@ def forward(self, src, src_key_padding_mask=None):
return src


transformer_sample_input = lambda batch_size, hid_dim: (
torch.empty((batch_size, 128, hid_dim)),
torch.empty((batch_size, 128), dtype=torch.bool),
)
def transformer_sample_input(batch_size, hid_dim):
return (
torch.empty((batch_size, 128, hid_dim)),
torch.empty((batch_size, 128), dtype=torch.bool),
)


@register_expert_class("transformer", transformer_sample_input)
@@ -78,7 +80,8 @@ def __init__(self, hid_dim):
super().__init__(hid_dim, dim_feedforward=4 * hid_dim, nhead=16)


nop_sample_input = lambda batch_size, hid_dim: torch.empty((batch_size, hid_dim))
def nop_sample_input(batch_size, hid_dim):
return torch.empty((batch_size, hid_dim))


@register_expert_class("nop", nop_sample_input)