Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"urllib3 >= 1.15.1",
"packaging",
"protobuf",
"jupytext >= 1.15.0",
]

[project.scripts]
Expand Down
199 changes: 199 additions & 0 deletions src/kaggle/api/kaggle_api_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import zipfile
from dateutil.relativedelta import relativedelta
from os.path import expanduser
from pathlib import Path
from random import random

import bleach
Expand Down Expand Up @@ -3508,6 +3509,204 @@ def kernels_status_cli(self, kernel, kernel_opt=None):
else:
print('%s has status "%s"' % (kernel, status))

def _resolve_kernel_slug(self, kernel: str) -> tuple[str, str]:
    """Splits a kernel reference into its (owner_slug, kernel_slug) parts.

    A bare slug (no "/") is attributed to the user from the local Kaggle
    config; a full "owner/slug" reference is validated before splitting.
    """
    if "/" not in kernel:
        # No owner given: fall back to the configured user.
        return self.get_config_value(self.CONFIG_NAME_USER), kernel

    self.validate_kernel_string(kernel)
    # Split only on the first "/" so any later slashes stay in the slug.
    owner_slug, kernel_slug = kernel.split("/", 1)
    return owner_slug, kernel_slug

def benchmarks_pull(self, kernel: str, path: str = None, quiet: bool = False):
    """Pulls a benchmark notebook and converts it to a .py script.

    Args:
        kernel: Kernel reference, "<owner>/<kernel-name>" or a bare slug.
        path: Optional destination folder; when omitted, kernels_pull
            chooses its default download location.
        quiet: Suppress progress messages when True.

    Returns:
        The folder path the benchmark was pulled into.

    Raises:
        ImportError: If the optional jupytext dependency is missing.
        ValueError: If the pulled kernel contains no .ipynb file.
    """

    try:
        import jupytext
    except ImportError:
        raise ImportError("jupytext is required for benchmarks functionality. Please install it.")

    effective_path = self.kernels_pull(kernel, path=path, metadata=True, quiet=quiet)
    effective_path_obj = Path(effective_path)

    # After pulling, find the .ipynb file in effective_path
    ipynb_files = list(effective_path_obj.glob("*.ipynb"))
    if not ipynb_files:
        raise ValueError("Could not find a .ipynb file in the pulled kernel.")

    # Rename the first .ipynb file to benchmark.ipynb (if it isn't already)
    ipynb_path = ipynb_files[0]
    benchmark_ipynb_path = effective_path_obj / "benchmark.ipynb"

    if ipynb_path.name != "benchmark.ipynb":
        # BUG FIX: capture the original name before renaming — Path.rename()
        # returns the *new* path, so the old code printed
        # "Renamed benchmark.ipynb to benchmark.ipynb".
        original_name = ipynb_path.name
        ipynb_path = ipynb_path.rename(benchmark_ipynb_path)
        if not quiet:
            print(f"Renamed {original_name} to benchmark.ipynb")

    # Convert to benchmark.py
    benchmark_py_path = effective_path_obj / "benchmark.py"
    notebook = jupytext.read(str(benchmark_ipynb_path))

    # Strip confusing metadata formatting from the resulting Python document
    notebook.metadata.setdefault("jupytext", {}).update({
        "notebook_metadata_filter": "-all",
        "cell_metadata_filter": "-all"
    })

    jupytext.write(notebook, str(benchmark_py_path), fmt="py:percent")
    if not quiet:
        print(f"Converted benchmark.ipynb to {benchmark_py_path}")
    return effective_path

def benchmarks_pull_cli(self, kernel, kernel_opt=None, path=None):
    """CLI wrapper for benchmarks_pull; the positional kernel wins over -k."""
    target = kernel if kernel else kernel_opt
    destination = self.benchmarks_pull(target, path=path, quiet=False)
    print(f"Benchmark pulled and converted successfully to {destination}")

