Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request: Support stream commands (play, pause, next etc) #72

Open
Chreece opened this issue Jun 8, 2024 · 1 comment
Open

Request: Support stream commands (play, pause, next etc) #72

Chreece opened this issue Jun 8, 2024 · 1 comment

Comments

@Chreece
Copy link

Chreece commented Jun 8, 2024

Is it possible to add support for these functions?

Documentation

@Chreece
Copy link
Author

Chreece commented Jun 18, 2024

I'm no programmer, don't kill me please...
Just wanted to see how good is chatgpt:

stream.py:

"""Snapcast stream."""

import requests
import json

class Snapstream():
    """Represents a snapcast stream."""

    def __init__(self, data, host, port=1705):
        """Initialize."""
        self.update(data)
        self._callback_func = None
        self.host = host
        self.port = port
        self.url = f"http://{self.host}:{self.port}/jsonrpc"

    @property
    def identifier(self):
        """Get stream id."""
        return self._stream.get('id')

    @property
    def status(self):
        """Get stream status."""
        return self._stream.get('status')

    @property
    def name(self):
        """Get stream name."""
        return self._stream.get('uri').get('query').get('name')

    @property
    def friendly_name(self):
        """Get friendly name."""
        return self.name if self.name != '' else self.identifier

    @property
    def metadata(self):
        """Get metadata."""
        if 'properties' in self._stream:
            return self._stream['properties'].get('metadata')
        return self._stream.get('meta')

    @property
    def meta(self):
        """Get metadata. Deprecated."""
        return self.metadata

    @property
    def properties(self):
        """Get properties."""
        return self._stream.get('properties')

    @property
    def path(self):
        """Get stream path."""
        return self._stream.get('uri').get('path')

    def update(self, data):
        """Update stream."""
        self._stream = data

    def update_meta(self, data):
        """Update stream metadata."""
        self.update_metadata(data)

    def update_metadata(self, data):
        """Update stream metadata."""
        if 'properties' in self._stream:
            self._stream['properties']['metadata'] = data
        self._stream['meta'] = data

    def update_properties(self, data):
        """Update stream properties."""
        self._stream['properties'] = data

    def __repr__(self):
        """Return string representation."""
        return f'Snapstream ({self.name})'

    def callback(self):
        """Run callback."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback."""
        self._callback_func = func

    def _send_command(self, method, params=None):
        headers = {'Content-Type': 'application/json'}
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params if params else [],
            "id": 1
        }
        response = requests.post(self.url, headers=headers, data=json.dumps(payload))
        return response.json()

    def play(self):
        """Play the stream."""
        return self._send_command("StreamControl.Play", [self.identifier])

    def pause(self):
        """Pause the stream."""
        return self._send_command("StreamControl.Pause", [self.identifier])

    def next(self):
        """Next track in the stream."""
        return self._send_command("StreamControl.Next", [self.identifier])

    def previous(self):
        """Previous track in the stream."""
        return self._send_command("StreamControl.Previous", [self.identifier])

    def shuffle(self, enable=True):
        """Enable or disable shuffle."""
        return self._send_command("StreamControl.Shuffle", [self.identifier, enable])

    def repeat(self, mode):
        """Set repeat mode."""
        # mode should be one of 'off', 'one', 'all'
        return self._send_command("StreamControl.Repeat", [self.identifier, mode])
if __name__ == "__main__":
    # Example usage
    sample_data = {
        'id': 'example_stream_id',
        'status': 'playing',
        'uri': {
            'query': {
                'name': 'Example Stream'
            },
            'path': '/example/path'
        },
        'properties': {
            'metadata': 'Example Metadata'
        },
        'meta': 'Example Meta'
    }

    host = 'localhost'
    stream = Snapstream(sample_data, host)

    # Play the stream
    play_response = stream.play()
    print("Play response:", play_response)

    # Pause the stream
    pause_response = stream.pause()
    print("Pause response:", pause_response)

    # Next track
    next_response = stream.next()
    print("Next response:", next_response)

    # Previous track
    previous_response = stream.previous()
    print("Previous response:", previous_response)

    # Enable shuffle
    shuffle_response = stream.shuffle(True)
    print("Shuffle response:", shuffle_response)

    # Set repeat mode to 'all'
    repeat_response = stream.repeat('all')
    print("Repeat response:", repeat_response)

init.py:

"""Snapcast control for Snapcast 0.11.1."""

from snapcast.control.server import Snapserver, CONTROL_PORT
from snapcast.control.stream import Snapstream  # Ensure you import the updated Snapstream class

async def create_server(loop, host, port=CONTROL_PORT, reconnect=False):
    """Server factory."""
    server = Snapserver(loop, host, port, reconnect)
    await server.start()
    return server

class ExtendedSnapserver(Snapserver):
    """Extended Snapserver with stream control."""

    def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False):
        super().__init__(loop, host, port, reconnect)
        self.streams = {}

    async def start(self):
        """Start the server and initialize streams."""
        await super().start()
        # Initialize Snapstream instances for each stream
        for stream_id, stream_data in self.streams.items():
            self.streams[stream_id] = Snapstream(stream_data, self.host, self.port)

    def add_stream(self, stream_data):
        """Add a stream to the server."""
        stream_id = stream_data['id']
        self.streams[stream_id] = Snapstream(stream_data, self.host, self.port)

    def play_stream(self, stream_id):
        """Play a stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].play()

    def pause_stream(self, stream_id):
        """Pause a stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].pause()

    def next_stream(self, stream_id):
        """Next track in the stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].next()

    def previous_stream(self, stream_id):
        """Previous track in the stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].previous()

    def shuffle_stream(self, stream_id, enable=True):
        """Enable or disable shuffle for the stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].shuffle(enable)

    def repeat_stream(self, stream_id, mode):
        """Set repeat mode for the stream."""
        if stream_id in self.streams:
            return self.streams[stream_id].repeat(mode)
# Example usage of ExtendedSnapserver
async def main():
    loop = asyncio.get_event_loop()
    server = ExtendedSnapserver(loop, 'localhost')
    await server.start()

    # Add a sample stream
    sample_stream_data = {
        'id': 'example_stream_id',
        'status': 'playing',
        'uri': {
            'query': {
                'name': 'Example Stream'
            },
            'path': '/example/path'
        },
        'properties': {
            'metadata': 'Example Metadata'
        },
        'meta': 'Example Meta'
    }
    server.add_stream(sample_stream_data)

    # Play the stream
    play_response = server.play_stream('example_stream_id')
    print("Play response:", play_response)

    # Pause the stream
    pause_response = server.pause_stream('example_stream_id')
    print("Pause response:", pause_response)

    # Next track
    next_response = server.next_stream('example_stream_id')
    print("Next response:", next_response)

    # Previous track
    previous_response = server.previous_stream('example_stream_id')
    print("Previous response:", previous_response)

    # Enable shuffle
    shuffle_response = server.shuffle_stream('example_stream_id', True)
    print("Shuffle response:", shuffle_response)

    # Set repeat mode to 'all'
    repeat_response = server.repeat_stream('example_stream_id', 'all')
    print("Repeat response:", repeat_response)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Can someone use them?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant