Skip to content

Commit

Permalink
Include mountpoints in the macOS payload
Browse files Browse the repository at this point in the history
Add missing `model_name` when stats are unretreivable
Handle failing size conversion when byte size is 0
  • Loading branch information
dormant-user committed Jan 3, 2025
1 parent 95c3d06 commit 6d64615
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pyudisk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .main import EnvConfig, generate_report, monitor, smart_metrics # noqa: F401

version = "0.3.1"
version = "0.3.2a0"


@click.command()
Expand Down
90 changes: 57 additions & 33 deletions pyudisk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess
from collections.abc import Generator
from datetime import datetime
from typing import Any, Dict, List, NoReturn, Tuple
from typing import Any, Dict, List, NoReturn

import psutil
import pyarchitecture
Expand All @@ -16,7 +16,7 @@
from .models import SystemPartitions, darwin, linux
from .notification import notification_service, send_report
from .support import humanize_usage_metrics, load_dump, load_partitions
from .util import standard
from .util import size_converter, standard


def get_partitions(env: EnvConfig) -> Generator[sdiskpart]:
Expand Down Expand Up @@ -183,67 +183,87 @@ def parse_block_devices(
return block_devices


def get_disk_data_macos(env: EnvConfig) -> List[Tuple[str, List[str]]]:
def get_disk_data_macos(env: EnvConfig) -> Generator[Dict[str, str | List[str]]]:
"""Get disk information for macOS.
Args:
env: Environment variables configuration.
Returns:
List[Tuple[str, List[str]]]:
Returns a list of tuples containing device_id and mountpoints.
Yields:
Dict[str, str | List[str]]:
Yields a dictionary of device information as key-value pairs.
"""
try:
# 1: Attempt to extract physical disks from PyArchitecture
disk_data = [
(
(disk.get("node") or disk.get("device_id") or raise_pyarch_error(disk)),
disk.get("mountpoints"),
)
for disk in pyarchitecture.disks.get_all_disks(env.disk_lib)
]
assert disk_data, "Failed to load physical disk data"
if all_disks := pyarchitecture.disks.get_all_disks(env.disk_lib):
yield from all_disks
else:
raise ValueError("Failed to load physical disk data")
except Exception as error:
# The accuracy of methods 2, and 3 are questionable - it may vary from device to device
# But almost all macOS machines should be caught by the 1st method
LOGGER.error(error)
# 2: Assume disks with non-zero write count as physical disks
# disk_io_counters fn will fetch disks rather than partitions (similar to output from 'diskutil list')
if physical_disks := [
f"/dev/{disk_id}"
for disk_id, disk_io in psutil.disk_io_counters(perdisk=True).items()
if min(disk_io.write_bytes, disk_io.write_count, disk_io.write_time) != 0
]:
if not (
physical_disks := [
disk_id
for disk_id, disk_io in psutil.disk_io_counters(perdisk=True).items()
if min(disk_io.write_bytes, disk_io.write_count, disk_io.write_time)
!= 0
]
):
# If there is only one physical disk, then set the mountpoint to root (/)
if len(physical_disks) == 1:
disk_data = [(physical_disks[0], ["/"])]
yield dict(
size=size_converter(psutil.disk_usage("/").total),
device_id=physical_disks[0],
node=f"/dev/{physical_disks[0]}",
mountpoints=["/"],
)
else:
# If there are multiple physical disks, then set the mountpoint to disk path itself
disk_data = [(disk, [disk]) for disk in physical_disks]
for physical_disk in physical_disks:
yield dict(
size=size_converter(psutil.disk_usage("/").total),
device_id=physical_disk,
node=f"/dev/{physical_disk}",
mountpoints=[f"/dev/{physical_disk}"],
)
else:
# 3. If both the methods fail, then fallback to disk partitions
LOGGER.warning(
"No physical disks found through IO counters, using all partitions instead"
"No physical disks found through IO counters, using partitions instead"
)
disk_data = [
(partition.device, [partition.mountpoint])
for partition in get_partitions(env)
]
return disk_data
for partition in get_partitions(env):
if (
partition.mountpoint.startswith("/System/Volumes")
or "Recovery" in partition.mountpoint
):
continue
yield dict(
size=size_converter(psutil.disk_usage(partition.mountpoint).total),
device_id=partition.device.lstrip("/dev/"),
node=partition.device,
mountpoints=[partition.device],
)


def get_smart_metrics_macos(
smart_lib: FilePath, device_id: str, mountpoints: List[str]
smart_lib: FilePath, device_info: Dict[str, str | List[str]]
) -> dict:
"""Gathers disk information using the 'smartctl' command on macOS.
Args:
smart_lib: Library path to 'smartctl' command.
device_id: Device ID for the physical drive.
mountpoints: Mountpoints for the partitions.
device_info: Device information retrieved.
Returns:
Disk:
Returns the Disk object with the gathered metrics.
"""
device_id = device_info["node"]
mountpoints = device_info["mountpoints"]
try:
result = subprocess.run(
[smart_lib, "-a", device_id, "--json"],
Expand All @@ -266,13 +286,17 @@ def get_smart_metrics_macos(
LOGGER.error("[%d]: %s", error.returncode, result)
output = {}
try:
output["device"] = output.get("device", darwin.Device(name=device_id, info_name=device_id))
output["device"] = output.get(
"device", darwin.Device(name=device_id, info_name=device_id).model_dump()
)
output["model_name"] = output.get("model_name", device_info.get("name"))
if len(mountpoints) == 1:
output["usage"] = humanize_usage_metrics(psutil.disk_usage(mountpoints[0]))
else:
# This will occur only when the disk retrieval falls back to partitions or the disk path itself
# In both cases, the mountpoint can be assumed as root (/)
output["usage"] = humanize_usage_metrics(psutil.disk_usage("/"))
output["mountpoints"] = mountpoints
return output
except ValidationError as error:
LOGGER.error(error.errors())
Expand All @@ -294,8 +318,8 @@ def smart_metrics(env: EnvConfig) -> Generator[linux.Disk | darwin.Disk]:
Yields the Disk object from the generated Dataframe.
"""
if OPERATING_SYSTEM == OperationSystem.darwin:
for (device_id, mountpoint) in get_disk_data_macos(env):
if metrics := get_smart_metrics_macos(env.smart_lib, device_id, mountpoint):
for device_info in get_disk_data_macos(env):
if metrics := get_smart_metrics_macos(env.smart_lib, device_info):
try:
yield darwin.Disk(**metrics)
except ValidationError as error:
Expand Down
1 change: 1 addition & 0 deletions pyudisk/models/darwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ class Disk(BaseModel):
ata_smart_self_test_log: Optional[ATAStandard] = None
ata_smart_selective_self_test_log: Optional[ATASmartSelectiveSelfTestLog] = None
usage: Optional[Usage] = None
mountpoints: List[str] = None

class Config:
"""Configuration for the model."""
Expand Down
4 changes: 2 additions & 2 deletions pyudisk/models/linux.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import List, Optional

from pydantic import BaseModel, DirectoryPath, field_validator
from pydantic import BaseModel, field_validator

from . import Usage

Expand Down Expand Up @@ -55,7 +55,7 @@ class Partition(BaseModel):
IdUsage: Optional[str] = None
ReadOnly: Optional[bool] = None
Size: Optional[int] = None
MountPoints: Optional[DirectoryPath] = None
MountPoints: Optional[List[str]] = None
Symlinks: Optional[List[str]] = None


Expand Down
2 changes: 2 additions & 0 deletions pyudisk/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def size_converter(byte_size: int | float) -> str:
str:
Converted understandable size.
"""
if byte_size == 0:
return "0 B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
index = int(math.floor(math.log(byte_size, 1024)))
return f"{format_nos(round(byte_size / pow(1024, index), 2))} {size_name[index]}"
Expand Down

0 comments on commit 6d64615

Please sign in to comment.