Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,28 @@ benchmark configurations using supported modules.**

## Support Matrix

### setup / installation
### Setup / Installation

| system | local | aws | docker | gcp | azure |
|------------|-------|---------------|--------|-----|-------|
| Exasol | ✗ | ✓<sup>1</sup> | ✗ | ✗ | ✗ |
| ClickHouse | ✗ | ✓<sup>1</sup> | ✗ | ✗ | ✗ |
| system | local | aws | docker | gcp | azure |
|------------|-------|-----|--------|-----|-------|
| Exasol | ✗ | ✓ | ✗ | ✗ | ✗ |
| ClickHouse | ✗ | ✓ | ✗ | ✗ | ✗ |

Notes:
### Workloads

1. Only single-node deployments supported at this time.
<!-- link definitions for table headers -->
[clickbench]: benchkit/workloads/clickbench/README.md "ClickHouse ClickBench"
[estuary]: benchkit/workloads/estuary/README.md "Estuary Warehouse Report"

### "tpch" workload
| system | tpch | [clickbench] | [estuary] |
|------------|------|--------------|-----------------|
| Exasol | ✓ | ✓ | ✓<sup>(1)</sup> |
| ClickHouse | ✓ | ✓ | ✗<sup>(1)</sup> |

Notes:

1. Work in progress.

| system | local | aws | docker | gcp | azure |
|------------|-------|-----|--------|-----|-------|
| Exasol | ✗ | ✓ | ✗ | ✗ | ✗ |
| ClickHouse | ✗ | ✓ | ✗ | ✗ | ✗ |

## Documentation

Expand Down
2 changes: 2 additions & 0 deletions benchkit/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .file_management import download_file_to_storage
from .markers import exclude_from_package
13 changes: 13 additions & 0 deletions benchkit/common/file_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pathlib import Path


def download_file_to_storage(url: str, target: Path) -> None:
    """Download data from a given URL to the given target path.

    Streams the response body to disk in chunks so large files never have
    to fit in memory.

    Args:
        url: Source URL; redirects are followed.
        target: Destination file path (overwritten if it already exists).

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    import requests

    with requests.get(url, allow_redirects=True, stream=True) as handle:
        # Fail BEFORE creating the target file, so an HTTP error does not
        # leave an empty/partial file behind on disk.
        handle.raise_for_status()
        with open(target, "wb") as file:
            # iter_content honors stream=True; `.content` would buffer the
            # entire payload in memory first.
            for chunk in handle.iter_content(chunk_size=1 << 20):
                file.write(chunk)
2 changes: 1 addition & 1 deletion benchkit/common/markers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ def exclude_from_package(obj: T) -> T:
... def install(self) -> bool:
... return self._install_database()
"""
obj._exclude_from_package = True # type: ignore
obj._exclude_from_package = True # type: ignore[attr-defined]
return obj
16 changes: 6 additions & 10 deletions benchkit/package/creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
import shutil
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, cast
from typing import Any, Literal, cast

from jinja2 import Environment, FileSystemLoader
from rich.console import Console

if TYPE_CHECKING:
from ..systems.base import SystemUnderTest
from ..util import ensure_directory
from .code_minimizer import CodeMinimizer
from .formatter import PackageFormatter
from .import_cleaner import ImportCleaner
from ..util import ensure_directory
from .code_minimizer import CodeMinimizer
from .formatter import PackageFormatter
from .import_cleaner import ImportCleaner
from ..systems.base import SystemUnderTest

console = Console()

Expand Down Expand Up @@ -352,9 +351,6 @@ def _copy_minimal_framework_files(self) -> None:
"run": [
"parsers.py"
], # Only parsers, no __init__.py to avoid import issues
"package": [
"markers.py", # imported by systems/base
],
"systems": None, # Copy all - needed for database connections
"workloads": None, # Copy all - needed for workload execution
"common": None,
Expand Down
44 changes: 22 additions & 22 deletions benchkit/run/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal
from typing import TYPE_CHECKING, Any, Literal

import pandas as pd
from rich.console import Console

from ..common.markers import exclude_from_package
from benchkit.common import exclude_from_package
from benchkit.util import ensure_directory, load_json, save_json

from ..debug import is_debug_enabled
from ..systems import create_system
from ..systems.base import SystemUnderTest
from ..util import ensure_directory, load_json, save_json
from ..workloads import create_workload
from .parallel_executor import ParallelExecutor
from .parsers import normalize_runs

if TYPE_CHECKING:
from benchkit.systems import SystemUnderTest
from benchkit.workloads import Workload


console = Console()


