From 1bc300dcba14c54dc9a81d4a33c1f4bada30c085 Mon Sep 17 00:00:00 2001 From: chicco-carone Date: Thu, 23 May 2024 11:59:49 +0200 Subject: [PATCH 1/2] Improve Docstrings --- snapcast/control/client.py | 245 +++++++--- snapcast/control/group.py | 156 +++++-- snapcast/control/protocol.py | 71 ++- snapcast/control/server.py | 850 +++++++++++++++++++++++++++-------- snapcast/control/stream.py | 156 +++++-- 5 files changed, 1151 insertions(+), 327 deletions(-) diff --git a/snapcast/control/client.py b/snapcast/control/client.py index af14496..b9ef6b5 100644 --- a/snapcast/control/client.py +++ b/snapcast/control/client.py @@ -1,16 +1,30 @@ """Snapcast client.""" -import logging +import logging _LOGGER = logging.getLogger(__name__) # pylint: disable=too-many-public-methods -class Snapclient(): - """Represents a snapclient.""" +class Snapclient: + """Represents a snapclient. + + Attributes: + _server (str): The server address. + _snapshot (dict): The snapshot of the client's state. + _last_seen: The last seen timestamp of the client. + _callback_func: The callback function for the client. + + """ def __init__(self, server, data): - """Initialize.""" + """Initialize the Client object. + + Args: + server (str): The server address. + data (dict): The initial data for the client. + + """ self._server = server self._snapshot = None self._last_seen = None @@ -18,17 +32,30 @@ def __init__(self, server, data): self.update(data) def update(self, data): - """Update client.""" + """Update client. + + Args: + data: The updated client data. + + """ self._client = data @property def identifier(self): - """Get identifier.""" - return self._client.get('id') + """Get identifier. + + Returns: + The identifier of the client. + """ + return self._client.get("id") @property def group(self): - """Get group.""" + """Get the group that this client belongs to. + + Returns: + The group object that this client belongs to, or None if the client is not in any group. + """ for group in self._server.groups: if self.identifier in group.clients: return group @@ -36,132 +63,212 @@ def group(self): @property def friendly_name(self): - """Get friendly name.""" - if len(self._client.get('config').get('name')): - return self._client.get('config').get('name') - return self._client.get('host').get('name') + """Get the friendly name of the client. + + Returns: + str: The friendly name of the client. + + """ + if len(self._client.get("config").get("name")): + return self._client.get("config").get("name") + return self._client.get("host").get("name") @property def version(self): - """Version.""" - return self._client.get('snapclient').get('version') + """Version. + + Returns: + str: The version of the snapclient. + """ + return self._client.get("snapclient").get("version") @property def connected(self): - """Connected or not.""" - return self._client.get('connected') + """Get the current connection status of the client. + + Returns: + bool: True if the client is connected, False otherwise. + """ + return self._client.get("connected") @property def name(self): - """Name.""" - return self._client.get('config').get('name') + """Get the name of the client. + + Returns: + str: The name of the client. + """ + return self._client.get("config").get("name") async def set_name(self, name): - """Set a client name.""" + """Set a client name. + + Args: + name (str): The name to set for the client. + + """ if not name: - name = '' - self._client['config']['name'] = name + name = "" + self._client["config"]["name"] = name await self._server.client_name(self.identifier, name) @property def latency(self): - """Latency.""" - return self._client.get('config').get('latency') + """Get the client latency. + + Returns: + int: The latency of the client. + """ + return self._client.get("config").get("latency") async def set_latency(self, latency): - """Set client latency.""" - self._client['config']['latency'] = latency + """Set client latency. + + Args: + latency (int): The latency to set for the client. + """ + self._client["config"]["latency"] = latency await self._server.client_latency(self.identifier, latency) @property def muted(self): - """Muted or not.""" - return self._client.get('config').get('volume').get('muted') + """Get the mute status of the client. + + Returns: + bool: True if the client is muted, False otherwise. + """ + return self._client.get("config").get("volume").get("muted") async def set_muted(self, status): - """Set client mute status.""" - new_volume = self._client['config']['volume'] - new_volume['muted'] = status - self._client['config']['volume']['muted'] = status + """Set client mute status. + + Args: + status (bool): The mute status to set for the client. + """ + new_volume = self._client["config"]["volume"] + new_volume["muted"] = status + self._client["config"]["volume"]["muted"] = status await self._server.client_volume(self.identifier, new_volume) - _LOGGER.debug('set muted to %s on %s', status, self.friendly_name) + _LOGGER.debug("set muted to %s on %s", status, self.friendly_name) @property def volume(self): - """Volume percent.""" - return self._client.get('config').get('volume').get('percent') + """Get the volume percent. + + Returns: + int: The volume percent of the client. + """ + return self._client.get("config").get("volume").get("percent") async def set_volume(self, percent, update_group=True): - """Set client volume percent.""" + """Set client volume percent. + + Args: + percent (int): The volume percent to set for the client. + update_group (bool): Whether to update the group volume. Defaults to True. + + Raises: + ValueError: If volume percent is out of range. + """ if percent not in range(0, 101): - raise ValueError('Volume percent out of range') - new_volume = self._client['config']['volume'] - new_volume['percent'] = percent - self._client['config']['volume']['percent'] = percent + raise ValueError("Volume percent out of range") + new_volume = self._client["config"]["volume"] + new_volume["percent"] = percent + self._client["config"]["volume"]["percent"] = percent await self._server.client_volume(self.identifier, new_volume) if update_group: self._server.group(self.group.identifier).callback() - _LOGGER.debug('set volume to %s on %s', percent, self.friendly_name) + _LOGGER.debug("set volume to %s on %s", percent, self.friendly_name) def groups_available(self): - """Get available group objects.""" + """Get available group objects. + + Returns: + list: The list of available group objects. + """ return list(self._server.groups) def update_volume(self, data): - """Update volume.""" - self._client['config']['volume'] = data['volume'] - _LOGGER.debug('updated volume on %s', self.friendly_name) + """Update volume. + + Args: + data (dict): The updated volume data. + """ + self._client["config"]["volume"] = data["volume"] + _LOGGER.debug("updated volume on %s", self.friendly_name) self._server.group(self.group.identifier).callback() self.callback() def update_name(self, data): - """Update name.""" - self._client['config']['name'] = data['name'] - _LOGGER.debug('updated name on %s', self.friendly_name) + """Update name. + + Args: + data (dict): The updated name data. + """ + self._client["config"]["name"] = data["name"] + _LOGGER.debug("updated name on %s", self.friendly_name) self.callback() def update_latency(self, data): - """Update latency.""" - self._client['config']['latency'] = data['latency'] - _LOGGER.debug('updated latency on %s', self.friendly_name) + """Update latency. + + Args: + data (dict): The updated latency data. + """ + self._client["config"]["latency"] = data["latency"] + _LOGGER.debug("updated latency on %s", self.friendly_name) self.callback() def update_connected(self, status): - """Update connected.""" - self._client['connected'] = status - _LOGGER.debug('updated connected status to %s on %s', status, self.friendly_name) + """Update connected status. + + Args: + status (bool): The new connected status. + """ + self._client["connected"] = status + _LOGGER.debug( + "updated connected status to %s on %s", status, self.friendly_name + ) self.callback() def snapshot(self): - """Snapshot current state.""" + """Take a snapshot of the current state.""" self._snapshot = { - 'name': self.name, - 'volume': self.volume, - 'muted': self.muted, - 'latency': self.latency + "name": self.name, + "volume": self.volume, + "muted": self.muted, + "latency": self.latency, } - _LOGGER.debug('took snapshot of current state of %s', self.friendly_name) + _LOGGER.debug("took snapshot of current state of %s", self.friendly_name) async def restore(self): - """Restore snapshotted state.""" + """Restore the snapshotted state.""" if not self._snapshot: return - await self.set_name(self._snapshot['name']) - await self.set_volume(self._snapshot['volume']) - await self.set_muted(self._snapshot['muted']) - await self.set_latency(self._snapshot['latency']) + await self.set_name(self._snapshot["name"]) + await self.set_volume(self._snapshot["volume"]) + await self.set_muted(self._snapshot["muted"]) + await self.set_latency(self._snapshot["latency"]) self.callback() - _LOGGER.debug('restored snapshot of state of %s', self.friendly_name) + _LOGGER.debug("restored snapshot of state of %s", self.friendly_name) def callback(self): - """Run callback.""" + """Run the callback function if set.""" if self._callback_func and callable(self._callback_func): self._callback_func(self) def set_callback(self, func): - """Set callback function.""" + """Set the callback function. + + Args: + func (callable): The callback function to set. + """ self._callback_func = func def __repr__(self): - """Return string representation.""" - return f'Snapclient {self.version} ({self.friendly_name}, {self.identifier})' + """Return string representation of the client. + + Returns: + str: The string representation of the client. + """ + return f"Snapclient {self.version} ({self.friendly_name}, {self.identifier})" \ No newline at end of file diff --git a/snapcast/control/group.py b/snapcast/control/group.py index 98bbad1..c986f7b 100644 --- a/snapcast/control/group.py +++ b/snapcast/control/group.py @@ -1,37 +1,62 @@ """Snapcast group.""" import logging - _LOGGER = logging.getLogger(__name__) - # pylint: disable=too-many-public-methods -class Snapgroup(): - """Represents a snapcast group.""" +class Snapgroup: + """Represents a snapcast group. + + Attributes: + _server (str): The server address. + _snapshot (dict): The snapshot of the group's state. + _callback_func (callable): The callback function for the group. + """ def __init__(self, server, data): - """Initialize.""" + """Initialize the group object. + + Args: + server (str): The server address. + data (dict): The initial data for the group. + """ self._server = server self._snapshot = None self._callback_func = None self.update(data) def update(self, data): - """Update group.""" + """Update group data. + + Args: + data (dict): The updated group data. + """ self._group = data @property def identifier(self): - """Get group identifier.""" + """Get group identifier. + + Returns: + str: The identifier of the group. + """ return self._group.get('id') @property def name(self): - """Get group name.""" + """Get group name. + + Returns: + str: The name of the group. + """ return self._group.get('name') async def set_name(self, name): - """Set a group name.""" + """Set a group name. + + Args: + name (str): The name to set for the group. + """ if not name: name = '' self._group['name'] = name @@ -39,41 +64,72 @@ async def set_name(self, name): @property def stream(self): - """Get stream identifier.""" + """Get stream identifier. + + Returns: + str: The stream identifier of the group. + """ return self._group.get('stream_id') async def set_stream(self, stream_id): - """Set group stream.""" + """Set group stream. + + Args: + stream_id (str): The stream identifier to set for the group. + """ self._group['stream_id'] = stream_id await self._server.group_stream(self.identifier, stream_id) _LOGGER.debug('set stream to %s on %s', stream_id, self.friendly_name) @property def stream_status(self): - """Get stream status.""" + """Get stream status. + + Returns: + str: The status of the stream. + """ return self._server.stream(self.stream).status @property def muted(self): - """Get mute status.""" + """Get mute status. + + Returns: + bool: True if the group is muted, False otherwise. + """ return self._group.get('muted') async def set_muted(self, status): - """Set group mute status.""" + """Set group mute status. + + Args: + status (bool): The mute status to set for the group. + """ self._group['muted'] = status await self._server.group_mute(self.identifier, status) _LOGGER.debug('set muted to %s on %s', status, self.friendly_name) @property def volume(self): - """Get volume.""" + """Get volume. + + Returns: + int: The volume percent of the group. + """ volume_sum = 0 for client in self._group.get('clients'): volume_sum += self._server.client(client.get('id')).volume return int(volume_sum / len(self._group.get('clients'))) async def set_volume(self, volume): - """Set volume.""" + """Set volume. + + Args: + volume (int): The volume percent to set for the group. + + Raises: + ValueError: If volume percent is out of range. + """ if volume not in range(0, 101): raise ValueError('Volume out of range') current_volume = self.volume @@ -104,7 +160,11 @@ async def set_volume(self, volume): @property def friendly_name(self): - """Get friendly name.""" + """Get friendly name. + + Returns: + str: The friendly name of the group. + """ fname = self.name if self.name != '' else "+".join( sorted([self._server.client(c).friendly_name for c in self.clients if c in [client.identifier for client in self._server.clients]])) @@ -112,11 +172,19 @@ def friendly_name(self): @property def clients(self): - """Get client identifiers.""" + """Get client identifiers. + + Returns: + list: The list of client identifiers in the group. + """ return [client.get('id') for client in self._group.get('clients')] async def add_client(self, client_identifier): - """Add a client.""" + """Add a client to the group. + + Args: + client_identifier (str): The identifier of the client to add. + """ if client_identifier in self.clients: _LOGGER.error('%s already in group %s', client_identifier, self.identifier) return @@ -130,7 +198,11 @@ async def add_client(self, client_identifier): self.callback() async def remove_client(self, client_identifier): - """Remove a client.""" + """Remove a client from the group. + + Args: + client_identifier (str): The identifier of the client to remove. + """ new_clients = self.clients new_clients.remove(client_identifier) await self._server.group_clients(self.identifier, new_clients) @@ -141,29 +213,45 @@ async def remove_client(self, client_identifier): self.callback() def streams_by_name(self): - """Get available stream objects by name.""" + """Get available stream objects by name. + + Returns: + dict: A dictionary of stream objects keyed by their friendly names. + """ return {stream.friendly_name: stream for stream in self._server.streams} def update_mute(self, data): - """Update mute.""" + """Update mute status. + + Args: + data (dict): The updated mute data. + """ self._group['muted'] = data['mute'] self.callback() _LOGGER.debug('updated mute on %s', self.friendly_name) def update_name(self, data): - """Update name.""" + """Update group name. + + Args: + data (dict): The updated name data. + """ self._group['name'] = data['name'] _LOGGER.debug('updated name on %s', self.name) self.callback() def update_stream(self, data): - """Update stream.""" + """Update stream. + + Args: + data (dict): The updated stream data. + """ self._group['stream_id'] = data['stream_id'] self.callback() _LOGGER.debug('updated stream to %s on %s', self.stream, self.friendly_name) def snapshot(self): - """Snapshot current state.""" + """Take a snapshot of the current state.""" self._snapshot = { 'muted': self.muted, 'volume': self.volume, @@ -172,7 +260,7 @@ def snapshot(self): _LOGGER.debug('took snapshot of current state of %s', self.friendly_name) async def restore(self): - """Restore snapshotted state.""" + """Restore the snapshotted state.""" if not self._snapshot: return await self.set_muted(self._snapshot['muted']) @@ -182,14 +270,22 @@ async def restore(self): _LOGGER.debug('restored snapshot of state of %s', self.friendly_name) def callback(self): - """Run callback.""" + """Run the callback function if set.""" if self._callback_func and callable(self._callback_func): self._callback_func(self) def set_callback(self, func): - """Set callback.""" + """Set the callback function. + + Args: + func (callable): The callback function to set. + """ self._callback_func = func def __repr__(self): - """Return string representation.""" - return f'Snapgroup ({self.friendly_name}, {self.identifier})' + """Return string representation of the group. + + Returns: + str: The string representation of the group. + """ + return f'Snapgroup ({self.friendly_name}, {self.identifier})' \ No newline at end of file diff --git a/snapcast/control/protocol.py b/snapcast/control/protocol.py index 9a21050..ed198f3 100644 --- a/snapcast/control/protocol.py +++ b/snapcast/control/protocol.py @@ -7,9 +7,17 @@ SERVER_ONDISCONNECT = 'Server.OnDisconnect' -# pylint: disable=consider-using-f-string def jsonrpc_request(method, identifier, params=None): - """Produce a JSONRPC request.""" + """Produce a JSONRPC request. + + Args: + method (str): The method name to be invoked. + identifier (int): The unique identifier for the request. + params (dict, optional): The parameters for the method. Defaults to None. + + Returns: + bytes: The JSONRPC request in bytes. + """ return '{}\r\n'.format(json.dumps({ 'id': identifier, 'method': method, @@ -22,25 +30,42 @@ class SnapcastProtocol(asyncio.Protocol): """Async Snapcast protocol.""" def __init__(self, callbacks): - """Initialize.""" + """Initialize the SnapcastProtocol. + + Args: + callbacks (dict): A dictionary of callback functions for various events. + """ self._transport = None self._buffer = {} self._callbacks = callbacks self._data_buffer = '' def connection_made(self, transport): - """When a connection is made.""" + """Handle a new connection. + + Args: + transport (asyncio.Transport): The transport representing the connection. + """ self._transport = transport def connection_lost(self, exc): - """When a connection is lost.""" + """Handle a lost connection. + + Args: + exc (Exception): The exception that caused the connection to be lost. + """ for b in self._buffer.values(): b['error'] = {"code": -1, "message": "connection lost"} b['flag'].set() - self._callbacks.get(SERVER_ONDISCONNECT)(exc) + if SERVER_ONDISCONNECT in self._callbacks: + self._callbacks[SERVER_ONDISCONNECT](exc) def data_received(self, data): - """Handle received data.""" + """Handle received data. + + Args: + data (bytes): The data received from the connection. + """ self._data_buffer += data.decode() if not self._data_buffer.endswith('\r\n'): return @@ -54,26 +79,46 @@ def data_received(self, data): self.handle_data(item) def handle_data(self, data): - """Handle JSONRPC data.""" + """Handle JSONRPC data. + + Args: + data (dict): The JSONRPC data to handle. + """ if 'id' in data: self.handle_response(data) else: self.handle_notification(data) def handle_response(self, data): - """Handle JSONRPC response.""" + """Handle JSONRPC response. + + Args: + data (dict): The JSONRPC response data. + """ identifier = data.get('id') self._buffer[identifier]['data'] = data.get('result') self._buffer[identifier]['error'] = data.get('error') self._buffer[identifier]['flag'].set() def handle_notification(self, data): - """Handle JSONRPC notification.""" + """Handle JSONRPC notification. + + Args: + data (dict): The JSONRPC notification data. + """ if data.get('method') in self._callbacks: - self._callbacks.get(data.get('method'))(data.get('params')) + self._callbacks[data.get('method')](data.get('params')) async def request(self, method, params): - """Send a JSONRPC request.""" + """Send a JSONRPC request. + + Args: + method (str): The method name to be invoked. + params (dict): The parameters for the method. + + Returns: + tuple: A tuple containing the result and error (if any) of the request. + """ identifier = random.randint(1, 1000) self._transport.write(jsonrpc_request(method, identifier, params)) self._buffer[identifier] = {'flag': asyncio.Event()} @@ -82,4 +127,4 @@ async def request(self, method, params): error = self._buffer[identifier].get('error') self._buffer[identifier].clear() del self._buffer[identifier] - return (result, error) + return result, error diff --git a/snapcast/control/server.py b/snapcast/control/server.py index 0bfc537..de39d14 100644 --- a/snapcast/control/server.py +++ b/snapcast/control/server.py @@ -4,6 +4,7 @@ import logging from packaging import version + from snapcast.control.client import Snapclient from snapcast.control.group import Snapgroup from snapcast.control.protocol import SERVER_ONDISCONNECT, SnapcastProtocol @@ -13,59 +14,83 @@ CONTROL_PORT = 1705 -SERVER_GETSTATUS = 'Server.GetStatus' -SERVER_GETRPCVERSION = 'Server.GetRPCVersion' -SERVER_DELETECLIENT = 'Server.DeleteClient' -SERVER_ONUPDATE = 'Server.OnUpdate' - -CLIENT_GETSTATUS = 'Client.GetStatus' -CLIENT_SETNAME = 'Client.SetName' -CLIENT_SETLATENCY = 'Client.SetLatency' -CLIENT_SETVOLUME = 'Client.SetVolume' -CLIENT_ONCONNECT = 'Client.OnConnect' -CLIENT_ONDISCONNECT = 'Client.OnDisconnect' -CLIENT_ONVOLUMECHANGED = 'Client.OnVolumeChanged' -CLIENT_ONLATENCYCHANGED = 'Client.OnLatencyChanged' -CLIENT_ONNAMECHANGED = 'Client.OnNameChanged' - -GROUP_GETSTATUS = 'Group.GetStatus' -GROUP_SETMUTE = 'Group.SetMute' -GROUP_SETSTREAM = 'Group.SetStream' -GROUP_SETCLIENTS = 'Group.SetClients' -GROUP_SETNAME = 'Group.SetName' -GROUP_ONMUTE = 'Group.OnMute' -GROUP_ONSTREAMCHANGED = 'Group.OnStreamChanged' -GROUP_ONNAMECHANGED = 'Group.OnNameChanged' - - -STREAM_ONPROPERTIES = 'Stream.OnProperties' -STREAM_SETPROPERTY = 'Stream.SetProperty' -STREAM_CONTROL = 'Stream.Control' # not yet implemented -STREAM_SETMETA = 'Stream.SetMeta' # deprecated -STREAM_ONUPDATE = 'Stream.OnUpdate' -STREAM_ONMETA = 'Stream.OnMetadata' # deprecated -STREAM_ADDSTREAM = 'Stream.AddStream' -STREAM_REMOVESTREAM = 'Stream.RemoveStream' +SERVER_GETSTATUS = "Server.GetStatus" +SERVER_GETRPCVERSION = "Server.GetRPCVersion" +SERVER_DELETECLIENT = "Server.DeleteClient" +SERVER_ONUPDATE = "Server.OnUpdate" + +CLIENT_GETSTATUS = "Client.GetStatus" +CLIENT_SETNAME = "Client.SetName" +CLIENT_SETLATENCY = "Client.SetLatency" +CLIENT_SETVOLUME = "Client.SetVolume" +CLIENT_ONCONNECT = "Client.OnConnect" +CLIENT_ONDISCONNECT = "Client.OnDisconnect" +CLIENT_ONVOLUMECHANGED = "Client.OnVolumeChanged" +CLIENT_ONLATENCYCHANGED = "Client.OnLatencyChanged" +CLIENT_ONNAMECHANGED = "Client.OnNameChanged" + +GROUP_GETSTATUS = "Group.GetStatus" +GROUP_SETMUTE = "Group.SetMute" +GROUP_SETSTREAM = "Group.SetStream" +GROUP_SETCLIENTS = "Group.SetClients" +GROUP_SETNAME = "Group.SetName" +GROUP_ONMUTE = "Group.OnMute" +GROUP_ONSTREAMCHANGED = "Group.OnStreamChanged" +GROUP_ONNAMECHANGED = "Group.OnNameChanged" + + +STREAM_ONPROPERTIES = "Stream.OnProperties" +STREAM_SETPROPERTY = "Stream.SetProperty" +STREAM_CONTROL = "Stream.Control" # not yet implemented +STREAM_SETMETA = "Stream.SetMeta" # deprecated +STREAM_ONUPDATE = "Stream.OnUpdate" +STREAM_ONMETA = "Stream.OnMetadata" # deprecated +STREAM_ADDSTREAM = "Stream.AddStream" +STREAM_REMOVESTREAM = "Stream.RemoveStream" SERVER_RECONNECT_DELAY = 5 -_EVENTS = [SERVER_ONUPDATE, CLIENT_ONVOLUMECHANGED, CLIENT_ONLATENCYCHANGED, - CLIENT_ONNAMECHANGED, CLIENT_ONCONNECT, CLIENT_ONDISCONNECT, - GROUP_ONMUTE, GROUP_ONSTREAMCHANGED, GROUP_ONNAMECHANGED, STREAM_ONUPDATE, - STREAM_ONMETA, STREAM_ONPROPERTIES] -_METHODS = [SERVER_GETSTATUS, SERVER_GETRPCVERSION, SERVER_DELETECLIENT, - SERVER_DELETECLIENT, CLIENT_GETSTATUS, CLIENT_SETNAME, - CLIENT_SETLATENCY, CLIENT_SETVOLUME, - GROUP_GETSTATUS, GROUP_SETMUTE, GROUP_SETSTREAM, GROUP_SETCLIENTS, - GROUP_SETNAME, STREAM_SETMETA, STREAM_SETPROPERTY, STREAM_CONTROL, - STREAM_ADDSTREAM, STREAM_REMOVESTREAM] +_EVENTS = [ + SERVER_ONUPDATE, + CLIENT_ONVOLUMECHANGED, + CLIENT_ONLATENCYCHANGED, + CLIENT_ONNAMECHANGED, + CLIENT_ONCONNECT, + CLIENT_ONDISCONNECT, + GROUP_ONMUTE, + GROUP_ONSTREAMCHANGED, + GROUP_ONNAMECHANGED, + STREAM_ONUPDATE, + STREAM_ONMETA, + STREAM_ONPROPERTIES, +] +_METHODS = [ + SERVER_GETSTATUS, + SERVER_GETRPCVERSION, + SERVER_DELETECLIENT, + SERVER_DELETECLIENT, + CLIENT_GETSTATUS, + CLIENT_SETNAME, + CLIENT_SETLATENCY, + CLIENT_SETVOLUME, + GROUP_GETSTATUS, + GROUP_SETMUTE, + GROUP_SETSTREAM, + GROUP_SETCLIENTS, + GROUP_SETNAME, + STREAM_SETMETA, + STREAM_SETPROPERTY, + STREAM_CONTROL, + STREAM_ADDSTREAM, + STREAM_REMOVESTREAM, +] # server versions in which new methods were added _VERSIONS = { - GROUP_SETNAME: '0.16.0', - STREAM_SETPROPERTY: '0.26.0', - STREAM_ADDSTREAM: '0.16.0', - STREAM_REMOVESTREAM: '0.16.0', + GROUP_SETNAME: "0.16.0", + STREAM_SETPROPERTY: "0.26.0", + STREAM_ADDSTREAM: "0.16.0", + STREAM_REMOVESTREAM: "0.16.0", } @@ -74,8 +99,27 @@ class ServerVersionError(NotImplementedError): # pylint: disable=too-many-public-methods -class Snapserver(): - """Represents a snapserver.""" +class Snapserver: + """ + Represents a Snapserver instance. + + This class provides methods to interact with a Snapserver instance, such as starting and stopping the server, + retrieving server status, managing clients, groups, and streams, and performing various operations on them. + + Args: + loop (asyncio.AbstractEventLoop): The event loop to use for asynchronous operations. + host (str): The hostname or IP address of the Snapserver. + port (int, optional): The port number of the Snapserver control interface. Defaults to CONTROL_PORT. + reconnect (bool, optional): Whether to automatically reconnect to the Snapserver if the connection is lost. + Defaults to False. + + Attributes: + version (str): The version of the Snapserver. + groups (list): A list of Snapgroup objects representing the groups in the Snapserver. + clients (list): A list of Snapclient objects representing the clients connected to the Snapserver. + streams (list): A list of Snapstream objects representing the streams available in the Snapserver. + + """ # pylint: disable=too-many-instance-attributes def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False): @@ -104,7 +148,7 @@ def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False): STREAM_ONPROPERTIES: self._on_stream_properties, STREAM_ONUPDATE: self._on_stream_update, SERVER_ONDISCONNECT: self._on_server_disconnect, - SERVER_ONUPDATE: self._on_server_update + SERVER_ONUPDATE: self._on_server_update, } self._on_update_callback_func = None self._on_connect_callback_func = None @@ -112,63 +156,129 @@ def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False): self._new_client_callback_func = None async def start(self): - """Initiate server connection.""" + """Initiate server connection. + + This method is used to establish a connection with the server. + It performs the necessary steps to connect to the server and + checks for a valid response. If the response is not valid, it + raises an OSError. + + Raises: + OSError: If the server response is not valid. + + """ self._is_stopped = False await self._do_connect() status, error = await self.status() - if (not isinstance(status, dict)) or ('server' not in status): - _LOGGER.warning('connected, but no valid response:\n%s', str(error)) + if (not isinstance(status, dict)) or ("server" not in status): + _LOGGER.warning("connected, but no valid response:\n%s", str(error)) self.stop() raise OSError - _LOGGER.debug('connected to snapserver on %s:%s', self._host, self._port) + _LOGGER.debug("connected to snapserver on %s:%s", self._host, self._port) self.synchronize(status) self._on_server_connect() def stop(self): - """Stop server.""" + """Stop server. + + This method stops the server by setting the `_is_stopped` flag to True, + disconnecting all clients, and resetting internal data structures. + + """ self._is_stopped = True self._do_disconnect() - _LOGGER.debug('Stopping') + _LOGGER.debug("Stopping") self._clients = {} self._streams = {} self._groups = {} self._version = None def _do_disconnect(self): - """Perform the connection to the server.""" + """Perform the disconnection from the server. + + This method closes the transport connection to the server if it exists. + """ if self._transport: self._transport.close() async def _do_connect(self): - """Perform the connection to the server.""" + """Perform the connection to the server. + + This method establishes a connection to the Snapcast server using the specified host and port. + It creates a transport and protocol using the asyncio `create_connection` method. + The `SnapcastProtocol` class is used as the protocol for handling communication with the server. + The `_callbacks` parameter is passed to the `SnapcastProtocol` constructor to handle callbacks. + + Returns: + None + + """ self._transport, self._protocol = await self._loop.create_connection( - lambda: SnapcastProtocol(self._callbacks), self._host, self._port) + lambda: SnapcastProtocol(self._callbacks), self._host, self._port + ) def _reconnect_cb(self): - """Try to reconnect to the server.""" - _LOGGER.debug('try reconnect') + """Try to reconnect to the server. + + This method is called when a connection to the server is lost and + attempts to reconnect to the server. It first tries to establish a + new connection and then checks if the server responds with a valid + status. If the response is not valid, it logs a warning and stops + the connection. If the connection is successful and the status is + valid, it synchronizes the status and triggers the `_on_server_connect` + method. + + """ + _LOGGER.debug("try reconnect") async def try_reconnect(): - """Actual coroutine ro try to reconnect or reschedule.""" + """Actual coroutine to try to reconnect or reschedule. + + This function attempts to reconnect to the server or reschedule the reconnection + based on the response received. If a valid response is received, the status is + synchronized and the `_on_server_connect` method is called. If no valid response + is received, a warning is logged and the connection is stopped. + + Raises: + OSError: If no valid response is received after reconnecting. + + """ try: await self._do_connect() status, error = await self.status() - if (not isinstance(status, dict)) or ('server' not in status): - _LOGGER.warning('connected, but no valid response:\n%s', str(error)) + if (not isinstance(status, dict)) or ("server" not in status): + _LOGGER.warning("connected, but no valid response:\n%s", str(error)) self.stop() raise OSError except OSError: - self._loop.call_later(SERVER_RECONNECT_DELAY, - self._reconnect_cb) + self._loop.call_later(SERVER_RECONNECT_DELAY, self._reconnect_cb) else: self.synchronize(status) self._on_server_connect() + asyncio.ensure_future(try_reconnect()) async def _transact(self, method, params=None): - """Wrap requests.""" + """Wrap requests. + + This method wraps requests made to the server. It checks if the server is connected, + and if so, it sends the request using the `_protocol` and `_transport` attributes. + If the server is not connected, it returns an error indicating that the server is not connected. + + Args: + method (str): The method to be requested. + params (dict, optional): The parameters to be sent with the request. + + Returns: + tuple: A tuple containing the result and error of the request. + + """ result = error = None - if self._protocol is None or self._transport is None or self._transport.is_closing(): + if ( + self._protocol is None + or self._transport is None + or self._transport.is_closing() + ): error = {"code": None, "message": "Server not connected"} else: result, error = await self._protocol.request(method, params) @@ -176,155 +286,378 @@ async def _transact(self, method, params=None): @property def version(self): - """Version.""" + """ + Return the version of the server. + + Returns: + str: The version of the server. + """ return self._version async def status(self): - """System status.""" + """ + System status. + + Returns: + The system status. + + """ return await self._transact(SERVER_GETSTATUS) async def rpc_version(self): - """RPC version.""" + """ + RPC version. + + Returns: + The version of the RPC. + + """ return await self._transact(SERVER_GETRPCVERSION) async def delete_client(self, identifier): - """Delete client.""" - params = {'id': identifier} + """Delete a client. + + Args: + identifier (str): The identifier of the client to be deleted. + + """ + params = {"id": identifier} response, _ = await self._transact(SERVER_DELETECLIENT, params) self.synchronize(response) async def client_name(self, identifier, name): - """Set client name.""" - return await self._request(CLIENT_SETNAME, identifier, 'name', name) + """ + Set client name. + + Args: + identifier (str): The identifier of the client. + name (str): The name to set for the client. + + Returns: + The result of the request. + + """ + return await self._request(CLIENT_SETNAME, identifier, "name", name) async def client_latency(self, identifier, latency): - """Set client latency.""" - return await self._request(CLIENT_SETLATENCY, identifier, 'latency', latency) + """ + Set client latency. + + Args: + identifier (str): The identifier of the client. + latency (int): The latency value to set. + + Returns: + The result of the request. + + """ + return await self._request(CLIENT_SETLATENCY, identifier, "latency", latency) async def client_volume(self, identifier, volume): - """Set client volume.""" - return await self._request(CLIENT_SETVOLUME, identifier, 'volume', volume) + """ + Set client volume. + + Args: + identifier (str): The identifier of the client. + volume (int): The volume level to set. + + Returns: + The result of the request. + + """ + return await self._request(CLIENT_SETVOLUME, identifier, "volume", volume) async def client_status(self, identifier): - """Get client status.""" - return await self._request(CLIENT_GETSTATUS, identifier, 'client') + """ + Get client status. + + Args: + identifier (str): The identifier of the client. + + Returns: + dict: A dictionary containing the status of the client. + + """ + return await self._request(CLIENT_GETSTATUS, identifier, "client") async def group_status(self, identifier): - """Get group status.""" - return await self._request(GROUP_GETSTATUS, identifier, 'group') + """ + Get group status. + + Args: + identifier (str): The identifier of the group. + + Returns: + dict: The status of the group. + + """ + return await self._request(GROUP_GETSTATUS, identifier, "group") async def group_mute(self, identifier, status): - """Set group mute.""" - return await self._request(GROUP_SETMUTE, identifier, 'mute', status) + """ + Set group mute. + + Args: + identifier (str): The identifier of the group. + status (bool): The mute status to set. + + Returns: + The result of the request. + + """ + return await self._request(GROUP_SETMUTE, identifier, "mute", status) async def group_stream(self, identifier, stream_id): - """Set group stream.""" - return await self._request(GROUP_SETSTREAM, identifier, 'stream_id', stream_id) + """ + Set group stream. + + Args: + identifier (str): The identifier of the group. + stream_id (str): The ID of the stream to set. + + Returns: + The result of the request. + + """ + return await self._request(GROUP_SETSTREAM, identifier, "stream_id", stream_id) async def group_clients(self, identifier, clients): - """Set group clients.""" - return await self._request(GROUP_SETCLIENTS, identifier, 'clients', clients) + """ + Set group clients. + + Args: + identifier (str): The identifier of the group. + clients (list): A list of client identifiers to be added to the group. + + Returns: + The result of the request. + + """ + return await self._request(GROUP_SETCLIENTS, identifier, "clients", clients) async def group_name(self, identifier, name): - """Set group name.""" + """ + Set the name of a group. + + Args: + identifier (str): The identifier of the group. + name (str): The new name for the group. + + Returns: + The result of the request. + + Raises: + VersionMismatchError: If the server version does not support the GROUP_SETNAME command. + """ self._version_check(GROUP_SETNAME) - return await self._request(GROUP_SETNAME, identifier, 'name', name) + return await self._request(GROUP_SETNAME, identifier, "name", name) async def stream_control(self, identifier, control_command, control_params): - """Set stream control.""" + """ + Set stream control. + + Args: + identifier (str): The identifier of the stream. + control_command (str): The control command to be executed. + control_params (dict): Additional parameters for the control command. + + Returns: + The response from the server. + + Raises: + VersionError: If the server version does not support stream control. + + """ self._version_check(STREAM_SETPROPERTY) return await self._request( - STREAM_CONTROL, identifier, 'command', control_command, control_params) + STREAM_CONTROL, identifier, "command", control_command, control_params + ) async def stream_setmeta(self, identifier, meta): # deprecated """Set stream metadata.""" - return await self._request(STREAM_SETMETA, identifier, 'meta', meta) + return await self._request(STREAM_SETMETA, identifier, "meta", meta) async def stream_setproperty(self, identifier, stream_property, value): - """Set stream metadata.""" + """ + Set stream metadata. + + Args: + identifier (str): The identifier of the stream. + stream_property (str): The property to set. + value: The value to set for the property. + + Returns: + The response from the server. + + """ self._version_check(STREAM_SETPROPERTY) - return await self._request(STREAM_SETPROPERTY, identifier, parameters={ - 'property': stream_property, - 'value': value - }) + return await self._request( + STREAM_SETPROPERTY, + identifier, + parameters={"property": stream_property, "value": value}, + ) async def stream_add_stream(self, stream_uri): - """Add a stream.""" + """ + Add a stream. + + Args: + stream_uri (str): The URI of the stream to be added. + + Returns: + dict or str: The result of adding the stream. If successful, a dictionary + containing the stream ID will be returned. If unsuccessful, an error message + will be returned. + + """ params = {"streamUri": stream_uri} result, error = await self._transact(STREAM_ADDSTREAM, params) - if (isinstance(result, dict) and ("id" in result)): + if isinstance(result, dict) and ("id" in result): self.synchronize((await self.status())[0]) return result or error async def stream_remove_stream(self, identifier): - """Remove a Stream.""" + """ + Remove a Stream. + + Args: + identifier (str): The identifier of the stream to be removed. + + Returns: + dict: The result of the removal operation. + + """ result = await self._request(STREAM_REMOVESTREAM, identifier) - if (isinstance(result, dict) and ("id" in result)): + if isinstance(result, dict) and ("id" in result): self.synchronize((await self.status())[0]) return result def group(self, group_identifier): - """Get a group.""" + """ + Get a group. + + Args: + group_identifier (str): The identifier of the group. + + Returns: + Group: The group object. + + """ return self._groups[group_identifier] def stream(self, stream_identifier): - """Get a stream.""" + """ + Get a stream. + + Args: + stream_identifier (str): The identifier of the stream. + + Returns: + Stream: The stream object corresponding to the given identifier. + """ return self._streams[stream_identifier] def client(self, client_identifier): - """Get a client.""" + """ + Get a client. + + Args: + client_identifier (str): The identifier of the client. + + Returns: + Client: The client object corresponding to the given identifier. + """ return self._clients[client_identifier] @property def groups(self): - """Get groups.""" + """ + Get groups. + + Returns: + list: A list of groups. + """ return list(self._groups.values()) @property def clients(self): - """Get clients.""" + """ + Get clients. + + Returns: + list: A list of clients. + """ return list(self._clients.values()) @property def streams(self): - """Get streams.""" + """ + Get streams. + + Returns: + list: A list of streams. + """ return list(self._streams.values()) def synchronize(self, status): - """Synchronize snapserver.""" - self._version = status['server']['server']['snapserver']['version'] + """ + Synchronize snapserver. + + This method synchronizes the snapserver with the provided status. + It updates the internal state of the server, including the version, + groups, clients, and streams. + + Args: + status (dict): The status of the snapserver. + + Returns: + None + """ + self._version = status["server"]["server"]["snapserver"]["version"] new_groups = {} new_clients = {} new_streams = {} - for stream in status.get('server').get('streams'): - if stream.get('id') in self._streams: - new_streams[stream.get('id')] = self._streams[stream.get('id')] - new_streams[stream.get('id')].update(stream) + for stream in status.get("server").get("streams"): + if stream.get("id") in self._streams: + new_streams[stream.get("id")] = self._streams[stream.get("id")] + new_streams[stream.get("id")].update(stream) else: - new_streams[stream.get('id')] = Snapstream(stream) - _LOGGER.debug('stream found: %s', new_streams[stream.get('id')]) - for group in status.get('server').get('groups'): - if group.get('id') in self._groups: - new_groups[group.get('id')] = self._groups[group.get('id')] - new_groups[group.get('id')].update(group) + new_streams[stream.get("id")] = Snapstream(stream) + _LOGGER.debug("stream found: %s", new_streams[stream.get("id")]) + for group in status.get("server").get("groups"): + if group.get("id") in self._groups: + new_groups[group.get("id")] = self._groups[group.get("id")] + new_groups[group.get("id")].update(group) else: - new_groups[group.get('id')] = Snapgroup(self, group) - for client in group.get('clients'): - if client.get('id') in self._clients: - new_clients[client.get('id')] = self._clients[client.get('id')] - new_clients[client.get('id')].update(client) + new_groups[group.get("id")] = Snapgroup(self, group) + for client in group.get("clients"): + if client.get("id") in self._clients: + new_clients[client.get("id")] = self._clients[client.get("id")] + new_clients[client.get("id")].update(client) else: - new_clients[client.get('id')] = Snapclient(self, client) - _LOGGER.debug('client found: %s', new_clients[client.get('id')]) - _LOGGER.debug('group found: %s', new_groups[group.get('id')]) + new_clients[client.get("id")] = Snapclient(self, client) + _LOGGER.debug("client found: %s", new_clients[client.get("id")]) + _LOGGER.debug("group found: %s", new_groups[group.get("id")]) self._groups = new_groups self._clients = new_clients self._streams = new_streams # pylint: disable=too-many-arguments async def _request(self, method, identifier, key=None, value=None, parameters=None): - """Perform request with identifier.""" - params = {'id': identifier} + """ + Perform a request with the given identifier. + + Args: + method (str): The HTTP method to use for the request. + identifier (str): The identifier for the request. + key (str, optional): The key for the request parameter. Defaults to None. + value (str, optional): The value for the request parameter. Defaults to None. + parameters (dict, optional): Additional parameters for the request. Defaults to None. + + Returns: + The result of the request, or an error if the request failed. + """ + params = {"id": identifier} if key is not None and value is not None: params[key] = value if isinstance(parameters, dict): @@ -335,15 +668,31 @@ async def _request(self, method, identifier, key=None, value=None, parameters=No return result or error def _on_server_connect(self): - """Handle server connection.""" - _LOGGER.debug('Server connected') + """ + Handle server connection. + + This method is called when the server is successfully connected. + It logs a debug message and invokes the `_on_connect_callback_func` if it is callable. + + """ + _LOGGER.debug("Server connected") if self._on_connect_callback_func and callable(self._on_connect_callback_func): self._on_connect_callback_func() def _on_server_disconnect(self, exception): - """Handle server disconnection.""" - _LOGGER.debug('Server disconnected: %s', str(exception)) - if self._on_disconnect_callback_func and callable(self._on_disconnect_callback_func): + """ + Handle server disconnection. + + Args: + exception: The exception that caused the disconnection. + + Returns: + None + """ + _LOGGER.debug("Server disconnected: %s", str(exception)) + if self._on_disconnect_callback_func and callable( + self._on_disconnect_callback_func + ): self._on_disconnect_callback_func(exception) self._protocol = None self._transport = None @@ -351,121 +700,258 @@ def _on_server_disconnect(self, exception): self._reconnect_cb() def _on_server_update(self, data): - """Handle server update.""" + """ + Handle server update. + + This method is responsible for handling updates received from the server. + It synchronizes the data and calls the update callback function if it is defined. + + Args: + data: The data received from the server. + + Returns: + None + """ self.synchronize(data) if self._on_update_callback_func and callable(self._on_update_callback_func): self._on_update_callback_func() def _on_group_mute(self, data): - """Handle group mute.""" - group = self._groups.get(data.get('id')) + """ + Handle group mute. + + This method is responsible for handling the mute event of a group. + It updates the mute status of the group and triggers a callback for each client in the group. + + Args: + data (dict): The data containing the group ID and mute status. + + Returns: + None + """ + group = self._groups.get(data.get("id")) group.update_mute(data) for client_id in group.clients: self._clients.get(client_id).callback() def _on_group_name_changed(self, data): - """Handle group name changed.""" - self._groups.get(data.get('id')).update_name(data) + """ + Handle group name changed. + + This method is called when the name of a group is changed. It updates the name of the group + with the new data provided. + + Args: + data (dict): A dictionary containing the updated group information. + + Returns: + None + """ + self._groups.get(data.get("id")).update_name(data) def _on_group_stream_changed(self, data): - """Handle group stream change.""" - group = self._groups.get(data.get('id')) + """ + Handle group stream change. + + This method is called when there is a change in the stream of a group. + It updates the stream data for the corresponding group and triggers a callback + for each client in the group. + + Args: + data (dict): The data containing the information about the stream change. + """ + group = self._groups.get(data.get("id")) group.update_stream(data) for client_id in group.clients: self._clients.get(client_id).callback() def _on_client_connect(self, data): - """Handle client connect.""" + """ + Handle client connect. + + This method is called when a client connects to the server. It updates the + connection status of the client and creates a new `Snapclient` instance if + the client is not already present in the `_clients` dictionary. + + Args: + data (dict): A dictionary containing the client data, including the client ID. + """ client = None - if data.get('id') in self._clients: - client = self._clients[data.get('id')] + if data.get("id") in self._clients: + client = self._clients[data.get("id")] client.update_connected(True) else: - client = Snapclient(self, data.get('client')) - self._clients[data.get('id')] = client - if self._new_client_callback_func and callable(self._new_client_callback_func): + client = Snapclient(self, data.get("client")) + self._clients[data.get("id")] = client + if self._new_client_callback_func and callable( + self._new_client_callback_func + ): self._new_client_callback_func(client) - _LOGGER.debug('client %s connected', client.friendly_name) + _LOGGER.debug("client %s connected", client.friendly_name) def _on_client_disconnect(self, data): - """Handle client disconnect.""" - self._clients[data.get('id')].update_connected(False) - _LOGGER.debug('client %s disconnected', self._clients[data.get('id')].friendly_name) + """ + Handle client disconnect. + + This method is called when a client disconnects from the server. + It updates the connected status of the client and logs a debug message. + + Args: + data (dict): A dictionary containing information about the disconnected client. + """ + self._clients[data.get("id")].update_connected(False) + _LOGGER.debug( + "client %s disconnected", self._clients[data.get("id")].friendly_name + ) def _on_client_volume_changed(self, data): - """Handle client volume change.""" - self._clients.get(data.get('id')).update_volume(data) + """ + Handle client volume change. + + This method is called when the volume of a client is changed. + It updates the volume of the corresponding client object. + + Args: + data (dict): A dictionary containing the volume change information. + """ + self._clients.get(data.get("id")).update_volume(data) def _on_client_name_changed(self, data): - """Handle client name changed.""" - self._clients.get(data.get('id')).update_name(data) + """ + Handle client name changed. + + Args: + data (dict): The data containing the client ID and the updated name. + """ + self._clients.get(data.get("id")).update_name(data) def _on_client_latency_changed(self, data): - """Handle client latency changed.""" - self._clients.get(data.get('id')).update_latency(data) + """ + Handle client latency changed. + + This method is called when the latency of a client changes. It updates the latency information + for the corresponding client. + + Args: + data (dict): A dictionary containing the updated latency information for the client. + """ + self._clients.get(data.get("id")).update_latency(data) def _on_stream_meta(self, data): # deprecated """Handle stream metadata update.""" - stream = self._streams[data.get('id')] - stream.update_meta(data.get('meta')) - _LOGGER.debug('stream %s metadata updated', stream.friendly_name) + stream = self._streams[data.get("id")] + stream.update_meta(data.get("meta")) + _LOGGER.debug("stream %s metadata updated", stream.friendly_name) for group in self._groups.values(): - if group.stream == data.get('id'): + if group.stream == data.get("id"): group.callback() def _on_stream_properties(self, data): - """Handle stream properties update.""" - stream = self._streams[data.get('id')] - stream.update_properties(data.get('properties')) - _LOGGER.debug('stream %s properties updated', stream.friendly_name) + """ + Handle stream properties update. + + This method is called when the properties of a stream are updated. + It updates the properties of the corresponding stream object and triggers + the callback functions for the affected groups and clients. + + Args: + data (dict): A dictionary containing the updated stream properties. + """ + stream = self._streams[data.get("id")] + stream.update_properties(data.get("properties")) + _LOGGER.debug("stream %s properties updated", stream.friendly_name) for group in self._groups.values(): - if group.stream == data.get('id'): + if group.stream == data.get("id"): group.callback() for client_id in group.clients: self._clients.get(client_id).callback() def _on_stream_update(self, data): - """Handle stream update.""" - if data.get('id') in self._streams: - self._streams[data.get('id')].update(data.get('stream')) - _LOGGER.debug('stream %s updated', self._streams[data.get('id')].friendly_name) + """ + Handle stream update. + + This method is called when a stream update event is received. It updates the + corresponding stream object with the new data, triggers the stream's callback, + and updates the associated groups and clients. + + Args: + data (dict): The data containing the stream update information. + """ + if data.get("id") in self._streams: + self._streams[data.get("id")].update(data.get("stream")) + _LOGGER.debug( + "stream %s updated", self._streams[data.get("id")].friendly_name + ) self._streams[data.get("id")].callback() for group in self._groups.values(): - if group.stream == data.get('id'): + if group.stream == data.get("id"): group.callback() for client_id in group.clients: self._clients.get(client_id).callback() else: - if data.get('stream', {}).get('uri', {}).get('query', {}).get('codec') == 'null': - _LOGGER.debug('stream %s is input-only, ignore', data.get('id')) + if ( + data.get("stream", {}).get("uri", {}).get("query", {}).get("codec") + == "null" + ): + _LOGGER.debug("stream %s is input-only, ignore", data.get("id")) else: - _LOGGER.info('stream %s not found, synchronize', data.get('id')) + _LOGGER.info("stream %s not found, synchronize", data.get("id")) async def async_sync(): self.synchronize((await self.status())[0]) + asyncio.ensure_future(async_sync()) def set_on_update_callback(self, func): - """Set on update callback function.""" + """ + Set the on update callback function. + + Parameters: + - func: The callback function to be set. + """ self._on_update_callback_func = func def set_on_connect_callback(self, func): - """Set on connection callback function.""" + """ + Set on connection callback function. + + Args: + func: The function to be called when a connection is established. + """ self._on_connect_callback_func = func def set_on_disconnect_callback(self, func): - """Set on disconnection callback function.""" + """ + Set on disconnection callback function. + + Args: + func: The function to be called when a connection is lost. + """ self._on_disconnect_callback_func = func def set_new_client_callback(self, func): - """Set new client callback function.""" + """ + Set new client callback function. + + Parameters: + - func: The callback function to be set. + """ self._new_client_callback_func = func def __repr__(self): - """Return string representation.""" - return f'Snapserver {self.version} ({self._host})' + """Return string representation of the Snapserver object.""" + return f"Snapserver {self.version} ({self._host})" def _version_check(self, api_call): + """ + Checks if the server version meets the minimum requirement for a given API call. + + Args: + api_call (str): The name of the API call. + + Raises: + ServerVersionError: If the server version is lower than the required version for the API call. + """ if version.parse(self.version) < version.parse(_VERSIONS.get(api_call)): raise ServerVersionError( f"{api_call} requires server version >= {_VERSIONS[api_call]}." diff --git a/snapcast/control/stream.py b/snapcast/control/stream.py index d9a663e..9624082 100644 --- a/snapcast/control/stream.py +++ b/snapcast/control/stream.py @@ -1,40 +1,87 @@ -"""Snapcast stream.""" +class Snapstream: + """ + Represents a snapcast stream. + + Attributes: + identifier (str): The stream id. + status (str): The stream status. + name (str): The stream name. + friendly_name (str): The friendly name of the stream. + metadata (dict): The metadata of the stream. + properties (dict): The properties of the stream. + path (str): The stream path. + + Methods: + __init__(data): Initializes the Snapstream object. + update(data): Updates the stream data. + update_meta(data): Updates the stream metadata. + update_metadata(data): Updates the stream metadata. + update_properties(data): Updates the stream properties. + callback(): Runs the callback function. + set_callback(func): Sets the callback function. + """ + def __init__(self, data): + """ + Initialize the Stream object. -class Snapstream(): - """Represents a snapcast stream.""" + Args: + data (dict): A dictionary containing the initial data for the Stream. - def __init__(self, data): - """Initialize.""" + """ self.update(data) self._callback_func = None @property def identifier(self): - """Get stream id.""" - return self._stream.get('id') + """ + Get stream id. + + Returns: + str: The stream id. + """ + return self._stream.get("id") @property def status(self): - """Get stream status.""" - return self._stream.get('status') + """ + Get stream status. + + Returns: + The status of the stream. + """ + return self._stream.get("status") @property def name(self): - """Get stream name.""" - return self._stream.get('uri').get('query').get('name') + """ + Get stream name. + + Returns: + str: The name of the stream. + """ + return self._stream.get("uri").get("query").get("name") @property def friendly_name(self): - """Get friendly name.""" - return self.name if self.name != '' else self.identifier + """ + Get friendly name. + + Returns: + str: The friendly name of the stream. If the name is empty, the identifier is returned instead. + """ + return self.name if self.name != "" else self.identifier @property def metadata(self): - """Get metadata.""" - if 'properties' in self._stream: - return self._stream['properties'].get('metadata') - return self._stream.get('meta') + """Get metadata. + + Returns: + The metadata of the stream, if available. Otherwise, returns None. + """ + if "properties" in self._stream: + return self._stream["properties"].get("metadata") + return self._stream.get("meta") @property def meta(self): @@ -43,41 +90,84 @@ def meta(self): @property def properties(self): - """Get properties.""" - return self._stream.get('properties') + """ + Get properties. + + Returns: + dict: The properties of the stream. + """ + return self._stream.get("properties") @property def path(self): - """Get stream path.""" - return self._stream.get('uri').get('path') + """ + Get stream path. + + Returns: + str: The path of the stream URI. + + """ + return self._stream.get("uri").get("path") def update(self, data): - """Update stream.""" + """ + Update stream. + + Args: + data: The updated data for the stream. + + """ self._stream = data def update_meta(self, data): - """Update stream metadata.""" + """ + Update stream metadata. + + Args: + data (dict): A dictionary containing the updated metadata. + """ self.update_metadata(data) def update_metadata(self, data): - """Update stream metadata.""" - if 'properties' in self._stream: - self._stream['properties']['metadata'] = data - self._stream['meta'] = data + """ + Update stream metadata. + + Args: + data (dict): The updated metadata for the stream. + + """ + if "properties" in self._stream: + self._stream["properties"]["metadata"] = data + self._stream["meta"] = data def update_properties(self, data): - """Update stream properties.""" - self._stream['properties'] = data + """ + Update stream properties. + + Args: + data (dict): A dictionary containing the updated properties of the stream. + """ + self._stream["properties"] = data def __repr__(self): - """Return string representation.""" - return f'Snapstream ({self.name})' + """Return string representation of the Snapstream object.""" + return f"Snapstream ({self.name})" def callback(self): - """Run callback.""" + """Run callback. + + This method executes the callback function, if it exists and is callable. + It passes the current instance of the class as an argument to the callback function. + """ if self._callback_func and callable(self._callback_func): self._callback_func(self) def set_callback(self, func): - """Set callback.""" + """ + Set callback. + + Args: + func (callable): The callback function to be set. + + """ self._callback_func = func From 5d3458b4dbff8ccd0787fe1d0de56df5a9b1dbfa Mon Sep 17 00:00:00 2001 From: chicco-carone Date: Mon, 3 Jun 2024 16:09:58 +0200 Subject: [PATCH 2/2] Improve docstrings with type hints --- snapcast/control/client.py | 314 ++++------- snapcast/control/group.py | 240 +++----- snapcast/control/protocol.py | 103 +--- snapcast/control/server.py | 1027 +++++++++------------------------- snapcast/control/stream.py | 193 ++----- 5 files changed, 549 insertions(+), 1328 deletions(-) diff --git a/snapcast/control/client.py b/snapcast/control/client.py index b9ef6b5..f49ead9 100644 --- a/snapcast/control/client.py +++ b/snapcast/control/client.py @@ -1,274 +1,180 @@ """Snapcast client.""" - import logging +from typing import Any, Callable, Dict, List, Optional, Union _LOGGER = logging.getLogger(__name__) # pylint: disable=too-many-public-methods class Snapclient: - """Represents a snapclient. - - Attributes: - _server (str): The server address. - _snapshot (dict): The snapshot of the client's state. - _last_seen: The last seen timestamp of the client. - _callback_func: The callback function for the client. - - """ - - def __init__(self, server, data): - """Initialize the Client object. + """Initialize the Client object.""" - Args: - server (str): The server address. - data (dict): The initial data for the client. - - """ + def __init__(self, server: Any, data: Dict[str, Any]) -> None: + """Initialize.""" self._server = server - self._snapshot = None - self._last_seen = None - self._callback_func = None + self._snapshot: Optional[Dict[str, Union[str, int, bool]]] = None + self._last_seen: Optional[str] = None + self._callback_func: Optional[Callable[[Any], None]] = None self.update(data) - def update(self, data): - """Update client. - - Args: - data: The updated client data. - - """ + def update(self, data: Dict[str, Any]) -> None: + """Update client.""" self._client = data @property - def identifier(self): - """Get identifier. - - Returns: - The identifier of the client. - """ - return self._client.get("id") + def identifier(self) -> Optional[str]: + """Get client identifier.""" + return self._client.get('id') @property - def group(self): - """Get the group that this client belongs to. - - Returns: - The group object that this client belongs to, or None if the client is not in any group. - """ + def group(self) -> Optional[Any]: + """Get group that the client is part of""" for group in self._server.groups: if self.identifier in group.clients: return group return None @property - def friendly_name(self): - """Get the friendly name of the client. - - Returns: - str: The friendly name of the client. - - """ - if len(self._client.get("config").get("name")): - return self._client.get("config").get("name") - return self._client.get("host").get("name") + def friendly_name(self) -> str: + """Get client friendly name.""" + if len(self._client.get('config', {}).get('name', '')): + return self._client.get('config').get('name', '') + return self._client.get('host', {}).get('name', '') @property - def version(self): - """Version. - - Returns: - str: The version of the snapclient. - """ - return self._client.get("snapclient").get("version") + def version(self) -> Optional[str]: + """Get client snapclient version.""" + return self._client.get('snapclient', {}).get('version') @property - def connected(self): - """Get the current connection status of the client. - - Returns: - bool: True if the client is connected, False otherwise. - """ - return self._client.get("connected") + def connected(self) -> bool: + """Get the current connection status of the client.""" + return self._client.get('connected', False) @property - def name(self): - """Get the name of the client. - - Returns: - str: The name of the client. - """ - return self._client.get("config").get("name") - - async def set_name(self, name): - """Set a client name. + def name(self) -> str: + """Get name of the client.""" + return self._client.get('config', {}).get('name', '') - Args: - name (str): The name to set for the client. - - """ + async def set_name(self, name: str) -> None: + """Set a new name for the client.""" if not name: - name = "" - self._client["config"]["name"] = name + name = '' + self._client['config']['name'] = name await self._server.client_name(self.identifier, name) @property - def latency(self): - """Get the client latency. - - Returns: - int: The latency of the client. - """ - return self._client.get("config").get("latency") - - async def set_latency(self, latency): - """Set client latency. + def latency(self) -> Optional[int]: + """Get client latency.""" + return self._client.get('config', {}).get('latency') - Args: - latency (int): The latency to set for the client. - """ - self._client["config"]["latency"] = latency + async def set_latency(self, latency: int) -> None: + """Set client latency.""" + self._client['config']['latency'] = latency await self._server.client_latency(self.identifier, latency) @property - def muted(self): - """Get the mute status of the client. - - Returns: - bool: True if the client is muted, False otherwise. - """ - return self._client.get("config").get("volume").get("muted") - - async def set_muted(self, status): - """Set client mute status. - - Args: - status (bool): The mute status to set for the client. - """ - new_volume = self._client["config"]["volume"] - new_volume["muted"] = status - self._client["config"]["volume"]["muted"] = status + def muted(self) -> bool: + """Muted or not.""" + return self._client.get('config', {}).get('volume', {}).get('muted', False) + + async def set_muted(self, status: bool) -> None: + """Set client mute status.""" + new_volume = self._client['config']['volume'] + new_volume['muted'] = status + self._client['config']['volume']['muted'] = status await self._server.client_volume(self.identifier, new_volume) - _LOGGER.debug("set muted to %s on %s", status, self.friendly_name) + _LOGGER.debug('set muted to %s on %s', status, self.friendly_name) @property - def volume(self): - """Get the volume percent. - - Returns: - int: The volume percent of the client. - """ - return self._client.get("config").get("volume").get("percent") - - async def set_volume(self, percent, update_group=True): - """Set client volume percent. + def volume(self) -> int: + """Get client volume percent.""" + return self._client.get('config', {}).get('volume', {}).get('percent', 0) - Args: - percent (int): The volume percent to set for the client. - update_group (bool): Whether to update the group volume. Defaults to True. - - Raises: - ValueError: If volume percent is out of range. - """ + async def set_volume(self, percent: int, update_group: bool = True) -> None: + """Set client volume percent.""" if percent not in range(0, 101): - raise ValueError("Volume percent out of range") - new_volume = self._client["config"]["volume"] - new_volume["percent"] = percent - self._client["config"]["volume"]["percent"] = percent + raise ValueError('Volume percent out of range') + new_volume = self._client['config']['volume'] + new_volume['percent'] = percent + self._client['config']['volume']['percent'] = percent await self._server.client_volume(self.identifier, new_volume) if update_group: self._server.group(self.group.identifier).callback() - _LOGGER.debug("set volume to %s on %s", percent, self.friendly_name) + _LOGGER.debug('set volume to %s on %s', percent, self.friendly_name) - def groups_available(self): - """Get available group objects. - - Returns: - list: The list of available group objects. - """ + def groups_available(self) -> List[Any]: + """Get available group objects.""" return list(self._server.groups) - def update_volume(self, data): - """Update volume. - - Args: - data (dict): The updated volume data. - """ - self._client["config"]["volume"] = data["volume"] - _LOGGER.debug("updated volume on %s", self.friendly_name) + def update_volume(self, data: Dict[str, Any]) -> None: + """Update volume.""" + self._client['config']['volume'] = data['volume'] + _LOGGER.debug('updated volume on %s', self.friendly_name) self._server.group(self.group.identifier).callback() self.callback() - def update_name(self, data): - """Update name. - - Args: - data (dict): The updated name data. - """ - self._client["config"]["name"] = data["name"] - _LOGGER.debug("updated name on %s", self.friendly_name) + def update_name(self, data: Dict[str, Any]) -> None: + """Update name.""" + self._client['config']['name'] = data['name'] + _LOGGER.debug('updated name on %s', self.friendly_name) self.callback() - def update_latency(self, data): - """Update latency. - - Args: - data (dict): The updated latency data. - """ - self._client["config"]["latency"] = data["latency"] - _LOGGER.debug("updated latency on %s", self.friendly_name) + def update_latency(self, data: Dict[str, Any]) -> None: + """Update latency.""" + self._client['config']['latency'] = data['latency'] + _LOGGER.debug('updated latency on %s', self.friendly_name) self.callback() - def update_connected(self, status): - """Update connected status. - - Args: - status (bool): The new connected status. - """ - self._client["connected"] = status - _LOGGER.debug( - "updated connected status to %s on %s", status, self.friendly_name - ) + def update_connected(self, status: bool) -> None: + """Update connected.""" + self._client['connected'] = status + _LOGGER.debug('updated connected status to %s on %s', status, self.friendly_name) self.callback() - def snapshot(self): - """Take a snapshot of the current state.""" + def snapshot(self) -> None: + """Snapshot current state of the client. + + Snapshot: + - Client name + - Client volume + - Client muting status + - Client latency + """ self._snapshot = { - "name": self.name, - "volume": self.volume, - "muted": self.muted, - "latency": self.latency, + 'name': self.name, + 'volume': self.volume, + 'muted': self.muted, + 'latency': self.latency } - _LOGGER.debug("took snapshot of current state of %s", self.friendly_name) + _LOGGER.debug('took snapshot of current state of %s', self.friendly_name) - async def restore(self): - """Restore the snapshotted state.""" + async def restore(self) -> None: + """Restore snapshotted state. + Snapshot: + - Client name + - Client volume + - Client muting status + - Client latency + """ if not self._snapshot: return - await self.set_name(self._snapshot["name"]) - await self.set_volume(self._snapshot["volume"]) - await self.set_muted(self._snapshot["muted"]) - await self.set_latency(self._snapshot["latency"]) + await self.set_name(self._snapshot['name']) + await self.set_volume(self._snapshot['volume']) + await self.set_muted(self._snapshot['muted']) + await self.set_latency(self._snapshot['latency']) self.callback() - _LOGGER.debug("restored snapshot of state of %s", self.friendly_name) + _LOGGER.debug('restored snapshot of state of %s', self.friendly_name) - def callback(self): - """Run the callback function if set.""" + def callback(self) -> None: + """Run callback function if set.""" if self._callback_func and callable(self._callback_func): self._callback_func(self) - def set_callback(self, func): - """Set the callback function. - - Args: - func (callable): The callback function to set. - """ + def set_callback(self, func: Callable[[Any], None]) -> None: + """Set callback function.""" self._callback_func = func - def __repr__(self): - """Return string representation of the client. - - Returns: - str: The string representation of the client. - """ - return f"Snapclient {self.version} ({self.friendly_name}, {self.identifier})" \ No newline at end of file + def __repr__(self) -> str: + """Return string representation of the client.""" + return f'Snapclient {self.version} ({self.friendly_name}, {self.identifier})' diff --git a/snapcast/control/group.py b/snapcast/control/group.py index c986f7b..a95b981 100644 --- a/snapcast/control/group.py +++ b/snapcast/control/group.py @@ -1,135 +1,79 @@ """Snapcast group.""" import logging +from typing import Any, Callable, Dict, List, Optional, Union _LOGGER = logging.getLogger(__name__) # pylint: disable=too-many-public-methods -class Snapgroup: - """Represents a snapcast group. - - Attributes: - _server (str): The server address. - _snapshot (dict): The snapshot of the group's state. - _callback_func (callable): The callback function for the group. - """ - def __init__(self, server, data): - """Initialize the group object. +class Snapgroup: + """Represents a snapcast group.""" - Args: - server (str): The server address. - data (dict): The initial data for the group. - """ - self._server = server - self._snapshot = None - self._callback_func = None + def __init__(self, server: Any, data: Dict[str, Any]) -> None: + """Initialize the group object.""" + self._server: Any = server + self._snapshot: Optional[Dict[str, Union[int, str, bool]]] = None + self._callback_func: Optional[Callable[[Any], None]] = None self.update(data) - def update(self, data): - """Update group data. - - Args: - data (dict): The updated group data. - """ - self._group = data + def update(self, data: Dict[str, Any]) -> None: + """Update group data.""" + self._group: Dict[str, Any] = data @property - def identifier(self): - """Get group identifier. - - Returns: - str: The identifier of the group. - """ - return self._group.get('id') + def identifier(self) -> str: + """Get group identifier.""" + return self._group.get('id', '') @property - def name(self): - """Get group name. - - Returns: - str: The name of the group. - """ - return self._group.get('name') + def name(self) -> str: + """Get group name.""" + return self._group.get('name', '') - async def set_name(self, name): - """Set a group name. - - Args: - name (str): The name to set for the group. - """ + async def set_name(self, name: str) -> None: + """Set a group name.""" if not name: name = '' self._group['name'] = name await self._server.group_name(self.identifier, name) @property - def stream(self): - """Get stream identifier. - - Returns: - str: The stream identifier of the group. - """ - return self._group.get('stream_id') - - async def set_stream(self, stream_id): - """Set group stream. + def stream(self) -> str: + """Get stream identifier.""" + return self._group.get('stream_id', '') - Args: - stream_id (str): The stream identifier to set for the group. - """ + async def set_stream(self, stream_id: str) -> None: + """Set group stream.""" self._group['stream_id'] = stream_id await self._server.group_stream(self.identifier, stream_id) _LOGGER.debug('set stream to %s on %s', stream_id, self.friendly_name) @property - def stream_status(self): - """Get stream status. - - Returns: - str: The status of the stream. - """ + def stream_status(self) -> Any: + """Get stream status.""" return self._server.stream(self.stream).status @property - def muted(self): - """Get mute status. - - Returns: - bool: True if the group is muted, False otherwise. - """ - return self._group.get('muted') + def muted(self) -> bool: + """Get mute status.""" + return self._group.get('muted', False) - async def set_muted(self, status): - """Set group mute status. - - Args: - status (bool): The mute status to set for the group. - """ + async def set_muted(self, status: bool) -> None: + """Set group mute status.""" self._group['muted'] = status await self._server.group_mute(self.identifier, status) _LOGGER.debug('set muted to %s on %s', status, self.friendly_name) @property - def volume(self): - """Get volume. - - Returns: - int: The volume percent of the group. - """ + def volume(self) -> int: + """Get volume.""" volume_sum = 0 - for client in self._group.get('clients'): + for client in self._group.get('clients', []): volume_sum += self._server.client(client.get('id')).volume - return int(volume_sum / len(self._group.get('clients'))) - - async def set_volume(self, volume): - """Set volume. - - Args: - volume (int): The volume percent to set for the group. + return int(volume_sum / len(self._group.get('clients', []))) - Raises: - ValueError: If volume percent is out of range. - """ + async def set_volume(self, volume: int) -> None: + """Set volume.""" if volume not in range(0, 101): raise ValueError('Volume out of range') current_volume = self.volume @@ -141,7 +85,7 @@ async def set_volume(self, volume): ratio = (current_volume - volume) / current_volume else: ratio = (volume - current_volume) / (100 - current_volume) - for data in self._group.get('clients'): + for data in self._group.get('clients', []): client = self._server.client(data.get('id')) client_volume = client.volume if delta < 0: @@ -159,32 +103,20 @@ async def set_volume(self, volume): _LOGGER.debug('set volume to %s on group %s', volume, self.friendly_name) @property - def friendly_name(self): - """Get friendly name. - - Returns: - str: The friendly name of the group. - """ + def friendly_name(self) -> str: + """Get group friendly name.""" fname = self.name if self.name != '' else "+".join( sorted([self._server.client(c).friendly_name for c in self.clients if c in [client.identifier for client in self._server.clients]])) return fname if fname != '' else self.identifier @property - def clients(self): - """Get client identifiers. + def clients(self) -> List[str]: + """Get all the client identifiers for the group.""" + return [client.get('id', '') for client in self._group.get('clients', [])] - Returns: - list: The list of client identifiers in the group. - """ - return [client.get('id') for client in self._group.get('clients')] - - async def add_client(self, client_identifier): - """Add a client to the group. - - Args: - client_identifier (str): The identifier of the client to add. - """ + async def add_client(self, client_identifier: str) -> None: + """Add a client to the group.""" if client_identifier in self.clients: _LOGGER.error('%s already in group %s', client_identifier, self.identifier) return @@ -197,12 +129,8 @@ async def add_client(self, client_identifier): self._server.client(client_identifier).callback() self.callback() - async def remove_client(self, client_identifier): - """Remove a client from the group. - - Args: - client_identifier (str): The identifier of the client to remove. - """ + async def remove_client(self, client_identifier: str) -> None: + """Remove a client from the group.""" new_clients = self.clients new_clients.remove(client_identifier) await self._server.group_clients(self.identifier, new_clients) @@ -212,46 +140,37 @@ async def remove_client(self, client_identifier): self._server.client(client_identifier).callback() self.callback() - def streams_by_name(self): - """Get available stream objects by name. - - Returns: - dict: A dictionary of stream objects keyed by their friendly names. - """ + def streams_by_name(self) -> Dict[str, Any]: + """Get available stream objects by name.""" return {stream.friendly_name: stream for stream in self._server.streams} - def update_mute(self, data): - """Update mute status. - - Args: - data (dict): The updated mute data. - """ + def update_mute(self, data: Dict[str, Any]) -> None: + """Update mute.""" self._group['muted'] = data['mute'] self.callback() _LOGGER.debug('updated mute on %s', self.friendly_name) - def update_name(self, data): - """Update group name. - - Args: - data (dict): The updated name data. - """ + def update_name(self, data: Dict[str, Any]) -> None: + """Update name.""" self._group['name'] = data['name'] _LOGGER.debug('updated name on %s', self.name) self.callback() - def update_stream(self, data): - """Update stream. - - Args: - data (dict): The updated stream data. - """ + def update_stream(self, data: Dict[str, Any]) -> None: + """Update stream.""" self._group['stream_id'] = data['stream_id'] self.callback() _LOGGER.debug('updated stream to %s on %s', self.stream, self.friendly_name) - def snapshot(self): - """Take a snapshot of the current state.""" + def snapshot(self) -> None: + """Snapshot current state. + + Snapshot: + - Group muting status + - Group volume + - Group stream identifier + + """ self._snapshot = { 'muted': self.muted, 'volume': self.volume, @@ -259,8 +178,13 @@ def snapshot(self): } _LOGGER.debug('took snapshot of current state of %s', self.friendly_name) - async def restore(self): - """Restore the snapshotted state.""" + async def restore(self) -> None: + """Restore snapshotted state. + Snapshot: + - Group muting status + - Group volume + - Group stream identifier + """ if not self._snapshot: return await self.set_muted(self._snapshot['muted']) @@ -269,23 +193,15 @@ async def restore(self): self.callback() _LOGGER.debug('restored snapshot of state of %s', self.friendly_name) - def callback(self): - """Run the callback function if set.""" + def callback(self) -> None: + """Run callback function if set.""" if self._callback_func and callable(self._callback_func): self._callback_func(self) - def set_callback(self, func): - """Set the callback function. - - Args: - func (callable): The callback function to set. - """ + def set_callback(self, func: Callable[[Any], None]) -> None: + """Set callback function.""" self._callback_func = func - def __repr__(self): - """Return string representation of the group. - - Returns: - str: The string representation of the group. - """ - return f'Snapgroup ({self.friendly_name}, {self.identifier})' \ No newline at end of file + def __repr__(self) -> str: + """Return string representation of the group.""" + return f'Snapgroup ({self.friendly_name}, {self.identifier})' diff --git a/snapcast/control/protocol.py b/snapcast/control/protocol.py index ed198f3..dc018f0 100644 --- a/snapcast/control/protocol.py +++ b/snapcast/control/protocol.py @@ -3,21 +3,14 @@ import asyncio import json import random +from typing import Any, Callable, Dict, Optional, Tuple SERVER_ONDISCONNECT = 'Server.OnDisconnect' +# pylint: disable=consider-using-f-string -def jsonrpc_request(method, identifier, params=None): - """Produce a JSONRPC request. - - Args: - method (str): The method name to be invoked. - identifier (int): The unique identifier for the request. - params (dict, optional): The parameters for the method. Defaults to None. - - Returns: - bytes: The JSONRPC request in bytes. - """ +def jsonrpc_request(method: str, identifier: int, params: Optional[Dict[str, Any]] = None) -> bytes: + """Produce a JSONRPC request.""" return '{}\r\n'.format(json.dumps({ 'id': identifier, 'method': method, @@ -25,47 +18,30 @@ def jsonrpc_request(method, identifier, params=None): 'jsonrpc': '2.0' })).encode() - class SnapcastProtocol(asyncio.Protocol): """Async Snapcast protocol.""" - def __init__(self, callbacks): - """Initialize the SnapcastProtocol. + def __init__(self, callbacks: Dict[str, Callable[[Any], None]]) -> None: + """Initialize the SnapcastProtocol.""" + self._transport: Optional[asyncio.Transport] = None + self._buffer: Dict[int, Dict[str, Any]] = {} + self._callbacks: Dict[str, Callable[[Any], None]] = callbacks + self._data_buffer: str = '' - Args: - callbacks (dict): A dictionary of callback functions for various events. - """ - self._transport = None - self._buffer = {} - self._callbacks = callbacks - self._data_buffer = '' - - def connection_made(self, transport): - """Handle a new connection. - - Args: - transport (asyncio.Transport): The transport representing the connection. - """ + def connection_made(self, transport: asyncio.Transport) -> None: + """Handle a new connection.""" self._transport = transport - def connection_lost(self, exc): - """Handle a lost connection. - - Args: - exc (Exception): The exception that caused the connection to be lost. - """ + def connection_lost(self, exc: Optional[Exception]) -> None: + """Handle a lost connection.""" for b in self._buffer.values(): b['error'] = {"code": -1, "message": "connection lost"} b['flag'].set() if SERVER_ONDISCONNECT in self._callbacks: self._callbacks[SERVER_ONDISCONNECT](exc) - def data_received(self, data): - """Handle received data. - - Args: - data (bytes): The data received from the connection. - """ + def data_received(self, data: bytes) -> None: + """Handle received data.""" self._data_buffer += data.decode() if not self._data_buffer.endswith('\r\n'): return @@ -78,47 +54,28 @@ def data_received(self, data): for item in data: self.handle_data(item) - def handle_data(self, data): - """Handle JSONRPC data. - - Args: - data (dict): The JSONRPC data to handle. - """ + def handle_data(self, data: Dict[str, Any]) -> None: + """Handle JSONRPC data.""" if 'id' in data: self.handle_response(data) else: self.handle_notification(data) - def handle_response(self, data): - """Handle JSONRPC response. - - Args: - data (dict): The JSONRPC response data. - """ + def handle_response(self, data: Dict[str, Any]) -> None: + """Handle JSONRPC response.""" identifier = data.get('id') - self._buffer[identifier]['data'] = data.get('result') - self._buffer[identifier]['error'] = data.get('error') - self._buffer[identifier]['flag'].set() + if identifier in self._buffer: + self._buffer[identifier]['data'] = data.get('result') + self._buffer[identifier]['error'] = data.get('error') + self._buffer[identifier]['flag'].set() - def handle_notification(self, data): - """Handle JSONRPC notification. - - Args: - data (dict): The JSONRPC notification data. - """ + def handle_notification(self, data: Dict[str, Any]) -> None: + """Handle JSONRPC notification.""" if data.get('method') in self._callbacks: - self._callbacks[data.get('method')](data.get('params')) - - async def request(self, method, params): - """Send a JSONRPC request. - - Args: - method (str): The method name to be invoked. - params (dict): The parameters for the method. + self._callbacks.get(data.get('method'))(data.get('params')) - Returns: - tuple: A tuple containing the result and error (if any) of the request. - """ + async def request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Send a JSONRPC request.""" identifier = random.randint(1, 1000) self._transport.write(jsonrpc_request(method, identifier, params)) self._buffer[identifier] = {'flag': asyncio.Event()} @@ -127,4 +84,4 @@ async def request(self, method, params): error = self._buffer[identifier].get('error') self._buffer[identifier].clear() del self._buffer[identifier] - return result, error + return (result, error) diff --git a/snapcast/control/server.py b/snapcast/control/server.py index de39d14..8db8434 100644 --- a/snapcast/control/server.py +++ b/snapcast/control/server.py @@ -4,93 +4,69 @@ import logging from packaging import version - from snapcast.control.client import Snapclient from snapcast.control.group import Snapgroup from snapcast.control.protocol import SERVER_ONDISCONNECT, SnapcastProtocol from snapcast.control.stream import Snapstream +from typing import Any, Callable, Dict, List, Optional, Tuple, Union _LOGGER = logging.getLogger(__name__) CONTROL_PORT = 1705 -SERVER_GETSTATUS = "Server.GetStatus" -SERVER_GETRPCVERSION = "Server.GetRPCVersion" -SERVER_DELETECLIENT = "Server.DeleteClient" -SERVER_ONUPDATE = "Server.OnUpdate" - -CLIENT_GETSTATUS = "Client.GetStatus" -CLIENT_SETNAME = "Client.SetName" -CLIENT_SETLATENCY = "Client.SetLatency" -CLIENT_SETVOLUME = "Client.SetVolume" -CLIENT_ONCONNECT = "Client.OnConnect" -CLIENT_ONDISCONNECT = "Client.OnDisconnect" -CLIENT_ONVOLUMECHANGED = "Client.OnVolumeChanged" -CLIENT_ONLATENCYCHANGED = "Client.OnLatencyChanged" -CLIENT_ONNAMECHANGED = "Client.OnNameChanged" - -GROUP_GETSTATUS = "Group.GetStatus" -GROUP_SETMUTE = "Group.SetMute" -GROUP_SETSTREAM = "Group.SetStream" -GROUP_SETCLIENTS = "Group.SetClients" -GROUP_SETNAME = "Group.SetName" -GROUP_ONMUTE = "Group.OnMute" -GROUP_ONSTREAMCHANGED = "Group.OnStreamChanged" -GROUP_ONNAMECHANGED = "Group.OnNameChanged" - - -STREAM_ONPROPERTIES = "Stream.OnProperties" -STREAM_SETPROPERTY = "Stream.SetProperty" -STREAM_CONTROL = "Stream.Control" # not yet implemented -STREAM_SETMETA = "Stream.SetMeta" # deprecated -STREAM_ONUPDATE = "Stream.OnUpdate" -STREAM_ONMETA = "Stream.OnMetadata" # deprecated -STREAM_ADDSTREAM = "Stream.AddStream" -STREAM_REMOVESTREAM = "Stream.RemoveStream" +SERVER_GETSTATUS = 'Server.GetStatus' +SERVER_GETRPCVERSION = 'Server.GetRPCVersion' +SERVER_DELETECLIENT = 'Server.DeleteClient' +SERVER_ONUPDATE = 'Server.OnUpdate' + +CLIENT_GETSTATUS = 'Client.GetStatus' +CLIENT_SETNAME = 'Client.SetName' +CLIENT_SETLATENCY = 'Client.SetLatency' +CLIENT_SETVOLUME = 'Client.SetVolume' +CLIENT_ONCONNECT = 'Client.OnConnect' +CLIENT_ONDISCONNECT = 'Client.OnDisconnect' +CLIENT_ONVOLUMECHANGED = 'Client.OnVolumeChanged' +CLIENT_ONLATENCYCHANGED = 'Client.OnLatencyChanged' +CLIENT_ONNAMECHANGED = 'Client.OnNameChanged' + +GROUP_GETSTATUS = 'Group.GetStatus' +GROUP_SETMUTE = 'Group.SetMute' +GROUP_SETSTREAM = 'Group.SetStream' +GROUP_SETCLIENTS = 'Group.SetClients' +GROUP_SETNAME = 'Group.SetName' +GROUP_ONMUTE = 'Group.OnMute' +GROUP_ONSTREAMCHANGED = 'Group.OnStreamChanged' +GROUP_ONNAMECHANGED = 'Group.OnNameChanged' + + +STREAM_ONPROPERTIES = 'Stream.OnProperties' +STREAM_SETPROPERTY = 'Stream.SetProperty' +STREAM_CONTROL = 'Stream.Control' # not yet implemented +STREAM_SETMETA = 'Stream.SetMeta' # deprecated +STREAM_ONUPDATE = 'Stream.OnUpdate' +STREAM_ONMETA = 'Stream.OnMetadata' # deprecated +STREAM_ADDSTREAM = 'Stream.AddStream' +STREAM_REMOVESTREAM = 'Stream.RemoveStream' SERVER_RECONNECT_DELAY = 5 -_EVENTS = [ - SERVER_ONUPDATE, - CLIENT_ONVOLUMECHANGED, - CLIENT_ONLATENCYCHANGED, - CLIENT_ONNAMECHANGED, - CLIENT_ONCONNECT, - CLIENT_ONDISCONNECT, - GROUP_ONMUTE, - GROUP_ONSTREAMCHANGED, - GROUP_ONNAMECHANGED, - STREAM_ONUPDATE, - STREAM_ONMETA, - STREAM_ONPROPERTIES, -] -_METHODS = [ - SERVER_GETSTATUS, - SERVER_GETRPCVERSION, - SERVER_DELETECLIENT, - SERVER_DELETECLIENT, - CLIENT_GETSTATUS, - CLIENT_SETNAME, - CLIENT_SETLATENCY, - CLIENT_SETVOLUME, - GROUP_GETSTATUS, - GROUP_SETMUTE, - GROUP_SETSTREAM, - GROUP_SETCLIENTS, - GROUP_SETNAME, - STREAM_SETMETA, - STREAM_SETPROPERTY, - STREAM_CONTROL, - STREAM_ADDSTREAM, - STREAM_REMOVESTREAM, -] +_EVENTS = [SERVER_ONUPDATE, CLIENT_ONVOLUMECHANGED, CLIENT_ONLATENCYCHANGED, + CLIENT_ONNAMECHANGED, CLIENT_ONCONNECT, CLIENT_ONDISCONNECT, + GROUP_ONMUTE, GROUP_ONSTREAMCHANGED, GROUP_ONNAMECHANGED, STREAM_ONUPDATE, + STREAM_ONMETA, STREAM_ONPROPERTIES] +_METHODS = [SERVER_GETSTATUS, SERVER_GETRPCVERSION, SERVER_DELETECLIENT, + SERVER_DELETECLIENT, CLIENT_GETSTATUS, CLIENT_SETNAME, + CLIENT_SETLATENCY, CLIENT_SETVOLUME, + GROUP_GETSTATUS, GROUP_SETMUTE, GROUP_SETSTREAM, GROUP_SETCLIENTS, + GROUP_SETNAME, STREAM_SETMETA, STREAM_SETPROPERTY, STREAM_CONTROL, + STREAM_ADDSTREAM, STREAM_REMOVESTREAM] # server versions in which new methods were added _VERSIONS = { - GROUP_SETNAME: "0.16.0", - STREAM_SETPROPERTY: "0.26.0", - STREAM_ADDSTREAM: "0.16.0", - STREAM_REMOVESTREAM: "0.16.0", + GROUP_SETNAME: '0.16.0', + STREAM_SETPROPERTY: '0.26.0', + STREAM_ADDSTREAM: '0.16.0', + STREAM_REMOVESTREAM: '0.16.0', } @@ -99,43 +75,25 @@ class ServerVersionError(NotImplementedError): # pylint: disable=too-many-public-methods -class Snapserver: - """ - Represents a Snapserver instance. - - This class provides methods to interact with a Snapserver instance, such as starting and stopping the server, - retrieving server status, managing clients, groups, and streams, and performing various operations on them. - - Args: - loop (asyncio.AbstractEventLoop): The event loop to use for asynchronous operations. - host (str): The hostname or IP address of the Snapserver. - port (int, optional): The port number of the Snapserver control interface. Defaults to CONTROL_PORT. - reconnect (bool, optional): Whether to automatically reconnect to the Snapserver if the connection is lost. - Defaults to False. - Attributes: - version (str): The version of the Snapserver. - groups (list): A list of Snapgroup objects representing the groups in the Snapserver. - clients (list): A list of Snapclient objects representing the clients connected to the Snapserver. - streams (list): A list of Snapstream objects representing the streams available in the Snapserver. - - """ +class Snapserver: + """Represents a snapserver.""" # pylint: disable=too-many-instance-attributes - def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False): + def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int = CONTROL_PORT, reconnect: bool = False) -> None: """Initialize.""" - self._loop = loop - self._port = port - self._reconnect = reconnect - self._is_stopped = True - self._clients = {} - self._streams = {} - self._groups = {} - self._host = host - self._version = None - self._protocol = None - self._transport = None - self._callbacks = { + self._loop: asyncio.AbstractEventLoop = loop + self._port: int = port + self._reconnect: bool = reconnect + self._is_stopped: bool = True + self._clients: Dict[str, Any] = {} + self._streams: Dict[str, Any] = {} + self._groups: Dict[str, Any] = {} + self._host: str = host + self._version: Optional[str] = None + self._protocol: Optional[Any] = None + self._transport: Optional[asyncio.Transport] = None + self._callbacks: Dict[str, Callable[[Any], None]] = { CLIENT_ONCONNECT: self._on_client_connect, CLIENT_ONDISCONNECT: self._on_client_disconnect, CLIENT_ONVOLUMECHANGED: self._on_client_volume_changed, @@ -148,106 +106,61 @@ def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False): STREAM_ONPROPERTIES: self._on_stream_properties, STREAM_ONUPDATE: self._on_stream_update, SERVER_ONDISCONNECT: self._on_server_disconnect, - SERVER_ONUPDATE: self._on_server_update, + SERVER_ONUPDATE: self._on_server_update } - self._on_update_callback_func = None - self._on_connect_callback_func = None - self._on_disconnect_callback_func = None - self._new_client_callback_func = None - - async def start(self): - """Initiate server connection. - - This method is used to establish a connection with the server. - It performs the necessary steps to connect to the server and - checks for a valid response. If the response is not valid, it - raises an OSError. + self._on_update_callback_func: Optional[Callable[[], None]] = None + self._on_connect_callback_func: Optional[Callable[[], None]] = None + self._on_disconnect_callback_func: Optional[Callable[[Optional[Exception]], None]] = None + self._new_client_callback_func: Optional[Callable[[Any], None]] = None - Raises: - OSError: If the server response is not valid. - - """ + async def start(self) -> None: + """Initiate server connection.""" self._is_stopped = False await self._do_connect() status, error = await self.status() - if (not isinstance(status, dict)) or ("server" not in status): - _LOGGER.warning("connected, but no valid response:\n%s", str(error)) + if (not isinstance(status, dict)) or ('server' not in status): + _LOGGER.warning('connected, but no valid response:\n%s', str(error)) self.stop() raise OSError - _LOGGER.debug("connected to snapserver on %s:%s", self._host, self._port) + _LOGGER.debug('connected to snapserver on %s:%s', self._host, self._port) self.synchronize(status) self._on_server_connect() - def stop(self): - """Stop server. - - This method stops the server by setting the `_is_stopped` flag to True, - disconnecting all clients, and resetting internal data structures. - - """ + def stop(self) -> None: + """Stop server connection.""" self._is_stopped = True self._do_disconnect() - _LOGGER.debug("Stopping") + _LOGGER.debug('Stopping') self._clients = {} self._streams = {} self._groups = {} self._version = None - def _do_disconnect(self): - """Perform the disconnection from the server. - - This method closes the transport connection to the server if it exists. - """ + def _do_disconnect(self) -> None: + """Perform the connection to the server.""" if self._transport: self._transport.close() - async def _do_connect(self): - """Perform the connection to the server. - - This method establishes a connection to the Snapcast server using the specified host and port. - It creates a transport and protocol using the asyncio `create_connection` method. - The `SnapcastProtocol` class is used as the protocol for handling communication with the server. - The `_callbacks` parameter is passed to the `SnapcastProtocol` constructor to handle callbacks. - - Returns: - None - - """ + async def _do_connect(self) -> None: + """Perform the connection to the server.""" self._transport, self._protocol = await self._loop.create_connection( - lambda: SnapcastProtocol(self._callbacks), self._host, self._port - ) - - def _reconnect_cb(self): - """Try to reconnect to the server. - - This method is called when a connection to the server is lost and - attempts to reconnect to the server. It first tries to establish a - new connection and then checks if the server responds with a valid - status. If the response is not valid, it logs a warning and stops - the connection. If the connection is successful and the status is - valid, it synchronizes the status and triggers the `_on_server_connect` - method. + lambda: SnapcastProtocol(self._callbacks), self._host, self._port) - """ - _LOGGER.debug("try reconnect") + def _reconnect_cb(self) -> None: + """Try to reconnect to the server.""" + _LOGGER.debug('try reconnect') - async def try_reconnect(): + async def try_reconnect() -> None: """Actual coroutine to try to reconnect or reschedule. - - This function attempts to reconnect to the server or reschedule the reconnection - based on the response received. If a valid response is received, the status is - synchronized and the `_on_server_connect` method is called. If no valid response - is received, a warning is logged and the connection is stopped. - - Raises: - OSError: If no valid response is received after reconnecting. - + + Raises: + OSError: If there isn't a valid response from the server. """ try: await self._do_connect() status, error = await self.status() - if (not isinstance(status, dict)) or ("server" not in status): - _LOGGER.warning("connected, but no valid response:\n%s", str(error)) + if (not isinstance(status, dict)) or ('server' not in status): + _LOGGER.warning('connected, but no valid response:\n%s', str(error)) self.stop() raise OSError except OSError: @@ -255,700 +168,318 @@ async def try_reconnect(): else: self.synchronize(status) self._on_server_connect() - asyncio.ensure_future(try_reconnect()) - async def _transact(self, method, params=None): - """Wrap requests. - - This method wraps requests made to the server. It checks if the server is connected, - and if so, it sends the request using the `_protocol` and `_transport` attributes. - If the server is not connected, it returns an error indicating that the server is not connected. - - Args: - method (str): The method to be requested. - params (dict, optional): The parameters to be sent with the request. - - Returns: - tuple: A tuple containing the result and error of the request. - - """ + async def _transact(self, method: str, params: Optional[Dict[str, Any]] = None) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Wrap requests.""" result = error = None - if ( - self._protocol is None - or self._transport is None - or self._transport.is_closing() - ): + if self._protocol is None or self._transport is None or self._transport.is_closing(): error = {"code": None, "message": "Server not connected"} else: result, error = await self._protocol.request(method, params) - return (result, error) + return result, error @property - def version(self): - """ - Return the version of the server. - - Returns: - str: The version of the server. - """ + def version(self) -> Optional[str]: + """Get server version.""" return self._version - async def status(self): - """ - System status. - - Returns: - The system status. - - """ + async def status(self) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Get system status.""" return await self._transact(SERVER_GETSTATUS) - async def rpc_version(self): - """ - RPC version. - - Returns: - The version of the RPC. - - """ + async def rpc_version(self) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Get RPC version.""" return await self._transact(SERVER_GETRPCVERSION) - async def delete_client(self, identifier): - """Delete a client. - - Args: - identifier (str): The identifier of the client to be deleted. - - """ - params = {"id": identifier} + async def delete_client(self, identifier: str) -> None: + """Delete client from the server.""" + params = {'id': identifier} response, _ = await self._transact(SERVER_DELETECLIENT, params) self.synchronize(response) - async def client_name(self, identifier, name): - """ - Set client name. - - Args: - identifier (str): The identifier of the client. - name (str): The name to set for the client. - - Returns: - The result of the request. - - """ - return await self._request(CLIENT_SETNAME, identifier, "name", name) - - async def client_latency(self, identifier, latency): - """ - Set client latency. - - Args: - identifier (str): The identifier of the client. - latency (int): The latency value to set. - - Returns: - The result of the request. - - """ - return await self._request(CLIENT_SETLATENCY, identifier, "latency", latency) - - async def client_volume(self, identifier, volume): - """ - Set client volume. - - Args: - identifier (str): The identifier of the client. - volume (int): The volume level to set. - - Returns: - The result of the request. - - """ - return await self._request(CLIENT_SETVOLUME, identifier, "volume", volume) - - async def client_status(self, identifier): - """ - Get client status. - - Args: - identifier (str): The identifier of the client. - - Returns: - dict: A dictionary containing the status of the client. - - """ - return await self._request(CLIENT_GETSTATUS, identifier, "client") - - async def group_status(self, identifier): - """ - Get group status. - - Args: - identifier (str): The identifier of the group. - - Returns: - dict: The status of the group. - - """ - return await self._request(GROUP_GETSTATUS, identifier, "group") - - async def group_mute(self, identifier, status): - """ - Set group mute. - - Args: - identifier (str): The identifier of the group. - status (bool): The mute status to set. - - Returns: - The result of the request. - - """ - return await self._request(GROUP_SETMUTE, identifier, "mute", status) - - async def group_stream(self, identifier, stream_id): - """ - Set group stream. - - Args: - identifier (str): The identifier of the group. - stream_id (str): The ID of the stream to set. + async def client_name(self, identifier: str, name: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set client name.""" + return await self._request(CLIENT_SETNAME, identifier, 'name', name) - Returns: - The result of the request. + async def client_latency(self, identifier: str, latency: int) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set client latency.""" + return await self._request(CLIENT_SETLATENCY, identifier, 'latency', latency) - """ - return await self._request(GROUP_SETSTREAM, identifier, "stream_id", stream_id) + async def client_volume(self, identifier: str, volume: int) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set client volume.""" + return await self._request(CLIENT_SETVOLUME, identifier, 'volume', volume) - async def group_clients(self, identifier, clients): - """ - Set group clients. + async def client_status(self, identifier: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Get client status.""" + return await self._request(CLIENT_GETSTATUS, identifier, 'client') - Args: - identifier (str): The identifier of the group. - clients (list): A list of client identifiers to be added to the group. + async def group_status(self, identifier: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Get group status.""" + return await self._request(GROUP_GETSTATUS, identifier, 'group') - Returns: - The result of the request. + async def group_mute(self, identifier: str, status: bool) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set group mute.""" + return await self._request(GROUP_SETMUTE, identifier, 'mute', status) - """ - return await self._request(GROUP_SETCLIENTS, identifier, "clients", clients) + async def group_stream(self, identifier: str, stream_id: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set group stream.""" + return await self._request(GROUP_SETSTREAM, identifier, 'stream_id', stream_id) - async def group_name(self, identifier, name): - """ - Set the name of a group. + async def group_clients(self, identifier: str, clients: List[str]) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set group clients.""" + return await self._request(GROUP_SETCLIENTS, identifier, 'clients', clients) - Args: - identifier (str): The identifier of the group. - name (str): The new name for the group. - - Returns: - The result of the request. - - Raises: - VersionMismatchError: If the server version does not support the GROUP_SETNAME command. - """ + async def group_name(self, identifier: str, name: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set group name.""" self._version_check(GROUP_SETNAME) - return await self._request(GROUP_SETNAME, identifier, "name", name) - - async def stream_control(self, identifier, control_command, control_params): - """ - Set stream control. - - Args: - identifier (str): The identifier of the stream. - control_command (str): The control command to be executed. - control_params (dict): Additional parameters for the control command. + return await self._request(GROUP_SETNAME, identifier, 'name', name) - Returns: - The response from the server. - - Raises: - VersionError: If the server version does not support stream control. - - """ + async def stream_control(self, identifier: str, control_command: str, control_params: Dict[str, Any]) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set stream control.""" self._version_check(STREAM_SETPROPERTY) - return await self._request( - STREAM_CONTROL, identifier, "command", control_command, control_params - ) + return await self._request(STREAM_CONTROL, identifier, 'command', control_command, control_params) - async def stream_setmeta(self, identifier, meta): # deprecated + async def stream_setmeta(self, identifier: str, meta: Dict[str, Any]) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: # deprecated """Set stream metadata.""" - return await self._request(STREAM_SETMETA, identifier, "meta", meta) - - async def stream_setproperty(self, identifier, stream_property, value): - """ - Set stream metadata. - - Args: - identifier (str): The identifier of the stream. - stream_property (str): The property to set. - value: The value to set for the property. + return await self._request(STREAM_SETMETA, identifier, 'meta', meta) - Returns: - The response from the server. - - """ + async def stream_setproperty(self, identifier: str, stream_property: str, value: Any) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Set stream metadata.""" self._version_check(STREAM_SETPROPERTY) - return await self._request( - STREAM_SETPROPERTY, - identifier, - parameters={"property": stream_property, "value": value}, - ) - - async def stream_add_stream(self, stream_uri): - """ - Add a stream. - - Args: - stream_uri (str): The URI of the stream to be added. - - Returns: - dict or str: The result of adding the stream. If successful, a dictionary - containing the stream ID will be returned. If unsuccessful, an error message - will be returned. + return await self._request(STREAM_SETPROPERTY, identifier, parameters={ + 'property': stream_property, + 'value': value + }) - """ + async def stream_add_stream(self, stream_uri: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Add a stream.""" params = {"streamUri": stream_uri} result, error = await self._transact(STREAM_ADDSTREAM, params) if isinstance(result, dict) and ("id" in result): self.synchronize((await self.status())[0]) return result or error - async def stream_remove_stream(self, identifier): - """ - Remove a Stream. - - Args: - identifier (str): The identifier of the stream to be removed. - - Returns: - dict: The result of the removal operation. - - """ + async def stream_remove_stream(self, identifier: str) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Remove a Stream from the server.""" result = await self._request(STREAM_REMOVESTREAM, identifier) if isinstance(result, dict) and ("id" in result): self.synchronize((await self.status())[0]) return result - def group(self, group_identifier): - """ - Get a group. - - Args: - group_identifier (str): The identifier of the group. - - Returns: - Group: The group object. - - """ + def group(self, group_identifier: str) -> Any: + """Get a group.""" return self._groups[group_identifier] - def stream(self, stream_identifier): - """ - Get a stream. - - Args: - stream_identifier (str): The identifier of the stream. - - Returns: - Stream: The stream object corresponding to the given identifier. - """ + def stream(self, stream_identifier: str) -> Any: + """Get a stream.""" return self._streams[stream_identifier] - def client(self, client_identifier): - """ - Get a client. - - Args: - client_identifier (str): The identifier of the client. - - Returns: - Client: The client object corresponding to the given identifier. - """ + def client(self, client_identifier: str) -> Any: + """Get a client.""" return self._clients[client_identifier] @property - def groups(self): - """ - Get groups. - - Returns: - list: A list of groups. - """ + def groups(self) -> List[Any]: + """Get groups.""" return list(self._groups.values()) @property - def clients(self): - """ - Get clients. - - Returns: - list: A list of clients. - """ + def clients(self) -> List[Any]: + """Get clients.""" return list(self._clients.values()) @property - def streams(self): - """ - Get streams. - - Returns: - list: A list of streams. - """ + def streams(self) -> List[Any]: + """Get streams.""" return list(self._streams.values()) - def synchronize(self, status): - """ - Synchronize snapserver. - - This method synchronizes the snapserver with the provided status. - It updates the internal state of the server, including the version, - groups, clients, and streams. - - Args: - status (dict): The status of the snapserver. - - Returns: - None - """ - self._version = status["server"]["server"]["snapserver"]["version"] - new_groups = {} - new_clients = {} - new_streams = {} - for stream in status.get("server").get("streams"): - if stream.get("id") in self._streams: - new_streams[stream.get("id")] = self._streams[stream.get("id")] - new_streams[stream.get("id")].update(stream) + def synchronize(self, status: Dict[str, Any]) -> None: + """Synchronize snapserver.""" + self._version = status['server']['server']['snapserver']['version'] + new_groups: Dict[str, Any] = {} + new_clients: Dict[str, Any] = {} + new_streams: Dict[str, Any] = {} + for stream in status.get('server', {}).get('streams', []): + if stream.get('id') in self._streams: + new_streams[stream.get('id')] = self._streams[stream.get('id')] + new_streams[stream.get('id')].update(stream) else: - new_streams[stream.get("id")] = Snapstream(stream) - _LOGGER.debug("stream found: %s", new_streams[stream.get("id")]) - for group in status.get("server").get("groups"): - if group.get("id") in self._groups: - new_groups[group.get("id")] = self._groups[group.get("id")] - new_groups[group.get("id")].update(group) + new_streams[stream.get('id')] = Snapstream(stream) + _LOGGER.debug('stream found: %s', new_streams[stream.get('id')]) + for group in status.get('server', {}).get('groups', []): + if group.get('id') in self._groups: + new_groups[group.get('id')] = self._groups[group.get('id')] + new_groups[group.get('id')].update(group) else: - new_groups[group.get("id")] = Snapgroup(self, group) - for client in group.get("clients"): - if client.get("id") in self._clients: - new_clients[client.get("id")] = self._clients[client.get("id")] - new_clients[client.get("id")].update(client) + new_groups[group.get('id')] = Snapgroup(self, group) + for client in group.get('clients', []): + if client.get('id') in self._clients: + new_clients[client.get('id')] = self._clients[client.get('id')] + new_clients[client.get('id')].update(client) else: - new_clients[client.get("id")] = Snapclient(self, client) - _LOGGER.debug("client found: %s", new_clients[client.get("id")]) - _LOGGER.debug("group found: %s", new_groups[group.get("id")]) + new_clients[client.get('id')] = Snapclient(self, client) + _LOGGER.debug('client found: %s', new_clients[client.get('id')]) + _LOGGER.debug('group found: %s', new_groups[group.get('id')]) self._groups = new_groups self._clients = new_clients self._streams = new_streams # pylint: disable=too-many-arguments - async def _request(self, method, identifier, key=None, value=None, parameters=None): - """ - Perform a request with the given identifier. - - Args: - method (str): The HTTP method to use for the request. - identifier (str): The identifier for the request. - key (str, optional): The key for the request parameter. Defaults to None. - value (str, optional): The value for the request parameter. Defaults to None. - parameters (dict, optional): Additional parameters for the request. Defaults to None. - - Returns: - The result of the request, or an error if the request failed. - """ - params = {"id": identifier} + async def _request(self, method: str, identifier: str, key: Optional[str] = None, value: Optional[Any] = None, parameters: Optional[Dict[str, Any]] = None) -> Tuple[Optional[Any], Optional[Dict[str, Any]]]: + """Perform request with identifier.""" + params = {'id': identifier} if key is not None and value is not None: params[key] = value if isinstance(parameters, dict): params.update(parameters) result, error = await self._transact(method, params) if isinstance(result, dict) and key in result: - return result.get(key) - return result or error - - def _on_server_connect(self): - """ - Handle server connection. - - This method is called when the server is successfully connected. - It logs a debug message and invokes the `_on_connect_callback_func` if it is callable. + return result.get(key), None + return result, error - """ - _LOGGER.debug("Server connected") + def _on_server_connect(self) -> None: + """Handle server connection.""" + _LOGGER.debug('Server connected') if self._on_connect_callback_func and callable(self._on_connect_callback_func): self._on_connect_callback_func() - def _on_server_disconnect(self, exception): - """ - Handle server disconnection. - - Args: - exception: The exception that caused the disconnection. - - Returns: - None - """ - _LOGGER.debug("Server disconnected: %s", str(exception)) - if self._on_disconnect_callback_func and callable( - self._on_disconnect_callback_func - ): + def _on_server_disconnect(self, exception: Optional[Exception]) -> None: + """Handle server disconnection.""" + _LOGGER.debug('Server disconnected: %s', str(exception)) + if self._on_disconnect_callback_func and callable(self._on_disconnect_callback_func): self._on_disconnect_callback_func(exception) self._protocol = None self._transport = None if (not self._is_stopped) and self._reconnect: self._reconnect_cb() - def _on_server_update(self, data): - """ - Handle server update. - - This method is responsible for handling updates received from the server. - It synchronizes the data and calls the update callback function if it is defined. - - Args: - data: The data received from the server. - - Returns: - None - """ + def _on_server_update(self, data: Dict[str, Any]) -> None: + """Handle server update.""" self.synchronize(data) if self._on_update_callback_func and callable(self._on_update_callback_func): self._on_update_callback_func() - def _on_group_mute(self, data): - """ - Handle group mute. - - This method is responsible for handling the mute event of a group. - It updates the mute status of the group and triggers a callback for each client in the group. - - Args: - data (dict): The data containing the group ID and mute status. - - Returns: - None - """ - group = self._groups.get(data.get("id")) - group.update_mute(data) - for client_id in group.clients: - self._clients.get(client_id).callback() - - def _on_group_name_changed(self, data): - """ - Handle group name changed. - - This method is called when the name of a group is changed. It updates the name of the group - with the new data provided. - - Args: - data (dict): A dictionary containing the updated group information. - - Returns: - None - """ - self._groups.get(data.get("id")).update_name(data) - - def _on_group_stream_changed(self, data): - """ - Handle group stream change. - - This method is called when there is a change in the stream of a group. - It updates the stream data for the corresponding group and triggers a callback - for each client in the group. - - Args: - data (dict): The data containing the information about the stream change. - """ - group = self._groups.get(data.get("id")) - group.update_stream(data) - for client_id in group.clients: - self._clients.get(client_id).callback() - - def _on_client_connect(self, data): - """ - Handle client connect. - - This method is called when a client connects to the server. It updates the - connection status of the client and creates a new `Snapclient` instance if - the client is not already present in the `_clients` dictionary. - - Args: - data (dict): A dictionary containing the client data, including the client ID. - """ + def _on_group_mute(self, data: Dict[str, Any]) -> None: + """Handle group mute.""" + group = self._groups.get(data.get('id')) + if group: + group.update_mute(data) + for client_id in group.clients: + self._clients.get(client_id).callback() + + def _on_group_name_changed(self, data: Dict[str, Any]) -> None: + """Handle group name changed.""" + if data.get('id') in self._groups: + self._groups[data.get('id')].update_name(data) + + def _on_group_stream_changed(self, data: Dict[str, Any]) -> None: + """Handle group stream change.""" + group = self._groups.get(data.get('id')) + if group: + group.update_stream(data) + for client_id in group.clients: + self._clients.get(client_id).callback() + + def _on_client_connect(self, data: Dict[str, Any]) -> None: + """Handle client connect.""" client = None - if data.get("id") in self._clients: - client = self._clients[data.get("id")] + if data.get('id') in self._clients: + client = self._clients[data.get('id')] client.update_connected(True) else: - client = Snapclient(self, data.get("client")) - self._clients[data.get("id")] = client - if self._new_client_callback_func and callable( - self._new_client_callback_func - ): + client = Snapclient(self, data.get('client')) + self._clients[data.get('id')] = client + if self._new_client_callback_func and callable(self._new_client_callback_func): self._new_client_callback_func(client) - _LOGGER.debug("client %s connected", client.friendly_name) - - def _on_client_disconnect(self, data): - """ - Handle client disconnect. - - This method is called when a client disconnects from the server. - It updates the connected status of the client and logs a debug message. - - Args: - data (dict): A dictionary containing information about the disconnected client. - """ - self._clients[data.get("id")].update_connected(False) - _LOGGER.debug( - "client %s disconnected", self._clients[data.get("id")].friendly_name - ) - - def _on_client_volume_changed(self, data): - """ - Handle client volume change. - - This method is called when the volume of a client is changed. - It updates the volume of the corresponding client object. - - Args: - data (dict): A dictionary containing the volume change information. - """ - self._clients.get(data.get("id")).update_volume(data) - - def _on_client_name_changed(self, data): - """ - Handle client name changed. - - Args: - data (dict): The data containing the client ID and the updated name. - """ - self._clients.get(data.get("id")).update_name(data) - - def _on_client_latency_changed(self, data): - """ - Handle client latency changed. - - This method is called when the latency of a client changes. It updates the latency information - for the corresponding client. - - Args: - data (dict): A dictionary containing the updated latency information for the client. - """ - self._clients.get(data.get("id")).update_latency(data) - - def _on_stream_meta(self, data): # deprecated + _LOGGER.debug('client %s connected', client.friendly_name) + + def _on_client_disconnect(self, data: Dict[str, Any]) -> None: + """Handle client disconnect.""" + if data.get('id') in self._clients: + self._clients[data.get('id')].update_connected(False) + _LOGGER.debug('client %s disconnected', self._clients[data.get('id')].friendly_name) + + def _on_client_volume_changed(self, data: Dict[str, Any]) -> None: + """Handle client volume change.""" + if data.get('id') in self._clients: + self._clients.get(data.get('id')).update_volume(data) + + def _on_client_name_changed(self, data: Dict[str, Any]) -> None: + """Handle client name changed.""" + if data.get('id') in self._clients: + self._clients.get(data.get('id')).update_name(data) + + def _on_client_latency_changed(self, data: Dict[str, Any]) -> None: + """Handle client latency changed.""" + if data.get('id') in self._clients: + self._clients.get(data.get('id')).update_latency(data) + + def _on_stream_meta(self, data: Dict[str, Any]) -> None: # deprecated """Handle stream metadata update.""" - stream = self._streams[data.get("id")] - stream.update_meta(data.get("meta")) - _LOGGER.debug("stream %s metadata updated", stream.friendly_name) + stream = self._streams[data.get('id')] + stream.update_meta(data.get('meta')) + _LOGGER.debug('stream %s metadata updated', stream.friendly_name) for group in self._groups.values(): - if group.stream == data.get("id"): + if group.stream == data.get('id'): group.callback() - def _on_stream_properties(self, data): - """ - Handle stream properties update. - - This method is called when the properties of a stream are updated. - It updates the properties of the corresponding stream object and triggers - the callback functions for the affected groups and clients. - - Args: - data (dict): A dictionary containing the updated stream properties. - """ - stream = self._streams[data.get("id")] - stream.update_properties(data.get("properties")) - _LOGGER.debug("stream %s properties updated", stream.friendly_name) + def _on_stream_properties(self, data: Dict[str, Any]) -> None: + """Handle stream properties update.""" + stream = self._streams[data.get('id')] + stream.update_properties(data.get('properties')) + _LOGGER.debug('stream %s properties updated', stream.friendly_name) for group in self._groups.values(): - if group.stream == data.get("id"): + if group.stream == data.get('id'): group.callback() for client_id in group.clients: self._clients.get(client_id).callback() - def _on_stream_update(self, data): - """ - Handle stream update. - - This method is called when a stream update event is received. It updates the - corresponding stream object with the new data, triggers the stream's callback, - and updates the associated groups and clients. - - Args: - data (dict): The data containing the stream update information. - """ - if data.get("id") in self._streams: - self._streams[data.get("id")].update(data.get("stream")) - _LOGGER.debug( - "stream %s updated", self._streams[data.get("id")].friendly_name - ) + def _on_stream_update(self, data: Dict[str, Any]) -> None: + """Handle stream update.""" + if data.get('id') in self._streams: + self._streams[data.get('id')].update(data.get('stream')) + _LOGGER.debug('stream %s updated', self._streams[data.get('id')].friendly_name) self._streams[data.get("id")].callback() for group in self._groups.values(): - if group.stream == data.get("id"): + if group.stream == data.get('id'): group.callback() for client_id in group.clients: self._clients.get(client_id).callback() else: - if ( - data.get("stream", {}).get("uri", {}).get("query", {}).get("codec") - == "null" - ): - _LOGGER.debug("stream %s is input-only, ignore", data.get("id")) + if data.get('stream', {}).get('uri', {}).get('query', {}).get('codec') == 'null': + _LOGGER.debug('stream %s is input-only, ignore', data.get('id')) else: - _LOGGER.info("stream %s not found, synchronize", data.get("id")) + _LOGGER.info('stream %s not found, synchronize', data.get('id')) - async def async_sync(): + async def async_sync() -> None: self.synchronize((await self.status())[0]) - asyncio.ensure_future(async_sync()) - def set_on_update_callback(self, func): - """ - Set the on update callback function. - - Parameters: - - func: The callback function to be set. - """ + def set_on_update_callback(self, func: Callable[[], None]) -> None: + """Set on update callback function.""" self._on_update_callback_func = func - def set_on_connect_callback(self, func): - """ - Set on connection callback function. - - Args: - func: The function to be called when a connection is established. - """ + def set_on_connect_callback(self, func: Callable[[], None]) -> None: + """Set on connection callback function.""" self._on_connect_callback_func = func - def set_on_disconnect_callback(self, func): - """ - Set on disconnection callback function. - - Args: - func: The function to be called when a connection is lost. - """ + def set_on_disconnect_callback(self, func: Callable[[Optional[Exception]], None]) -> None: + """Set on disconnection callback function.""" self._on_disconnect_callback_func = func - def set_new_client_callback(self, func): - """ - Set new client callback function. - - Parameters: - - func: The callback function to be set. - """ + def set_new_client_callback(self, func: Callable[[Any], None]) -> None: + """Set new client callback function.""" self._new_client_callback_func = func - def __repr__(self): - """Return string representation of the Snapserver object.""" - return f"Snapserver {self.version} ({self._host})" + def __repr__(self) -> str: + """Return string representation of the server.""" + return f'Snapserver {self.version} ({self._host})' - def _version_check(self, api_call): + def _version_check(self, api_call: str) -> None: """ Checks if the server version meets the minimum requirement for a given API call. - Args: - api_call (str): The name of the API call. - Raises: ServerVersionError: If the server version is lower than the required version for the API call. """ diff --git a/snapcast/control/stream.py b/snapcast/control/stream.py index 9624082..6cac892 100644 --- a/snapcast/control/stream.py +++ b/snapcast/control/stream.py @@ -1,173 +1,84 @@ +"""Snapcast stream.""" +from typing import Any, Callable, Optional + + class Snapstream: - """ - Represents a snapcast stream. - - Attributes: - identifier (str): The stream id. - status (str): The stream status. - name (str): The stream name. - friendly_name (str): The friendly name of the stream. - metadata (dict): The metadata of the stream. - properties (dict): The properties of the stream. - path (str): The stream path. - - Methods: - __init__(data): Initializes the Snapstream object. - update(data): Updates the stream data. - update_meta(data): Updates the stream metadata. - update_metadata(data): Updates the stream metadata. - update_properties(data): Updates the stream properties. - callback(): Runs the callback function. - set_callback(func): Sets the callback function. - """ - - def __init__(self, data): - """ - Initialize the Stream object. - - Args: - data (dict): A dictionary containing the initial data for the Stream. - - """ + """Represents a snapcast stream.""" + + def __init__(self, data: dict) -> None: + """Initialize the Stream object.""" self.update(data) - self._callback_func = None + self._callback_func: Optional[Callable[['Snapstream'], None]] = None @property - def identifier(self): - """ - Get stream id. - - Returns: - str: The stream id. - """ - return self._stream.get("id") + def identifier(self) -> str: + """Get stream id.""" + return self._stream.get('id') @property - def status(self): - """ - Get stream status. - - Returns: - The status of the stream. - """ - return self._stream.get("status") + def status(self) -> Any: + """Get stream status.""" + return self._stream.get('status') @property - def name(self): - """ - Get stream name. - - Returns: - str: The name of the stream. - """ - return self._stream.get("uri").get("query").get("name") + def name(self) -> str: + """Get stream name.""" + return self._stream.get('uri', {}).get('query', {}).get('name', '') @property - def friendly_name(self): - """ - Get friendly name. - - Returns: - str: The friendly name of the stream. If the name is empty, the identifier is returned instead. - """ - return self.name if self.name != "" else self.identifier + def friendly_name(self) -> str: + """Get friendly name.""" + return self.name if self.name != '' else self.identifier @property - def metadata(self): - """Get metadata. - - Returns: - The metadata of the stream, if available. Otherwise, returns None. - """ - if "properties" in self._stream: - return self._stream["properties"].get("metadata") - return self._stream.get("meta") + def metadata(self) -> Optional[dict]: + """Get metadata.""" + if 'properties' in self._stream: + return self._stream['properties'].get('metadata') + return self._stream.get('meta') @property - def meta(self): + def meta(self) -> Optional[dict]: """Get metadata. Deprecated.""" return self.metadata @property - def properties(self): - """ - Get properties. - - Returns: - dict: The properties of the stream. - """ - return self._stream.get("properties") + def properties(self) -> Optional[dict]: + """Get properties.""" + return self._stream.get('properties') @property - def path(self): - """ - Get stream path. - - Returns: - str: The path of the stream URI. + def path(self) -> str: + """Get stream path.""" + return self._stream.get('uri', {}).get('path', '') - """ - return self._stream.get("uri").get("path") - - def update(self, data): - """ - Update stream. - - Args: - data: The updated data for the stream. - - """ + def update(self, data: dict) -> None: + """Update stream.""" self._stream = data - def update_meta(self, data): - """ - Update stream metadata. - - Args: - data (dict): A dictionary containing the updated metadata. - """ + def update_meta(self, data: dict) -> None: + """Update stream metadata.""" self.update_metadata(data) - def update_metadata(self, data): - """ - Update stream metadata. - - Args: - data (dict): The updated metadata for the stream. + def update_metadata(self, data: dict) -> None: + """Update stream metadata.""" + if 'properties' in self._stream: + self._stream['properties']['metadata'] = data + self._stream['meta'] = data - """ - if "properties" in self._stream: - self._stream["properties"]["metadata"] = data - self._stream["meta"] = data + def update_properties(self, data: dict) -> None: + """Update stream properties.""" + self._stream['properties'] = data - def update_properties(self, data): - """ - Update stream properties. + def __repr__(self) -> str: + """Return string representation.""" + return f'Snapstream ({self.name})' - Args: - data (dict): A dictionary containing the updated properties of the stream. - """ - self._stream["properties"] = data - - def __repr__(self): - """Return string representation of the Snapstream object.""" - return f"Snapstream ({self.name})" - - def callback(self): - """Run callback. - - This method executes the callback function, if it exists and is callable. - It passes the current instance of the class as an argument to the callback function. - """ + def callback(self) -> None: + """Run callback if set.""" if self._callback_func and callable(self._callback_func): self._callback_func(self) - def set_callback(self, func): - """ - Set callback. - - Args: - func (callable): The callback function to be set. - - """ + def set_callback(self, func: Callable[['Snapstream'], None]) -> None: + """Set callback function.""" self._callback_func = func