def benchmarks_publish_and_run(
    self, kernel: str = None, path: str = None, file_name: str = None, quiet: bool = False
):
    """Converts a local .py benchmark to .ipynb and pushes it to Kaggle.

    Args:
        kernel: Optional "<owner>/<kernel-name>" (or bare slug) that overrides
            the id/title in kernel-metadata.json. Required when no metadata
            file exists yet.
        path: Workspace folder containing the benchmark script; defaults to
            the current working directory.
        file_name: Name of the source script; defaults to "benchmark.py".
        quiet: Suppress progress messages when True.

    Returns:
        The result of kernels_push for the workspace.

    Raises:
        ImportError: If the optional jupytext dependency is missing.
        FileNotFoundError: If the source .py file does not exist.
        ValueError: If no kernel is given and no metadata file exists.
    """

    try:
        import jupytext
    except ImportError:
        raise ImportError("jupytext is required for benchmarks functionality. Please install it.")

    path_obj = Path(path or os.getcwd())
    py_path = path_obj / (file_name or "benchmark.py")

    if not py_path.exists():
        raise FileNotFoundError(f"Source file not found: {py_path}")

    ipynb_path = path_obj / "benchmark.ipynb"
    notebook = jupytext.read(str(py_path), fmt="py:percent")

    # Inject default Kaggle kernelspec so Papermill executes the notebook correctly
    notebook.metadata.setdefault("kernelspec", {
        "display_name": "Python 3",
        "language": "python",
        "name": "python3"
    })

    jupytext.write(notebook, str(ipynb_path))
    if not quiet:
        print(f"Converted {py_path} to {ipynb_path}")

    self._benchmark_update_metadata(kernel, path_obj, quiet)

    # Now push using kernels_push
    return self.kernels_push(str(path_obj))

def _benchmark_update_metadata(self, kernel, path_obj, quiet):
    """Creates or updates kernel-metadata.json so the push is tagged as a benchmark.

    Ensures the metadata carries the "personal-benchmark" keyword and points
    code_file at benchmark.ipynb. Writes the file back to path_obj.
    """
    metadata_path = path_obj / self.KERNEL_METADATA_FILE
    is_new_metadata = not metadata_path.exists()

    if is_new_metadata:
        # Defaults for a brand-new private notebook kernel.
        metadata = {
            "language": "python",
            "kernel_type": "notebook",
            "is_private": "true",
            "enable_gpu": "false",
            "enable_internet": "true",
            "dataset_sources": [],
            "competition_sources": [],
            "kernel_sources": [],
            "model_sources": [],
        }
    else:
        with open(metadata_path, "r") as f:
            metadata = json.load(f)

    # The 'kernel' arg explicitly overrides the metadata ID and title.
    # This is useful when pushing to a different kernel slug, or resolving missing metadata properties.
    # However, if no 'kernel' is provided AND the metadata file doesn't exist yet, we must abort
    # because we lack the necessary owner/slug information to create it from scratch.
    if kernel:
        owner_slug, kernel_slug = self._resolve_kernel_slug(kernel)
        new_id = f"{owner_slug}/{kernel_slug}"

        if metadata.get("id") != new_id:
            metadata.update({
                "id": new_id,
                "title": kernel_slug.replace("-", " ").title()
            })
            # Drop the numeric id so the backend re-resolves the new slug.
            metadata.pop("id_no", None)
    elif is_new_metadata:
        raise ValueError("A kernel slug must be specified to create a new metadata file.")

    metadata.setdefault("keywords", [])
    if "personal-benchmark" not in metadata["keywords"]:
        metadata["keywords"].append("personal-benchmark")
    metadata["code_file"] = "benchmark.ipynb"

    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)

    if not quiet:
        msg_prefix = "Created" if is_new_metadata else "Updated"
        print(f"{msg_prefix} kernel metadata at {metadata_path}")

def benchmarks_publish_and_run_cli(self, kernel=None, kernel_opt=None, path=None, file_name=None):
    """CLI wrapper around benchmarks_publish_and_run; prints a tracking URL when available."""
    result = self.benchmarks_publish_and_run(kernel or kernel_opt, path=path, file_name=file_name, quiet=False)
    tracking_url = getattr(result, "url", None)
    url_text = f"\nTracking URL: {tracking_url}" if tracking_url else ""
    print(f"Benchmark pushed and started successfully.{url_text}\nRun kaggle benchmarks tasks results to stream output.")