Expand Down Expand Up @@ -409,7 +414,7 @@ def _execute_phase(
phase: PhaseConfig,
context: ExecutionContext,
force: bool = False,
workload: Any = None,
workload: "Workload | None" = None,
) -> bool | list[dict[str, Any]]:
"""
Universal phase executor handling setup, load, and query phases.
Expand Down Expand Up @@ -514,7 +519,7 @@ def _build_phase_tasks(
context: ExecutionContext,
force: bool,
package_path: Path | None,
workload: Any,
workload: "Workload",
) -> dict[str, Callable[[], TaskResult]]:
"""
Build task callables for each system in the benchmark.
Expand Down Expand Up @@ -650,7 +655,7 @@ def _get_system_for_context(
self,
system_config: dict[str, Any],
context: ExecutionContext,
) -> SystemUnderTest:
) -> "SystemUnderTest":
"""
Create system instance configured for the execution context.

Expand Down Expand Up @@ -736,11 +741,11 @@ def _get_system_for_context(
@exclude_from_package
def _setup_operation(
self,
system: SystemUnderTest,
system: "SystemUnderTest",
system_config: dict[str, Any],
instance_manager: Any,
package_path: Path | None, # unused for setup
workload: Any, # unused for setup
workload: "Workload", # unused for setup
) -> tuple[bool, dict[str, Any]]:
"""
Execute setup operation for a single system.
Expand Down Expand Up @@ -849,11 +854,11 @@ def _setup_operation(

def _load_operation(
self,
system: SystemUnderTest,
system: "SystemUnderTest",
system_config: dict[str, Any],
instance_manager: Any,
package_path: Path | None,
workload: Any,
workload: "Workload",
) -> tuple[bool, dict[str, Any]]:
"""
Execute load operation for a single system.
Expand Down Expand Up @@ -902,11 +907,11 @@ def _load_operation(

def _query_operation(
self,
system: SystemUnderTest,
system: "SystemUnderTest",
system_config: dict[str, Any],
instance_manager: Any,
package_path: Path | None,
workload: Any,
workload: "Workload",
) -> tuple[bool, list[dict[str, Any]]]:
"""
Execute query operation for a single system.
Expand Down Expand Up @@ -1044,7 +1049,7 @@ def _create_system_for_local_execution(
system_config: dict[str, Any],
instance_manager: Any,
output_callback: Callable[[str], None] | None = None,
) -> SystemUnderTest:
) -> "SystemUnderTest":
"""Create system object configured for local-to-remote execution.

IMPORTANT: Uses the PUBLIC IP from the cloud instance manager to enable
Expand Down Expand Up @@ -1483,17 +1488,12 @@ def _prepare_storage_phase(self) -> bool:
return True

def _execute_queries(
self, system: Any, workload: Any
self, system: "SystemUnderTest", workload: "Workload"
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Execute benchmark queries with timing and monitoring."""
query_names = self.config["workload"]["queries"]["include"]
runs_per_query = self.config["workload"]["runs_per_query"]
warmup_runs = self.config["workload"]["warmup_runs"]

# If no queries specified, use workload's default (all queries)
if not query_names:
query_names = workload.queries_to_include

