diff --git a/pyproject.toml b/pyproject.toml index 2ffc61db..5919c0c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "urllib3 >= 1.15.1", "packaging", "protobuf", + "jupytext >= 1.15.0", ] [project.scripts] diff --git a/src/kaggle/api/kaggle_api_extended.py b/src/kaggle/api/kaggle_api_extended.py index f1ccd972..12070424 100644 --- a/src/kaggle/api/kaggle_api_extended.py +++ b/src/kaggle/api/kaggle_api_extended.py @@ -35,6 +35,7 @@ import zipfile from dateutil.relativedelta import relativedelta from os.path import expanduser +from pathlib import Path from random import random import bleach @@ -3508,6 +3509,204 @@ def kernels_status_cli(self, kernel, kernel_opt=None): else: print('%s has status "%s"' % (kernel, status)) + def _resolve_kernel_slug(self, kernel: str) -> tuple[str, str]: + """Parses a kernel string into (owner_slug, kernel_slug).""" + if "/" in kernel: + self.validate_kernel_string(kernel) + owner_slug, kernel_slug = kernel.split("/", 1) + return owner_slug, kernel_slug + + return self.get_config_value(self.CONFIG_NAME_USER), kernel + + def benchmarks_pull(self, kernel: str, path: str = None, quiet: bool = False): + """Pulls a benchmark notebook and converts it to a .py script.""" + + try: + import jupytext + except ImportError: + raise ImportError("jupytext is required for benchmarks functionality. Please install it.") + + effective_path = self.kernels_pull(kernel, path=path, metadata=True, quiet=quiet) + effective_path_obj = Path(effective_path) + + # After pulling, find the .ipynb file in effective_path + ipynb_files = list(effective_path_obj.glob("*.ipynb")) + if not ipynb_files: + raise ValueError("Could not find a .ipynb file in the pulled kernel.") + + # Rename the first .ipynb file to benchmark.ipynb (if it isn't already) + ipynb_path = ipynb_files[0] + benchmark_ipynb_path = effective_path_obj / "benchmark.ipynb" + + if ipynb_path.name != "benchmark.ipynb": + ipynb_path = ipynb_path.rename(benchmark_ipynb_path) + if not quiet: + print(f"Renamed {ipynb_path.name} to benchmark.ipynb") + + # Convert to benchmark.py + benchmark_py_path = effective_path_obj / "benchmark.py" + notebook = jupytext.read(str(benchmark_ipynb_path)) + + # Strip confusing metadata formatting from the resulting Python document + notebook.metadata.setdefault("jupytext", {}).update({ + "notebook_metadata_filter": "-all", + "cell_metadata_filter": "-all" + }) + + jupytext.write(notebook, str(benchmark_py_path), fmt="py:percent") + if not quiet: + print(f"Converted benchmark.ipynb to {benchmark_py_path}") + return effective_path + + def benchmarks_pull_cli(self, kernel, kernel_opt=None, path=None): + effective_path = self.benchmarks_pull(kernel or kernel_opt, path=path, quiet=False) + print(f"Benchmark pulled and converted successfully to {effective_path}") + + def benchmarks_publish_and_run( + self, kernel: str = None, path: str = None, file_name: str = None, quiet: bool = False + ): + """Converts a local .py benchmark to .ipynb and pushes it to Kaggle.""" + + try: + import jupytext + except ImportError: + raise ImportError("jupytext is required for benchmarks functionality. Please install it.") + + path_obj = Path(path or os.getcwd()) + py_path = path_obj / (file_name or "benchmark.py") + + if not py_path.exists(): + raise FileNotFoundError(f"Source file not found: {py_path}") + + ipynb_path = path_obj / "benchmark.ipynb" + notebook = jupytext.read(str(py_path), fmt="py:percent") + + # Inject default Kaggle kernelspec so Papermill executes the notebook correctly + notebook.metadata.setdefault("kernelspec", { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }) + + jupytext.write(notebook, str(ipynb_path)) + if not quiet: + print(f"Converted {py_path} to {ipynb_path}") + + # Ensure kernel-metadata.json exists and has "personal-benchmark" + metadata_path = path_obj / self.KERNEL_METADATA_FILE + is_new_metadata = not metadata_path.exists() + + if is_new_metadata: + metadata = { + "language": "python", + "kernel_type": "notebook", + "is_private": "true", + "enable_gpu": "false", + "enable_internet": "true", + "dataset_sources": [], + "competition_sources": [], + "kernel_sources": [], + "model_sources": [], + } + else: + with open(metadata_path, "r") as f: + metadata = json.load(f) + + # The 'kernel' arg explicitly overrides the metadata ID and title. + # This is useful when pushing to a different kernel slug, or resolving missing metadata properties. + # However, if no 'kernel' is provided AND the metadata file doesn't exist yet, we must abort + # because we lack the necessary owner/slug information to create it from scratch. + if kernel: + owner_slug, kernel_slug = self._resolve_kernel_slug(kernel) + new_id = f"{owner_slug}/{kernel_slug}" + + if metadata.get("id") != new_id: + metadata.update({ + "id": new_id, + "title": kernel_slug.replace("-", " ").title() + }) + metadata.pop("id_no", None) + elif is_new_metadata: + raise ValueError("A kernel slug must be specified to create a new metadata file.") + + metadata.setdefault("keywords", []) + if "personal-benchmark" not in metadata["keywords"]: + metadata["keywords"].append("personal-benchmark") + metadata["code_file"] = "benchmark.ipynb" + + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + if not quiet: + msg_prefix = "Created" if is_new_metadata else "Updated" + print(f"{msg_prefix} kernel metadata at {metadata_path}") + + # Now push using kernels_push + return self.kernels_push(str(path_obj)) + + def benchmarks_publish_and_run_cli(self, kernel=None, kernel_opt=None, path=None, file_name=None): + result = self.benchmarks_publish_and_run(kernel or kernel_opt, path=path, file_name=file_name, quiet=False) + url_text = f"\nTracking URL: {result.url}" if getattr(result, "url", None) else "" + print(f"Benchmark pushed and started successfully.{url_text}\nRun kaggle benchmarks tasks results to stream output.") + + def benchmarks_get_results(self, kernel: str = None, path: str = None, poll_interval: int = 60, timeout: int = None): + """Polls the status of a benchmark until complete, then downloads the output.""" + + path_obj = Path(path or os.getcwd()) + + # If the user didn't provide a kernel slug explicitly, attempt to infer it + # from the local metadata file (e.g. kernel-metadata.json). + if not kernel: + meta_file = path_obj / self.KERNEL_METADATA_FILE + try: + if meta_file.exists(): + with open(meta_file, "r") as f: + kernel = json.load(f).get("id") + except (json.JSONDecodeError, OSError): + # Fail gracefully if the file is invalid or unreadable; + # the fallback ValueError beneath will handle the missing kernel. + pass + + if not kernel: + raise ValueError("A kernel must be specified, either directly or via a valid local metadata file.") + + start_time = time.time() + print(f"Waiting for benchmark {kernel} to complete...") + + while True: + response = self.kernels_status(kernel) + status_str = str(response.status).upper() + + if "COMPLETE" in status_str: + print(f"Benchmark {kernel} completed.") + break + + if "ERROR" in status_str: + message = getattr(response, "failure_message", "") + error_txt = f" Message: {message}" if message else "" + print(f"Benchmark {kernel} failed!{error_txt}\nAttempting to download partial logs for debugging...") + try: + out_path = path_obj / "output" + self.kernels_output(kernel, path=str(out_path), file_pattern=None, force=True, quiet=False) + except Exception as log_err: + print(f"Could not retrieve backend logs: {log_err}") + raise ValueError("Benchmark execution terminated with an error state.") + + print(f"Status: {response.status}. Waiting {poll_interval}s...") + time.sleep(poll_interval) + + if timeout is not None and (time.time() - start_time) > timeout: + raise TimeoutError(f"Timed out waiting for benchmark after {timeout} seconds.") + + # Now download output + print(f"Downloading results for {kernel}...") + out_path = path_obj / "output" + return self.kernels_output(kernel=kernel, path=str(out_path), file_pattern=None, force=True, quiet=False) + + def benchmarks_get_results_cli(self, kernel, kernel_opt=None, path=None, poll_interval=60, timeout=None): + self.benchmarks_get_results(kernel or kernel_opt, path=path, poll_interval=poll_interval, timeout=timeout) + print("Output downloaded successfully.") + def model_get(self, model: str) -> ApiModel: """Gets a model. diff --git a/src/kaggle/cli.py b/src/kaggle/cli.py index 87558529..e7f8f6d0 100644 --- a/src/kaggle/cli.py +++ b/src/kaggle/cli.py @@ -55,6 +55,7 @@ def main() -> None: parse_models(subparsers) parse_files(subparsers) parse_config(subparsers) + parse_benchmarks(subparsers) if api.enable_oauth: parse_auth(subparsers) args = parser.parse_args() @@ -627,6 +628,68 @@ def parse_kernels(subparsers) -> None: parser_kernels_delete.set_defaults(func=api.kernels_delete_cli) +def parse_benchmarks(subparsers) -> None: + parser_benchmarks = subparsers.add_parser( + "benchmarks", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_benchmarks, aliases=["b"] + ) + subparsers_benchmarks = parser_benchmarks.add_subparsers(title="commands", dest="command") + subparsers_benchmarks.required = True + subparsers_benchmarks.choices = Help.benchmarks_choices + + # Benchmarks tasks + parser_benchmarks_tasks = subparsers_benchmarks.add_parser( + "tasks", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_benchmark_tasks, aliases=["t"] + ) + subparsers_benchmarks_tasks = parser_benchmarks_tasks.add_subparsers(title="commands", dest="command") + subparsers_benchmarks_tasks.required = True + subparsers_benchmarks_tasks.choices = Help.benchmark_tasks_choices + + def add_task_command(name, help_text, func, extra_args=None): + cmd_parser = subparsers_benchmarks_tasks.add_parser( + name, formatter_class=argparse.RawTextHelpFormatter, help=help_text + ) + opt_group = cmd_parser._action_groups.pop() + opt_group.add_argument("kernel", nargs="?", default=None, help=Help.param_benchmark_kernel) + opt_group.add_argument("-k", "--kernel", dest="kernel_opt", required=False, help=argparse.SUPPRESS) + + for args, kwargs in (extra_args or []): + opt_group.add_argument(*args, **kwargs) + + cmd_parser._action_groups.append(opt_group) + cmd_parser.set_defaults(func=func) + + # Benchmarks Tasks pull + add_task_command( + "pull", + Help.command_benchmarks_pull, + api.benchmarks_pull_cli, + [(("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_pull_path})] + ) + + # Benchmarks publish_and_run + add_task_command( + "run", + Help.command_benchmarks_publish_and_run, + api.benchmarks_publish_and_run_cli, + [ + (("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_push_path}), + (("-f", "--file"), {"dest": "file_name", "required": False, "help": Help.param_benchmark_file}) + ] + ) + + # Benchmarks results + add_task_command( + "results", + Help.command_benchmarks_get_results, + api.benchmarks_get_results_cli, + [ + (("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_results_path}), + (("--poll-interval",), {"dest": "poll_interval", "default": 60, "type": int, "help": Help.param_benchmark_poll_interval}), + (("--timeout",), {"dest": "timeout", "default": None, "type": int, "help": Help.param_benchmark_timeout}) + ] + ) + + def parse_models(subparsers) -> None: parser_models = subparsers.add_parser( "models", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_models, aliases=["m"] @@ -1070,10 +1133,14 @@ class Help(object): "f", "config", "auth", + "benchmarks", + "b", ] competitions_choices = ["list", "files", "download", "submit", "submissions", "leaderboard"] datasets_choices = ["list", "files", "download", "create", "version", "init", "metadata", "status", "delete"] kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete"] + benchmarks_choices = ["tasks", "t"] + benchmark_tasks_choices = ["pull", "run", "results"] models_choices = ["instances", "i", "variations", "v", "get", "list", "init", "create", "delete", "update"] model_instances_choices = ["versions", "v", "get", "files", "list", "init", "create", "delete", "update"] model_instance_versions_choices = ["init", "create", "download", "delete", "files", "list"] @@ -1096,6 +1163,10 @@ class Help(object): + ", ".join(model_instance_versions_choices) + "}\nconfig {" + ", ".join(config_choices) + + "}\nbenchmarks {" + + ", ".join(benchmarks_choices) + + "}\nbenchmarks tasks {" + + ", ".join(benchmark_tasks_choices) + "}" ) if api.enable_oauth: @@ -1110,6 +1181,8 @@ class Help(object): group_files = "Commands related files" group_config = "Configuration settings" group_auth = "Commands related to authentication" + group_benchmarks = "Commands related to Kaggle benchmarks" + group_benchmark_tasks = "Manage benchmark tasks" # Competitions commands command_competitions_list = "List available competitions" @@ -1140,6 +1213,11 @@ class Help(object): command_kernels_status = "Display the status of the latest kernel run" command_kernels_delete = "Delete a kernel" + # Benchmarks commands + command_benchmarks_pull = "Pull an existing benchmark notebook to a local workspace" + command_benchmarks_publish_and_run = "Convert local Python benchmark script to a Kaggle notebook and push it to run" + command_benchmarks_get_results = "Poll the execution status of a benchmark and download the output artifacts" + # Models commands command_models_files = "List model files" command_models_get = "Get a model" @@ -1306,6 +1384,17 @@ class Help(object): ) param_kernel_acc = "Specify the type of accelerator to use for the kernel run" + # Benchmarks params + param_benchmark_kernel = ( + 'Kernel URL suffix in format / (use "kaggle kernels list" to show options)' + ) + param_benchmark_file = 'Path to the benchmark python file (e.g. "benchmark.py"). Defaults to looking for benchmark.py in the workspace if omitted.' + param_benchmark_pull_path = 'Folder to pull the benchmark into. Defaults to="./" or the configured Kaggle download path (e.g., ~/.kaggle/kernels//) if omitted.' + param_benchmark_push_path = "Workspace folder containing the benchmark.py and optionally kernel-metadata.json. Defaults to current directory." + param_benchmark_results_path = 'Folder to download the outputs to. Defaults to the Kaggle configured download path appended with "output" (e.g., ~/.kaggle/kernels///output), or the current directory.' + param_benchmark_poll_interval = "Number of seconds to wait between status checks. Default is 60" + param_benchmark_timeout = "Maximum seconds to wait for execution to complete. None means wait indefinitely" + # Models params param_model = "Model URL suffix in format /" param_model_sort_by = ( diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py new file mode 100644 index 00000000..d68990a4 --- /dev/null +++ b/tests/test_benchmarks.py @@ -0,0 +1,220 @@ +import os +import json +import unittest +import tempfile +from unittest.mock import MagicMock, patch + +from kaggle.api.kaggle_api_extended import KaggleApi + + +class TestBenchmarks(unittest.TestCase): + def setUp(self): + self.api = KaggleApi() + self.api.get_config_value = MagicMock(return_value="testuser") + + @patch("jupytext.read") + @patch("jupytext.write") + def test_benchmarks_pull(self, mock_write, mock_read): + with tempfile.TemporaryDirectory() as tmpdir: + self.api.kernels_pull = MagicMock(return_value=tmpdir) + # Create a dummy ipynb file as if kernels_pull pulled it + dummy_ipynb = os.path.join(tmpdir, "notebook.ipynb") + with open(dummy_ipynb, "w") as f: + f.write("{}") + + result_dir = self.api.benchmarks_pull( + "testuser/my-benchmark", path=tmpdir, quiet=True + ) + self.assertEqual(result_dir, tmpdir) + + # 1. notebooks should be renamed to benchmark.ipynb + benchmark_ipynb = os.path.join(tmpdir, "benchmark.ipynb") + self.assertTrue(os.path.exists(benchmark_ipynb)) + self.assertFalse(os.path.exists(dummy_ipynb)) + + # 2. jupytext read and write should be called + mock_read.assert_called_once_with(benchmark_ipynb) + mock_write.assert_called_once() + self.assertEqual( + mock_write.call_args[0][1], os.path.join(tmpdir, "benchmark.py") + ) + self.assertEqual(mock_write.call_args[1]["fmt"], "py:percent") + self.api.kernels_pull.assert_called_once_with( + "testuser/my-benchmark", path=tmpdir, metadata=True, quiet=True + ) + + @patch("jupytext.read") + @patch("jupytext.write") + def test_benchmarks_publish_and_run(self, mock_write, mock_read): + with tempfile.TemporaryDirectory() as tmpdir: + # Create a dummy python file + py_file = os.path.join(tmpdir, "benchmark.py") + with open(py_file, "w") as f: + f.write("print('hello')") + + self.api.kernels_push = MagicMock(return_value="push_success") + + result = self.api.benchmarks_publish_and_run( + kernel="testuser/new-benchmark", + path=tmpdir, + file_name="benchmark.py", + quiet=True, + ) + + self.assertEqual(result, "push_success") + + # verify jupytext was used to read .py and write .ipynb + mock_read.assert_called_once_with(py_file, fmt="py:percent") + mock_write.assert_called_once() + self.assertEqual( + mock_write.call_args[0][1], os.path.join(tmpdir, "benchmark.ipynb") + ) + + # verify metadata was created correctly + metadata_file = os.path.join(tmpdir, "kernel-metadata.json") + self.assertTrue(os.path.exists(metadata_file)) + with open(metadata_file, "r") as f: + metadata = json.load(f) + + self.assertEqual(metadata["id"], "testuser/new-benchmark") + self.assertEqual(metadata["code_file"], "benchmark.ipynb") + self.assertIn("personal-benchmark", metadata["keywords"]) + + self.api.kernels_push.assert_called_once_with(tmpdir) + + @patch("jupytext.read") + @patch("jupytext.write") + def test_benchmarks_publish_and_run_existing_metadata(self, mock_write, mock_read): + with tempfile.TemporaryDirectory() as tmpdir: + # Create a dummy python file + py_file = os.path.join(tmpdir, "benchmark.py") + with open(py_file, "w") as f: + f.write("print('hello')") + + # Create existing metadata + metadata_file = os.path.join(tmpdir, "kernel-metadata.json") + with open(metadata_file, "w") as f: + json.dump( + { + "id": "otheruser/existing-benchmark", + "code_file": "old.ipynb", + "keywords": ["tag1"], + }, + f, + ) + + self.api.kernels_push = MagicMock(return_value="push_success") + + # Act + self.api.benchmarks_publish_and_run(path=tmpdir, quiet=True) + + # Assert + with open(metadata_file, "r") as f: + metadata = json.load(f) + # The keyword and code_file should be forcibly updated + self.assertIn("personal-benchmark", metadata["keywords"]) + self.assertIn("tag1", metadata["keywords"]) + self.assertEqual(metadata["code_file"], "benchmark.ipynb") + # id remains the same + self.assertEqual(metadata["id"], "otheruser/existing-benchmark") + + @patch("jupytext.read") + @patch("jupytext.write") + def test_benchmarks_publish_and_run_explicit_kernel(self, mock_write, mock_read): + with tempfile.TemporaryDirectory() as tmpdir: + py_file = os.path.join(tmpdir, "benchmark.py") + with open(py_file, "w") as f: + f.write("print('hello')") + + # Create existing metadata + metadata_file = os.path.join(tmpdir, "kernel-metadata.json") + with open(metadata_file, "w") as f: + json.dump({"id": "old/existing", "id_no": 12345}, f) + + self.api.kernels_push = MagicMock(return_value="push_success") + + # Act with explicit kernel override + self.api.benchmarks_publish_and_run( + kernel="newuser/new-benchmark", path=tmpdir, quiet=True + ) + + # Assert + self.api.kernels_push.assert_called_once_with(tmpdir) + with open(metadata_file, "r") as f: + metadata = json.load(f) + + self.assertEqual(metadata["id"], "newuser/new-benchmark") + self.assertEqual(metadata["title"], "New Benchmark") + self.assertNotIn("id_no", metadata) + self.assertIn("personal-benchmark", metadata["keywords"]) + + @patch("time.sleep") + def test_benchmarks_get_results(self, mock_sleep): + # mock status to return 'running' once, then 'complete' + class MockStatus: + def __init__(self, status): + self.status = status + self.failure_message = "" + + self.api.kernels_status = MagicMock( + side_effect=[MockStatus("running"), MockStatus("complete")] + ) + self.api.kernels_output = MagicMock(return_value="output_data") + + result = self.api.benchmarks_get_results( + "testuser/my-bench", path="some_path", poll_interval=10 + ) + + self.assertEqual(result, "output_data") + self.assertEqual(self.api.kernels_status.call_count, 2) + mock_sleep.assert_called_once_with(10) + self.api.kernels_output.assert_called_once_with( + kernel="testuser/my-bench", + path=os.path.join("some_path", "output"), + file_pattern=None, + force=True, + quiet=False, + ) + + def test_benchmarks_get_results_error(self): + class MockStatusError: + def __init__(self): + self.status = "error" + self.failure_message = "syntax error" + + self.api.kernels_status = MagicMock(return_value=MockStatusError()) + self.api.kernels_output = MagicMock() + + with self.assertRaisesRegex(ValueError, "error state"): + self.api.benchmarks_get_results("testuser/my-bench") + + @patch("time.sleep") + def test_benchmarks_get_results_no_kernel(self, mock_sleep): + class MockStatus: + def __init__(self, status): + self.status = status + self.failure_message = "" + + self.api.kernels_status = MagicMock(return_value=MockStatus("complete")) + self.api.kernels_output = MagicMock(return_value="output_data") + + with tempfile.TemporaryDirectory() as tmpdir: + # Create existing metadata + metadata_file = os.path.join(tmpdir, "kernel-metadata.json") + with open(metadata_file, "w") as f: + json.dump({"id": "implicit/my-bench"}, f) + + result = self.api.benchmarks_get_results(kernel=None, path=tmpdir) + + self.assertEqual(result, "output_data") + self.api.kernels_output.assert_called_once_with( + kernel="implicit/my-bench", + path=os.path.join(tmpdir, "output"), + file_pattern=None, + force=True, + quiet=False, + ) + + +if __name__ == "__main__": + unittest.main()