def benchmarks_get_results(self, kernel: str = None, path: str = None, poll_interval: int = 60, timeout: int = None):
    """Polls the status of a benchmark until complete, then downloads the output.

    Args:
        kernel: "<owner>/<kernel-name>"; if omitted, the id is read from the
            local kernel-metadata.json in `path`.
        path: Workspace folder; defaults to the current working directory.
            Output is downloaded into "<path>/output".
        poll_interval: Seconds to wait between status checks.
        timeout: Maximum seconds to wait; None waits indefinitely.

    Returns:
        The result of kernels_output for the completed run.

    Raises:
        ValueError: If no kernel can be determined, or the run errors out.
        TimeoutError: If the run does not finish within `timeout` seconds.
    """

    path_obj = Path(path or os.getcwd())
    # Single output location, shared by the error-log path and the happy path.
    out_path = path_obj / "output"

    # If the user didn't provide a kernel slug explicitly, attempt to infer it
    # from the local metadata file (e.g. kernel-metadata.json).
    if not kernel:
        meta_file = path_obj / self.KERNEL_METADATA_FILE
        try:
            if meta_file.exists():
                with open(meta_file, "r") as f:
                    kernel = json.load(f).get("id")
        except (json.JSONDecodeError, OSError):
            # Fail gracefully if the file is invalid or unreadable;
            # the fallback ValueError beneath will handle the missing kernel.
            pass

    if not kernel:
        raise ValueError("A kernel must be specified, either directly or via a valid local metadata file.")

    start_time = time.time()
    print(f"Waiting for benchmark {kernel} to complete...")

    while True:
        response = self.kernels_status(kernel)
        status_str = str(response.status).upper()

        if "COMPLETE" in status_str:
            print(f"Benchmark {kernel} completed.")
            break

        if "ERROR" in status_str:
            message = getattr(response, "failure_message", "")
            error_txt = f" Message: {message}" if message else ""
            print(f"Benchmark {kernel} failed!{error_txt}\nAttempting to download partial logs for debugging...")
            try:
                self.kernels_output(kernel, path=str(out_path), file_pattern=None, force=True, quiet=False)
            except Exception as log_err:
                # Best-effort log retrieval; the failure itself is reported below.
                print(f"Could not retrieve backend logs: {log_err}")
            raise ValueError("Benchmark execution terminated with an error state.")

        # BUG FIX: check the timeout *before* sleeping. Previously the check ran
        # only after time.sleep(poll_interval), so an already-expired timeout
        # still blocked for a full extra poll interval before raising.
        if timeout is not None and (time.time() - start_time) > timeout:
            raise TimeoutError(f"Timed out waiting for benchmark after {timeout} seconds.")

        print(f"Status: {response.status}. Waiting {poll_interval}s...")
        time.sleep(poll_interval)

    # Now download output
    print(f"Downloading results for {kernel}...")
    return self.kernels_output(kernel=kernel, path=str(out_path), file_pattern=None, force=True, quiet=False)

def benchmarks_get_results_cli(self, kernel, kernel_opt=None, path=None, poll_interval=60, timeout=None):
    """CLI wrapper: wait for a benchmark run to finish and download its output."""
    target = kernel if kernel else kernel_opt
    self.benchmarks_get_results(target, path=path, poll_interval=poll_interval, timeout=timeout)
    print("Output downloaded successfully.")

