Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 45 additions & 35 deletions gittensor/validator/storage/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import logging
from contextlib import contextmanager
from typing import Dict, List
from typing import Callable, Dict, List, Optional

import numpy as np

Expand Down Expand Up @@ -74,6 +74,41 @@ def execute_command(self, query: str, params: tuple = (), commit: bool = True) -
self.logger.error(f'Error executing command: {e}')
return False

def execute_bulk(
self,
query: str,
values: List[tuple],
entity_label: str,
commit: bool = True,
error_detail_fn: Optional[Callable[[], str]] = None,
) -> int:
"""
Execute a bulk INSERT/UPDATE via executemany.

Args:
query: SQL command string
values: List of parameter tuples passed to executemany
entity_label: Name used in the error log (e.g. 'pull request')
commit: Whether to commit after execution (default True)
error_detail_fn: Optional callable producing extra context appended to
the error log; only invoked on failure

Returns:
Number of rows passed to executemany on success, 0 on failure
"""
try:
with self.get_cursor() as cursor:
cursor.executemany(query, values)
if commit:
self.db.commit()
return len(values)
except Exception as e:
if commit:
self.db.rollback()
detail = error_detail_fn() if error_detail_fn else ''
self.logger.error(f'Error in bulk {entity_label} storage: {e}{detail}')
return 0

def set_entity(self, query: str, params: tuple, commit: bool = True) -> bool:
"""
Insert or update an entity using the provided query.
Expand Down Expand Up @@ -201,17 +236,7 @@ def store_pull_requests_bulk(self, pull_requests: List[PullRequest], commit: boo
)
)

try:
with self.get_cursor() as cursor:
cursor.executemany(BULK_UPSERT_PULL_REQUESTS, values)
if commit:
self.db.commit()
return len(values)
except Exception as e:
if commit:
self.db.rollback()
self.logger.error(f'Error in bulk pull request storage: {e}')
return 0
return self.execute_bulk(BULK_UPSERT_PULL_REQUESTS, values, 'pull request', commit=commit)

def store_issues_bulk(self, issues: List[Issue], commit: bool = True) -> int:
"""
Expand Down Expand Up @@ -253,17 +278,7 @@ def store_issues_bulk(self, issues: List[Issue], commit: bool = True) -> int:
)
)

try:
with self.get_cursor() as cursor:
cursor.executemany(BULK_UPSERT_ISSUES, values)
if commit:
self.db.commit()
return len(values)
except Exception as e:
if commit:
self.db.rollback()
self.logger.error(f'Error in bulk issue storage: {e}')
return 0
return self.execute_bulk(BULK_UPSERT_ISSUES, values, 'issue', commit=commit)

def store_file_changes_bulk(self, file_changes: List[FileChange], commit: bool = True) -> int:
"""
Expand Down Expand Up @@ -296,18 +311,13 @@ def store_file_changes_bulk(self, file_changes: List[FileChange], commit: bool =
)
)

try:
with self.get_cursor() as cursor:
cursor.executemany(BULK_UPSERT_FILE_CHANGES, values)
if commit:
self.db.commit()
return len(values)
except Exception as e:
if commit:
self.db.rollback()
prs = {(fc.pr_number, fc.repository_full_name) for fc in file_changes}
self.logger.error(f'Error in bulk file change storage: {e} | PRs: {prs}')
return 0
return self.execute_bulk(
BULK_UPSERT_FILE_CHANGES,
values,
'file change',
commit=commit,
error_detail_fn=lambda: f' | PRs: {set((fc.pr_number, fc.repository_full_name) for fc in file_changes)}',
)

def set_miner_evaluation(
self,
Expand Down
62 changes: 62 additions & 0 deletions tests/validator/test_execute_bulk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Tests for BaseRepository.execute_bulk — the shared executemany helper used by
the store_*_bulk methods."""

from contextlib import contextmanager
from unittest.mock import MagicMock

from gittensor.validator.storage.repository import BaseRepository


def _repo_with_mock_cursor():
db = MagicMock()
repo = BaseRepository(db)
cursor = MagicMock()

@contextmanager
def fake_cursor():
yield cursor

repo.get_cursor = fake_cursor # type: ignore[method-assign]
return repo, cursor, db


def test_success_returns_row_count_and_commits():
repo, cursor, db = _repo_with_mock_cursor()
values = [(1,), (2,), (3,)]

assert repo.execute_bulk('SQL', values, 'pull request') == 3

cursor.executemany.assert_called_once_with('SQL', values)
db.commit.assert_called_once()
db.rollback.assert_not_called()


def test_commit_false_skips_commit():
repo, cursor, db = _repo_with_mock_cursor()

assert repo.execute_bulk('SQL', [(1,)], 'issue', commit=False) == 1

db.commit.assert_not_called()
db.rollback.assert_not_called()


def test_failure_rolls_back_and_returns_zero():
repo, cursor, db = _repo_with_mock_cursor()
cursor.executemany.side_effect = Exception('boom')

assert repo.execute_bulk('SQL', [(1,)], 'file change') == 0

db.rollback.assert_called_once()
db.commit.assert_not_called()


def test_error_detail_fn_only_invoked_on_failure():
repo, cursor, _ = _repo_with_mock_cursor()
detail = MagicMock(return_value=' | extra')

repo.execute_bulk('SQL', [(1,)], 'file change', error_detail_fn=detail)
detail.assert_not_called()

cursor.executemany.side_effect = Exception('boom')
repo.execute_bulk('SQL', [(1,)], 'file change', error_detail_fn=detail)
detail.assert_called_once()
Loading