# Extract multiuser configuration
multiuser_config = self.config["workload"].get("multiuser") or {}
num_streams = 1
Expand All @@ -1508,7 +1508,7 @@ def _execute_queries(
# Execute queries
result_dict = workload.run_workload(
system=system,
query_names=query_names,
query_names=workload.get_included_queries(),
runs_per_query=runs_per_query,
warmup_runs=warmup_runs,
num_streams=num_streams,
Expand Down Expand Up @@ -1570,7 +1570,7 @@ def _save_setup_summary(

def _load_setup_summary_to_system(
self,
system: SystemUnderTest,
system: "SystemUnderTest",
system_name: str,
executor: "ParallelExecutor | None" = None,
) -> None:
Expand Down
122 changes: 120 additions & 2 deletions benchkit/systems/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from benchkit.common.markers import exclude_from_package
from benchkit.common import exclude_from_package

from ..util import safe_command

if TYPE_CHECKING:
# avoid cyclic dependency problems
from ..util import safe_command
from ..workloads import Workload


TableOperation = Literal[
"DEFAULT",
"OPTIMIZE TABLE",
Expand Down Expand Up @@ -70,6 +72,8 @@ def __init__(

# Workload configuration for dynamic tuning
self.workload_config = workload_config or {}
# default schema for queries
self.schema: str | None = None

# Command recording for report reproduction
self.setup_commands: list[dict[str, Any]] = []
Expand Down Expand Up @@ -421,6 +425,64 @@ def load_data_from_iterable(
"""
pass

def load_data_from_url_with_download(
    self,
    schema_name: str,
    table_name: str,
    data_url: str | list[str],
    /,
    extension: str = ".csv",
    **kwargs: Any,
) -> bool:
    """
    Download given resources to the host filesystem and then load the
    downloaded files into the database using `load_data()`.

    Files are stored in `self.data_dir` (falling back to /var/tmp) and
    reused on subsequent runs if already present.

    Returns:
        True only if every download and every import succeeds.
    """
    from benchkit.common import download_file_to_storage

    local_storage: Path = self.data_dir or Path("/var/tmp")
    downloaded_files: list[Path] = []
    url_list: list[str] = [data_url] if isinstance(data_url, str) else data_url
    ## download files locally
    # BUG FIX: `file_part` previously stayed at 0 for every URL, so all
    # downloads mapped to the same file name and every URL after the first
    # was silently "reused" instead of downloaded.
    for file_part, url in enumerate(url_list):
        file_name: str = f"{table_name}_{file_part}{extension}"
        target_path: Path = local_storage / file_name
        if target_path.exists():
            self._log(f"Download: reusing existing file {target_path}")
            downloaded_files.append(target_path)
            continue
        self._log(f"Downloading {file_name} from {url}")
        try:
            download_file_to_storage(url, target_path)
            downloaded_files.append(target_path)
        except Exception as e:
            self._log(f"Error downloading {file_name}: {e}")
            return False

    ## then, import files one by one
    for file in downloaded_files:
        if not self.load_data(table_name, file, schema=schema_name):
            return False
    return True

def load_data_from_url(
    self,
    schema_name: str,
    table_name: str,
    data_url: str | list[str],
    /,
    extension: str = ".csv",
    **kwargs: Any,
) -> bool:
    """
    Load table data from one URL or a list of URLs.

    This default implementation delegates to
    `load_data_from_url_with_download()`, which fetches the data to local
    storage and then imports the downloaded file(s) via `load_data()`.
    Systems with native URL-import support can override this method.
    """
    return self.load_data_from_url_with_download(
        schema_name,
        table_name,
        data_url,
        extension=extension,
        **kwargs,
    )

@abstractmethod
def execute_query(
self,
Expand Down Expand Up @@ -1774,6 +1836,62 @@ def estimate_execution_time(
"""
return timedelta(minutes=5)

# noinspection PyMethodMayBeStatic
def split_sql_statements(self, sql: str) -> list[str]:
    """
    Split an SQL script into individual statements.
    Method is not static in case some system uses different syntax for splitting.

    Handles:
    - Semicolon-terminated statements (the semicolon must end its line)
    - Line comments (`--` at the start of a line)
    - Block comments (`/* ... */`), including multi-line ones; code before
      or after a block comment on the same line is preserved
    - Empty lines

    Limitations: comment markers or semicolons inside string literals are
    not recognized.

    Returns:
        List of individual SQL statements, without trailing semicolons.
    """
    statements: list[str] = []
    current_statement: list[str] = []
    in_comment = False  # currently inside an unterminated /* ... */ block

    for line in sql.split("\n"):
        stripped = line.strip()

        # Finish a block comment started on an earlier line.
        if in_comment:
            end = stripped.find("*/")
            if end == -1:
                continue  # whole line is still inside the comment
            in_comment = False
            stripped = stripped[end + 2 :].strip()

        # Skip whole-line SQL comments.
        if stripped.startswith("--"):
            continue

        # Strip /* ... */ comments while KEEPING surrounding code.
        # (The previous version discarded the entire line, losing any
        # statement text that shared a line with a block comment.)
        while "/*" in stripped:
            start = stripped.find("/*")
            end = stripped.find("*/", start + 2)
            if end == -1:
                # Comment continues on following lines.
                in_comment = True
                stripped = stripped[:start].strip()
                break
            stripped = (
                stripped[:start].rstrip() + " " + stripped[end + 2 :].lstrip()
            ).strip()

        # Skip lines that are empty (possibly after comment removal).
        if not stripped:
            continue

        if stripped.endswith(";"):
            # Statement terminator: store the statement without the semicolon.
            current_statement.append(stripped[:-1].rstrip())
            statements.append("\n".join(current_statement))
            current_statement = []
        else:
            current_statement.append(stripped)

    # Remaining text, for scripts without a trailing semicolon.
    if current_statement:
        statements.append("\n".join(current_statement))

    return statements


def get_system_class(system_kind: str) -> type | None:
"""
Expand Down
Loading