def model_get(self, model: str) -> ApiModel:
"""Gets a model.

Expand Down
89 changes: 89 additions & 0 deletions src/kaggle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def main() -> None:
parse_models(subparsers)
parse_files(subparsers)
parse_config(subparsers)
parse_benchmarks(subparsers)
if api.enable_oauth:
parse_auth(subparsers)
args = parser.parse_args()
Expand Down Expand Up @@ -627,6 +628,68 @@ def parse_kernels(subparsers) -> None:
parser_kernels_delete.set_defaults(func=api.kernels_delete_cli)


def parse_benchmarks(subparsers) -> None:
    """Registers the 'benchmarks' command group and its 'tasks' subcommands."""
    parser_benchmarks = subparsers.add_parser(
        "benchmarks", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_benchmarks, aliases=["b"]
    )
    subparsers_benchmarks = parser_benchmarks.add_subparsers(title="commands", dest="command")
    subparsers_benchmarks.required = True
    subparsers_benchmarks.choices = Help.benchmarks_choices

    # The "tasks" group holds the pull/run/results task commands.
    parser_tasks = subparsers_benchmarks.add_parser(
        "tasks", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_benchmark_tasks, aliases=["t"]
    )
    subparsers_tasks = parser_tasks.add_subparsers(title="commands", dest="command")
    subparsers_tasks.required = True
    subparsers_tasks.choices = Help.benchmark_tasks_choices

    def register_task(name, help_text, func, extra_args=None):
        # Every task command accepts the kernel both positionally and via a
        # hidden -k/--kernel flag, plus any command-specific options.
        cmd_parser = subparsers_tasks.add_parser(
            name, formatter_class=argparse.RawTextHelpFormatter, help=help_text
        )
        opt_group = cmd_parser._action_groups.pop()
        opt_group.add_argument("kernel", nargs="?", default=None, help=Help.param_benchmark_kernel)
        opt_group.add_argument("-k", "--kernel", dest="kernel_opt", required=False, help=argparse.SUPPRESS)

        for arg_names, arg_kwargs in extra_args if extra_args else []:
            opt_group.add_argument(*arg_names, **arg_kwargs)

        cmd_parser._action_groups.append(opt_group)
        cmd_parser.set_defaults(func=func)

    # Benchmarks Tasks pull
    register_task(
        "pull",
        Help.command_benchmarks_pull,
        api.benchmarks_pull_cli,
        [(("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_pull_path})],
    )

    # Benchmarks publish_and_run
    register_task(
        "run",
        Help.command_benchmarks_publish_and_run,
        api.benchmarks_publish_and_run_cli,
        [
            (("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_push_path}),
            (("-f", "--file"), {"dest": "file_name", "required": False, "help": Help.param_benchmark_file}),
        ],
    )

    # Benchmarks results
    register_task(
        "results",
        Help.command_benchmarks_get_results,
        api.benchmarks_get_results_cli,
        [
            (("-p", "--path"), {"dest": "path", "required": False, "help": Help.param_benchmark_results_path}),
            (("--poll-interval",), {"dest": "poll_interval", "default": 60, "type": int, "help": Help.param_benchmark_poll_interval}),
            (("--timeout",), {"dest": "timeout", "default": None, "type": int, "help": Help.param_benchmark_timeout}),
        ],
    )


def parse_models(subparsers) -> None:
parser_models = subparsers.add_parser(
"models", formatter_class=argparse.RawTextHelpFormatter, help=Help.group_models, aliases=["m"]
Expand Down Expand Up @@ -1070,10 +1133,14 @@ class Help(object):
"f",
"config",
"auth",
"benchmarks",
"b",
]
competitions_choices = ["list", "files", "download", "submit", "submissions", "leaderboard"]
datasets_choices = ["list", "files", "download", "create", "version", "init", "metadata", "status", "delete"]
kernels_choices = ["list", "files", "get", "init", "push", "pull", "output", "status", "update", "delete"]
benchmarks_choices = ["tasks", "t"]
benchmark_tasks_choices = ["pull", "run", "results"]
models_choices = ["instances", "i", "variations", "v", "get", "list", "init", "create", "delete", "update"]
model_instances_choices = ["versions", "v", "get", "files", "list", "init", "create", "delete", "update"]
model_instance_versions_choices = ["init", "create", "download", "delete", "files", "list"]
Expand All @@ -1096,6 +1163,10 @@ class Help(object):
+ ", ".join(model_instance_versions_choices)
+ "}\nconfig {"
+ ", ".join(config_choices)
+ "}\nbenchmarks {"
+ ", ".join(benchmarks_choices)
+ "}\nbenchmarks tasks {"
+ ", ".join(benchmark_tasks_choices)
+ "}"
)
if api.enable_oauth:
Expand All @@ -1110,6 +1181,8 @@ class Help(object):
group_files = "Commands related files"
group_config = "Configuration settings"
group_auth = "Commands related to authentication"
group_benchmarks = "Commands related to Kaggle benchmarks"
group_benchmark_tasks = "Manage benchmark tasks"

# Competitions commands
command_competitions_list = "List available competitions"
Expand Down Expand Up @@ -1140,6 +1213,11 @@ class Help(object):
command_kernels_status = "Display the status of the latest kernel run"
command_kernels_delete = "Delete a kernel"

# Benchmarks commands
command_benchmarks_pull = "Pull an existing benchmark notebook to a local workspace"
command_benchmarks_publish_and_run = "Convert local Python benchmark script to a Kaggle notebook and push it to run"
command_benchmarks_get_results = "Poll the execution status of a benchmark and download the output artifacts"

# Models commands
command_models_files = "List model files"
command_models_get = "Get a model"
Expand Down Expand Up @@ -1306,6 +1384,17 @@ class Help(object):
)
param_kernel_acc = "Specify the type of accelerator to use for the kernel run"

# Benchmarks params
param_benchmark_kernel = (
'Kernel URL suffix in format <owner>/<kernel-name> (use "kaggle kernels list" to show options)'
)
param_benchmark_file = 'Path to the benchmark python file (e.g. "benchmark.py"). Defaults to looking for benchmark.py in the workspace if omitted.'
param_benchmark_pull_path = 'Folder to pull the benchmark into. Defaults to="./<kernel-name>" or the configured Kaggle download path (e.g., ~/.kaggle/kernels/<owner>/<kernel-name>) if omitted.'
param_benchmark_push_path = "Workspace folder containing the benchmark.py and optionally kernel-metadata.json. Defaults to current directory."
param_benchmark_results_path = 'Folder to download the outputs to. Defaults to the Kaggle configured download path appended with "output" (e.g., ~/.kaggle/kernels/<owner>/<kernel-name>/output), or the current directory.'
param_benchmark_poll_interval = "Number of seconds to wait between status checks. Default is 60"
param_benchmark_timeout = "Maximum seconds to wait for execution to complete. None means wait indefinitely"

# Models params
param_model = "Model URL suffix in format <owner>/<model-name>"
param_model_sort_by = (
Expand Down
Loading
Loading