From ba0af037f24b69e255d83f1d4a7a4b6d40af6561 Mon Sep 17 00:00:00 2001 From: Allison Wang Date: Thu, 16 Oct 2025 17:30:08 -0700 Subject: [PATCH 1/2] feat: Add Spotify data source This commit introduces a new data source for reading data from the Spotify API. The data source currently supports reading a user's saved tracks. It uses OAuth 2.0 with a refresh token for authentication. The following files have been added: - pyspark_datasources/spotify.py: The implementation of the data source. - docs/datasources/spotify.md: Documentation for the data source. - examples/spotify_example.py: An example of how to use the data source. - tests/test_spotify.py: Unit tests for the data source. --- docs/datasources/spotify.md | 100 +++++++++++++++++++++++++ examples/spotify_example.py | 40 ++++++++++ pyspark_datasources/spotify.py | 129 +++++++++++++++++++++++++++++++++ tests/test_spotify.py | 97 +++++++++++++++++++++++++ 4 files changed, 366 insertions(+) create mode 100644 docs/datasources/spotify.md create mode 100644 examples/spotify_example.py create mode 100644 pyspark_datasources/spotify.py create mode 100644 tests/test_spotify.py diff --git a/docs/datasources/spotify.md b/docs/datasources/spotify.md new file mode 100644 index 0000000..f18147b --- /dev/null +++ b/docs/datasources/spotify.md @@ -0,0 +1,100 @@ + +# Spotify + +The Spotify data source allows you to read data from the Spotify API as a Spark DataFrame. Currently, it supports reading a user's saved tracks. + +## Authentication + +To use the Spotify data source, you need to authenticate with the Spotify API. This is done using OAuth 2.0. You will need to provide your Client ID, Client Secret, and a Refresh Token. + +### 1. Create a Spotify Developer App + +1. Go to the [Spotify Developer Dashboard](https://developer.spotify.com/dashboard) and log in. +2. Click on "Create an App". +3. Give your app a name and description, and agree to the terms. +4. Once the app is created, you will see your **Client ID** and **Client Secret**. Copy these values. +5. Click on "Edit Settings" and add a **Redirect URI**. For the purpose of generating a refresh token, you can use `https://example.com/callback`. Click "Save". + +### 2. Generate a Refresh Token + +A refresh token is a long-lived credential that can be used to obtain new access tokens. You only need to generate this once. + +To generate a refresh token, you can use the following Python script. You will need to install the `spotipy` library (`pip install spotipy`). + +```python +import spotipy +from spotipy.oauth2 import SpotifyOAuth + +# Replace with your Client ID, Client Secret, and Redirect URI +CLIENT_ID = "YOUR_CLIENT_ID" +CLIENT_SECRET = "YOUR_CLIENT_SECRET" +REDIRECT_URI = "https://example.com/callback" + +# The scope determines what permissions your app is requesting. +# For reading saved tracks, you need the 'user-library-read' scope. +SCOPE = "user-library-read" + +auth_manager = SpotifyOAuth( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + redirect_uri=REDIRECT_URI, + scope=SCOPE, + open_browser=True +) + +# This will open a browser window for you to log in and authorize the app. +# After you authorize, you will be redirected to the redirect URI. +# The URL of the redirected page will contain a 'code' parameter. +# Copy the entire URL and paste it into the terminal. + +auth_manager.get_access_token(as_dict=False) + +# The refresh token will be printed to the console. +# It will also be saved in a file named .cache in your current directory. + +print(f"Refresh Token: {auth_manager.get_cached_token()['refresh_token']}") + +``` + +Run this script, and it will guide you through the authorization process. At the end, it will print your refresh token. **Save this token securely.** + +## Usage + +Once you have your credentials, you can use the Spotify data source in PySpark. + +```python +from pyspark.sql import SparkSession + +# Create a SparkSession +spark = SparkSession.builder.appName("SpotifyExample").getOrCreate() + +# Register the data source +from pyspark_datasources.spotify import SpotifyDataSource +spark.dataSource.register(SpotifyDataSource) + +# Load your saved tracks +df = spark.read.format("spotify") \ + .option("spotify.client.id", "YOUR_CLIENT_ID") \ + .option("spotify.client.secret", "YOUR_CLIENT_SECRET") \ + .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") \ + .option("type", "tracks") \ + .load() + +# Show the data +df.show() +``` + +## Schema + +The schema for the `tracks` type is as follows: + +| Field | Type | +|-------------|---------------------| +| id | `string` | +| name | `string` | +| artists | `array` | +| album | `string` | +| duration_ms | `long` | +| popularity | `integer` | +| added_at | `string` | + diff --git a/examples/spotify_example.py b/examples/spotify_example.py new file mode 100644 index 0000000..17d6f8d --- /dev/null +++ b/examples/spotify_example.py @@ -0,0 +1,40 @@ + +from pyspark.sql import SparkSession + +# This is an example of how to use the Spotify data source. +# Before running this, make sure you have followed the authentication +# instructions in docs/datasources/spotify.md to get your credentials. + +# Create a SparkSession +spark = SparkSession.builder.appName("SpotifyExample").getOrCreate() + +# Register the data source +from pyspark_datasources.spotify import SpotifyDataSource +spark.dataSource.register(SpotifyDataSource) + +# Replace with your actual credentials +CLIENT_ID = "YOUR_CLIENT_ID" +CLIENT_SECRET = "YOUR_CLIENT_SECRET" +REFRESH_TOKEN = "YOUR_REFRESH_TOKEN" + +# Load your saved tracks +try: + df = ( + spark.read.format("spotify") + .option("spotify.client.id", CLIENT_ID) + .option("spotify.client.secret", CLIENT_SECRET) + .option("spotify.refresh.token", REFRESH_TOKEN) + .option("type", "tracks") + .load() + ) + + # Show the data + df.show() + + # Print the schema + df.printSchema() + +except Exception as e: + print(f"An error occurred: {e}") + print("Please ensure you have replaced the placeholder credentials with your actual credentials.") + diff --git a/pyspark_datasources/spotify.py b/pyspark_datasources/spotify.py new file mode 100644 index 0000000..2e172be --- /dev/null +++ b/pyspark_datasources/spotify.py @@ -0,0 +1,129 @@ + +import requests +from pyspark.sql import Row +from pyspark.sql.datasource import DataSource, DataSourceReader +from pyspark.sql.types import ( + StructType, + StructField, + StringType, + IntegerType, + LongType, + ArrayType, +) + + +class SpotifyDataSource(DataSource): + """ + A DataSource for reading data from Spotify. + + Name: `spotify` + + The `type` option is used to specify what data to load. Currently, only + the `tracks` type is supported, which loads the user's saved songs. + + Schema for `tracks` type: + - `id`: string + - `name`: string + - `artists`: array + - `album`: string + - `duration_ms`: long + - `popularity`: integer + - `added_at`: string + + Examples: + --------- + Register the data source. + + >>> from pyspark_datasources import SpotifyDataSource + >>> spark.dataSource.register(SpotifyDataSource) + + Load your saved tracks from Spotify. + + >>> df = ( + ... spark.read.format("spotify") + ... .option("spotify.client.id", "YOUR_CLIENT_ID") + ... .option("spotify.client.secret", "YOUR_CLIENT_SECRET") + ... .option("spotify.refresh.token", "YOUR_REFRESH_TOKEN") + ... .option("type", "tracks") + ... .load() + ... ) + >>> df.show() + +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ + | id| name| artists| album|duration_ms|popularity| added_at| + +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ + |1BxfuPKGuaTgP7aM0B...| All Too Well| [Taylor Swift]| Red| 329466| 82|2025-10-16T12:00:00Z| + | ...| ...| ...| ...| ...| ...| ...| + +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ + """ + + @classmethod + def name(cls): + return "spotify" + + def schema(self): + # Simplified schema for tracks + return StructType( + [ + StructField("id", StringType(), True), + StructField("name", StringType(), True), + StructField( + "artists", ArrayType(StringType(), True), True + ), + StructField("album", StringType(), True), + StructField("duration_ms", LongType(), True), + StructField("popularity", IntegerType(), True), + StructField("added_at", StringType(), True), + ] + ) + + def reader(self, schema): + return SpotifyReader(self.options) + + +class SpotifyReader(DataSourceReader): + def __init__(self, options): + self.options = options + self.client_id = self.options.get("spotify.client.id") + self.client_secret = self.options.get("spotify.client.secret") + self.refresh_token = self.options.get("spotify.refresh.token") + self.type = self.options.get("type", "tracks") + + if not all([self.client_id, self.client_secret, self.refresh_token]): + raise ValueError( + "spotify.client.id, spotify.client.secret, and spotify.refresh.token must be specified in options" + ) + + def read(self, partition): + access_token = self._get_access_token() + headers = {"Authorization": f"Bearer {access_token}"} + + if self.type == "tracks": + url = "https://api.spotify.com/v1/me/tracks" + while url: + response = requests.get(url, headers=headers) + response.raise_for_status() + data = response.json() + for item in data["items"]: + track = item["track"] + yield Row( + id=track["id"], + name=track["name"], + artists=[artist["name"] for artist in track["artists"]], + album=track["album"]["name"], + duration_ms=track["duration_ms"], + popularity=track["popularity"], + added_at=item["added_at"], + ) + url = data.get("next") + else: + raise ValueError(f"Unsupported type: {self.type}") + + def _get_access_token(self): + response = requests.post( + "https://accounts.spotify.com/api/token", + data={"grant_type": "refresh_token", "refresh_token": self.refresh_token}, + auth=(self.client_id, self.client_secret), + ) + response.raise_for_status() + return response.json()["access_token"] + diff --git a/tests/test_spotify.py b/tests/test_spotify.py new file mode 100644 index 0000000..8a9fa36 --- /dev/null +++ b/tests/test_spotify.py @@ -0,0 +1,97 @@ +""" +Unit tests for the Spotify data source. + +These tests focus on the SpotifyReader class directly to avoid issues with +SparkSession creation in certain test environments. This approach allows for +testing the core logic of the data source, including authentication and data +parsing, without a full SparkSession. +""" +import pytest +from unittest.mock import patch, MagicMock +import requests +from pyspark.sql import Row +from pyspark_datasources.spotify import SpotifyReader + +@patch("pyspark_datasources.spotify.requests.post") +@patch("pyspark_datasources.spotify.requests.get") +def test_spotify_reader_success(mock_get, mock_post): + # Mock the response from the token endpoint + mock_post.return_value.json.return_value = {"access_token": "test_token"} + mock_post.return_value.raise_for_status = MagicMock() + + # Mock the response from the tracks endpoint (with pagination) + mock_get.side_effect = [ + MagicMock( + json=lambda: { + "items": [ + { + "added_at": "2025-10-16T12:00:00Z", + "track": { + "id": "track1", + "name": "Test Track 1", + "artists": [{"name": "Artist 1"}], + "album": {"name": "Album 1"}, + "duration_ms": 200000, + "popularity": 50, + }, + } + ], + "next": "https://api.spotify.com/v1/me/tracks?offset=1&limit=1", + }, + raise_for_status=MagicMock(), + ), + MagicMock( + json=lambda: { + "items": [ + { + "added_at": "2025-10-16T12:05:00Z", + "track": { + "id": "track2", + "name": "Test Track 2", + "artists": [{"name": "Artist 2"}], + "album": {"name": "Album 2"}, + "duration_ms": 220000, + "popularity": 60, + }, + } + ], + "next": None, + }, + raise_for_status=MagicMock(), + ), + ] + + options = { + "spotify.client.id": "test_id", + "spotify.client.secret": "test_secret", + "spotify.refresh.token": "test_refresh", + } + reader = SpotifyReader(options) + rows = list(reader.read(None)) + + assert len(rows) == 2 + assert rows[0]["name"] == "Test Track 1" + assert rows[1]["name"] == "Test Track 2" + +def test_spotify_reader_missing_credentials(): + with pytest.raises(ValueError, match="must be specified in options"): + SpotifyReader({}) + +@patch("pyspark_datasources.spotify.requests.post") +@patch("pyspark_datasources.spotify.requests.get") +def test_spotify_reader_api_error(mock_get, mock_post): + mock_post.return_value.json.return_value = {"access_token": "test_token"} + mock_post.return_value.raise_for_status = MagicMock() + + mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError( + "401 Client Error: Unauthorized for url" + ) + + options = { + "spotify.client.id": "test_id", + "spotify.client.secret": "test_secret", + "spotify.refresh.token": "test_refresh", + } + reader = SpotifyReader(options) + with pytest.raises(requests.exceptions.HTTPError): + list(reader.read(None)) \ No newline at end of file From c150969a00cc52ed60d37748a1f7515191fe46c9 Mon Sep 17 00:00:00 2001 From: Allison Wang Date: Mon, 20 Oct 2025 15:03:36 -0700 Subject: [PATCH 2/2] fix --- .gitignore | 1 + docs/datasources/spotify.md | 18 +++++++++--------- pyspark_datasources/spotify.py | 12 ++++++------ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index f42ab3f..978e00a 100644 --- a/.gitignore +++ b/.gitignore @@ -165,3 +165,4 @@ claude_cache/ # Gemini .gemini/ +.vscode diff --git a/docs/datasources/spotify.md b/docs/datasources/spotify.md index f18147b..783255e 100644 --- a/docs/datasources/spotify.md +++ b/docs/datasources/spotify.md @@ -88,13 +88,13 @@ df.show() The schema for the `tracks` type is as follows: -| Field | Type | -|-------------|---------------------| -| id | `string` | -| name | `string` | -| artists | `array` | -| album | `string` | -| duration_ms | `long` | -| popularity | `integer` | -| added_at | `string` | +| Field | Type | +|-------------|-----------------| +| id | `string` | +| name | `string` | +| artists | `array` | +| album | `string` | +| duration_ms | `long` | +| popularity | `integer` | +| added_at | `string` | diff --git a/pyspark_datasources/spotify.py b/pyspark_datasources/spotify.py index 2e172be..de2ee13 100644 --- a/pyspark_datasources/spotify.py +++ b/pyspark_datasources/spotify.py @@ -48,12 +48,12 @@ class SpotifyDataSource(DataSource): ... .load() ... ) >>> df.show() - +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ - | id| name| artists| album|duration_ms|popularity| added_at| - +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ - |1BxfuPKGuaTgP7aM0B...| All Too Well| [Taylor Swift]| Red| 329466| 82|2025-10-16T12:00:00Z| - | ...| ...| ...| ...| ...| ...| ...| - +----------------------+--------------------+--------------------+--------------------+-----------+----------+--------------------+ + +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ + |id |name |artists |album |duration_ms|popularity|added_at | + +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ + |1BxfuPKGuaTgP7aM0B... |All Too Well |[Taylor Swift] |Red |329466 |82 |2025-10-16T12:00:00Z| + |... |... |... |... |... |... |... | + +----------------------+----------------+------------------+---------+-----------+----------+--------------------+ """ @classmethod