Fix deadlock and DELETE/UPDATE unindexed datasets (#3620)
* Fix deadlock and DELETE/UPDATE unindexed datasets

PBENCH-1328

A bug revealed another bug:

1. `POST`/`DELETE` `/datasets/{id}` fails when the dataset isn't indexed; now
that server indexing can be disabled entirely and a dataset's indexed data can
be removed, this is undesirable. As long as the dataset doesn't have a `WORKING`
ops status, we should be able to update or delete it.
2. When the operation fails, the ops status has already been set to `WORKING`,
and it was never corrected on exit, leaving the dataset perpetually locked. (The
sketch below illustrates the intended behavior.)
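For illustration, here is a minimal sketch of the intended behavior, using
hypothetical names and status values rather than the literal server code: only
an in-flight `WORKING` operation blocks the request, and a failure must never
leave the `WORKING` state behind.

```python
from dataclasses import dataclass
from typing import Callable


class Conflict(Exception):
    """Raised when a dataset is busy with another operation."""


@dataclass
class Dataset:
    name: str
    ops_status: str = "READY"


def update_or_delete(dataset: Dataset, operation: Callable[[Dataset], None]) -> None:
    # An unindexed dataset is fine; only an active operation blocks the request.
    if dataset.ops_status == "WORKING":
        raise Conflict(f"{dataset.name} is busy")
    dataset.ops_status = "WORKING"
    try:
        operation(dataset)
    except Exception:
        # The deadlock fix: record the failure rather than leaving the
        # dataset perpetually WORKING.
        dataset.ops_status = "FAILED"
        raise
    else:
        dataset.ops_status = "READY"
```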
dbutenhof authored May 6, 2024
1 parent 9a76f13 commit 81eb077
Showing 14 changed files with 326 additions and 220 deletions.
106 changes: 70 additions & 36 deletions lib/pbench/server/api/resources/__init__.py
@@ -29,9 +29,11 @@
     MetadataBadKey,
     MetadataError,
     OperationName,
+    OperationState,
 )
 from pbench.server.database.models.server_settings import ServerSetting
 from pbench.server.database.models.users import User
+from pbench.server.sync import Sync
 
 
 class APIAbort(Exception):
@@ -1419,6 +1421,51 @@ class UriBase:
     host_value: str
 
 
+@dataclass
+class AuditContext:
+    """Manage API audit context"""
+
+    audit: Optional[Audit] = None
+    finalize: bool = True
+    status: AuditStatus = AuditStatus.SUCCESS
+    reason: Optional[AuditReason] = None
+    attributes: Optional[JSONOBJECT] = None
+
+    def add_attribute(self, key: str, value: Any):
+        """Add a single audit attribute
+
+        Args:
+            key: key name
+            value: key value
+        """
+        if self.attributes is None:
+            self.attributes = {key: value}
+        else:
+            self.attributes[key] = value
+
+    def add_attributes(self, attr: JSONOBJECT):
+        """Add multiple audit attributes as a JSON dict
+
+        Args:
+            attr: a JSON dict
+        """
+        if self.attributes is None:
+            self.attributes = attr
+        else:
+            self.attributes.update(attr)
+
+    def set_error(self, error: str, reason: Optional[AuditReason] = None):
+        """Set an audit error
+
+        Args:
+            error: error string
+            reason: audit failure reason
+        """
+        self.add_attribute("error", error)
+        self.status = AuditStatus.FAILURE
+        self.reason = reason
+
+
 class ApiBase(Resource):
     """A base class for Pbench queries that provides common parameter handling
     behavior for specialized subclasses.
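For illustration, a handler built on this context might look like the following
sketch; `do_work` and the endpoint body are hypothetical, while `AuditContext`,
`AuditReason`, and the `context["auditing"]` slot come from the code in this
commit.

```python
def _post(self, params, req, context):
    auditing: AuditContext = context["auditing"]

    # Attributes accumulate on the context; _dispatch writes them into the
    # final Audit record when the request completes.
    auditing.add_attribute("dataset", "uperf_1")
    auditing.add_attributes({"access": "public", "owner": "tester"})

    try:
        do_work()  # hypothetical operation
    except Exception as e:
        # Record an "error" attribute and mark the audit FAILURE; _dispatch
        # still finalizes the record unless the handler cleared "finalize".
        auditing.set_error(str(e), AuditReason.INTERNAL)
        raise
```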
@@ -2031,67 +2078,54 @@ def _dispatch(
         # wants to emit a special audit sequence it can disable "finalize"
         # in the context. It can also pass "attributes" by setting that
         # field.
-        auditing = {
-            "audit": audit,
-            "finalize": bool(audit),
-            "status": AuditStatus.SUCCESS,
-            "reason": None,
-            "attributes": None,
-        }
-
+        auditing = AuditContext(audit=audit)
         context = {
             "auditing": auditing,
             "attributes": schema.attributes,
             "raw_params": raw_params,
+            "sync": None,
         }
 
         response = None
+        sync_message = None
         try:
             response = execute(params, request, context)
         except APIInternalError as e:
             current_app.logger.exception("{} {}", api_name, e.details)
+            auditing.set_error(str(e), AuditReason.INTERNAL)
+            sync_message = str(e)
             abort(e.http_status, message=str(e))
         except APIAbort as e:
             current_app.logger.warning(
                 "{} client error {}: '{}'", api_name, e.http_status, e
             )
-            if auditing["finalize"]:
-                attr = auditing.get("attributes", {"message": str(e)})
-                try:
-                    Audit.create(
-                        root=auditing["audit"],
-                        status=AuditStatus.FAILURE,
-                        reason=auditing["reason"],
-                        attributes=attr,
-                    )
-                except Exception:
-                    current_app.logger.error(
-                        "Unexpected exception on audit: {}", auditing
-                    )
+            auditing.set_error(str(e))
+            sync_message = str(e)
             abort(e.http_status, message=str(e), **e.kwargs)
         except Exception as e:
             x = APIInternalError("Unexpected exception")
             x.__cause__ = e
             current_app.logger.exception(
                 "Exception {} API error: {}: {!r}", api_name, x, auditing
             )
-            if auditing["finalize"]:
-                attr = auditing.get("attributes", {})
-                attr["message"] = str(e)
-                Audit.create(
-                    root=auditing["audit"],
-                    status=AuditStatus.FAILURE,
-                    reason=AuditReason.INTERNAL,
-                    attributes=attr,
-                )
-            abort(x.http_status, message=x.message)
-        if auditing["finalize"]:
-            Audit.create(
-                root=auditing["audit"],
-                status=auditing["status"],
-                reason=auditing["reason"],
-                attributes=auditing["attributes"],
-            )
+            auditing.set_error(str(e), AuditReason.INTERNAL)
+            sync_message = str(e)
+            abort(x.http_status, message=x.message)
+        finally:
+            # If the operation created a Sync object, it will have been updated
+            # and removed unless the operation failed. This means we're here
+            # because of an exception, and one of the handlers has set an
+            # appropriate message to record in the operations table.
+            sync: Optional[Sync] = context.get("sync")
+            if sync:
+                sync.update(dataset, OperationState.FAILED, message=sync_message)
+            if auditing.audit and auditing.finalize:
+                Audit.create(
+                    root=auditing.audit,
+                    status=auditing.status,
+                    reason=auditing.reason,
+                    attributes=auditing.attributes,
+                )
         return response
 
     def _get(self, args: ApiParams, req: Request, context: ApiContext) -> Response:
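The new `finally` clause relies on a convention: an API that starts a
long-running operation stores its `Sync` object in the context and clears it
again on success, so a leftover `Sync` always means failure. A hypothetical
sketch of that contract follows; the `Sync` construction and `do_delete` are
illustrative, not actual pbench code.

```python
def _delete(self, params, req, context):
    dataset = params.uri["dataset"]

    sync = Sync(current_app.logger, OperationName.DELETE)  # illustrative wiring
    sync.update(dataset, OperationState.WORKING)
    context["sync"] = sync  # if we raise, _dispatch marks the operation FAILED

    do_delete(dataset)  # hypothetical operation; may raise

    # Success: record completion and clear the context so the finally clause
    # in _dispatch has nothing to clean up.
    sync.update(dataset, OperationState.OK)
    context["sync"] = None
```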
7 changes: 4 additions & 3 deletions lib/pbench/server/api/resources/api_key.py
@@ -129,8 +129,9 @@ def _post(self, params: ApiParams, req: Request, context: ApiContext) -> Response:
             status = HTTPStatus.OK
         except Exception as e:
             raise APIInternalError(str(e)) from e
-        context["auditing"]["attributes"] = key.as_json()
-        response = jsonify(key.as_json())
+        result = key.as_json()
+        context["auditing"].add_attributes(result)
+        response = jsonify(result)
         response.status_code = status
         return response
 
@@ -162,7 +163,7 @@ def _delete(self, params: ApiParams, req: Request, context: ApiContext) -> Response:
             raise APIAbort(HTTPStatus.NOT_FOUND, "Requested key not found")
         key = keys[0]
         try:
-            context["auditing"]["attributes"] = key.as_json()
+            context["auditing"].add_attributes(key.as_json())
             key.delete()
             return "deleted", HTTPStatus.OK
         except Exception as e:
2 changes: 1 addition & 1 deletion lib/pbench/server/api/resources/datasets_metadata.py
@@ -131,7 +131,7 @@ def _put(self, params: ApiParams, req: Request, context: ApiContext) -> Response:
         dataset = params.uri["dataset"]
         metadata = params.body["metadata"]
 
-        context["auditing"]["attributes"] = {"updated": metadata}
+        context["auditing"].add_attribute("updated", metadata)
 
         # Validate the authenticated user's authorization for the combination
         # of "owner" and "access".
40 changes: 18 additions & 22 deletions lib/pbench/server/api/resources/query_apis/datasets/__init__.py
@@ -11,7 +11,7 @@
     ParamType,
 )
 from pbench.server.api.resources.query_apis import ElasticBase
-from pbench.server.database.models.datasets import Dataset, Metadata
+from pbench.server.database.models.datasets import Dataset
 from pbench.server.database.models.index_map import IndexMap
 from pbench.server.database.models.templates import Template
 
@@ -95,40 +95,36 @@ def get_index(
     ) -> str:
         """Retrieve ES indices based on a given root_index_name.
 
-        Datasets marked "archiveonly" aren't indexed, and can't be referenced
-        in most APIs that rely on Elasticsearch. Instead, we'll raise a
-        CONFLICT error.
+        Datasets without an index can't be referenced in most APIs that rely on
+        Elasticsearch. Instead, we'll raise a NOT_FOUND error. However, the
+        /api/v1/datasets API will specify ok_no_index as they need to operate
+        on the dataset regardless of whether indexing is enabled.
 
         All indices are returned if root_index_name is omitted.
 
         Args:
             dataset: dataset object
             root_index_name: A root index name like "run-data"
-            ok_no_index: Don't fail on an archiveonly dataset
+            ok_no_index: Don't fail if dataset has no indices
 
         Raises:
-            APIAbort(CONFLICT) if indexing was disabled on the target dataset.
-            APIAbort(NOT_FOUND) if the dataset has no matching index data
+            APIAbort(NOT_FOUND) if index is required and the dataset has none
 
         Returns:
             A string that joins all selected indices with ",", suitable for use
             in an Elasticsearch query URI.
         """
 
-        archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
-        if archive_only:
-            if ok_no_index:
-                return ""
-            raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")
-
-        index_keys = list(IndexMap.indices(dataset, root_index_name))
-
-        if not index_keys:
-            raise APIAbort(
-                HTTPStatus.NOT_FOUND,
-                f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
-            )
-
-        indices = ",".join(index_keys)
-        return indices
+        index_keys = IndexMap.indices(dataset, root_index_name)
+        if index_keys:
+            return ",".join(index_keys)
+        if ok_no_index:
+            return ""
+        raise APIAbort(
+            HTTPStatus.NOT_FOUND,
+            f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
+        )
 
     def get_aggregatable_fields(
         self, mappings: JSON, prefix: AnyStr = "", result: Union[List, None] = None
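To summarize the new `get_index` contract from the caller's side, a brief
sketch; the calls mirror the docstring above, while the surrounding logic is
hypothetical.

```python
# All of the dataset's indices, joined with "," for an Elasticsearch URI.
all_indices = self.get_index(dataset)

# A specific root index; raises APIAbort(NOT_FOUND) if the dataset has no
# matching index data.
run_index = self.get_index(dataset, "run-data")

# The /api/v1/datasets path: tolerate a dataset that was never indexed.
indices = self.get_index(dataset, ok_no_index=True)
if not indices:
    ...  # operate on the dataset without touching Elasticsearch
```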
(Diff truncated: the remaining 10 changed files are not shown.)
