Add an endpoint to return the history of a resource (#5116)
* Add an endpoint to return the history of a resource

* Fix test

---------

Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored Aug 22, 2024
1 parent 70aeb2c commit d0ac5ee
Showing 27 changed files with 777 additions and 207 deletions.
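
For orientation, here is a minimal sketch of how the new history endpoint is expected to be called, based on the route shape introduced in ElasticSearchHistoryRoutes further down (history / resources / {org} / {project} / {resource IRI}). The "/v1" base prefix, host, port, and the sample identifiers are illustrative assumptions and are not part of this diff.

object HistoryEndpointSketch extends App {
  import java.net.URLEncoder
  import java.nio.charset.StandardCharsets

  val base     = "http://localhost:8080/v1"  // assumed deployment base URI, not part of this commit
  val org      = "myorg"                     // illustrative organization label
  val project  = "myproject"                 // illustrative project label
  // The resource IRI travels as a single path segment, so it is URL-encoded here.
  val resource = URLEncoder.encode("https://example.com/my-resource", StandardCharsets.UTF_8.name)

  println(s"GET $base/history/resources/$org/$project/$resource")
}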
@@ -8,5 +8,10 @@
}
}
}
]
],
"properties": {
"rev": {
"type": "integer"
}
}
}
@@ -7,9 +7,10 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchC
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion.{ElasticSearchDeletionTask, EventMetricsDeletionTask}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchCoordinator, ElasticSearchDefaultViewsResetter}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.{EventMetricsProjection, EventMetricsQuery}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts, schema => viewsSchemaId, ElasticSearchFiles, ElasticSearchView, ElasticSearchViewEvent}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query.DefaultViewsQuery
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes.{ElasticSearchHistoryRoutes, _}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject
@@ -305,6 +306,21 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)
}

make[EventMetricsQuery].from { (client: ElasticSearchClient, config: ElasticSearchViewsConfig) =>
EventMetricsQuery(client, config.prefix)
}

make[ElasticSearchHistoryRoutes].from {
(
identities: Identities,
aclCheck: AclCheck,
metricsQuery: EventMetricsQuery,
rcr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new ElasticSearchHistoryRoutes(identities, aclCheck, metricsQuery)(rcr, ordering)
}

make[ElasticSearchScopeInitialization]
.from { (views: ElasticSearchViews, serviceAccount: ServiceAccount, config: ElasticSearchViewsConfig) =>
new ElasticSearchScopeInitialization(views, serviceAccount, config.defaults)
@@ -372,6 +388,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
query: ElasticSearchQueryRoutes,
indexing: ElasticSearchIndexingRoutes,
idResolutionRoute: IdResolutionRoutes,
historyRoutes: ElasticSearchHistoryRoutes,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri
) =>
@@ -382,7 +399,8 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
es.routes,
query.routes,
indexing.routes,
idResolutionRoute.routes
idResolutionRoute.routes,
historyRoutes.routes
)(baseUri),
requiresStrictEntity = true
)
@@ -199,6 +199,8 @@ object QueryBuilder {
*/
val empty: QueryBuilder = QueryBuilder(JsonObject.empty)

def unsafe(jsonObject: JsonObject): QueryBuilder = QueryBuilder(jsonObject)

/**
* A [[QueryBuilder]] using the filter ''params''.
*/
@@ -1,8 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.EventMetricsProjection
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.eventMetricsIndex
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.model.ProjectDeletionReport
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef}
@@ -17,7 +17,7 @@ import io.circe.parser.parse
*/
final class EventMetricsDeletionTask(client: ElasticSearchClient, prefix: String) extends ProjectDeletionTask {

private val index = EventMetricsProjection.eventMetricsIndex(prefix)
private val index = eventMetricsIndex(prefix)

override def apply(project: ProjectRef)(implicit subject: Identity.Subject): IO[ProjectDeletionReport.Stage] =
searchByProject(project).flatMap { search =>
@@ -1,11 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics

import cats.data.NonEmptyChain
import cats.effect.IO
import cats.effect.std.Env
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{MetricsMapping, MetricsSettings}
import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.EventMetric._
@@ -25,8 +25,7 @@ trait EventMetricsProjection
object EventMetricsProjection {
private val logger = Logger[EventMetricsProjection]

val projectionMetadata: ProjectionMetadata = ProjectionMetadata("system", "event-metrics", None, None)
val eventMetricsIndex: String => IndexLabel = prefix => IndexLabel.unsafe(s"${prefix}_project_metrics")
val projectionMetadata: ProjectionMetadata = ProjectionMetadata("system", "event-metrics", None, None)

// We need a value to return to Distage
private val dummy = new EventMetricsProjection {}
@@ -81,14 +80,13 @@ object EventMetricsProjection {

for {
shouldRestart <- Env[IO].get("RESET_EVENT_METRICS").map(_.getOrElse("false").toBoolean)
_ <- IO.whenA(shouldRestart) {
client.deleteIndex(index) >>
logger.warn("Resetting event metrics as the env RESET_EVENT_METRICS is set") >> projections.reset(
projectionMetadata.name
)
}
_ <- IO.whenA(shouldRestart)(
logger.warn("Resetting event metrics as the env RESET_EVENT_METRICS is set...") >>
client.deleteIndex(index) >>
projections.reset(projectionMetadata.name)
)
metricsProjection <- apply(sink, supervisor, metrics, createIndex)
} yield (metricsProjection)
} yield metricsProjection

} else IO.pure(dummy)

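
As an aside on the reset logic just above: the projection deletes the metrics index and resets its stored offset only when the RESET_EVENT_METRICS environment variable parses to true. Below is a standalone sketch of that toggle, using plain sys.env instead of cats-effect's Env and printing illustrative messages rather than performing the reset; the object name and messages are not part of this commit.

object ResetEventMetricsSketch extends App {
  // Mirrors the flag handling shown in the diff: a missing variable defaults to "false".
  val shouldRestart = sys.env.getOrElse("RESET_EVENT_METRICS", "false").toBoolean
  if (shouldRestart)
    println("would delete the event metrics index and reset the projection offset")
  else
    println("would resume the event metrics projection from its stored offset")
}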
@@ -0,0 +1,60 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.JsonObject
import io.circe.literal.JsonStringContext
import io.circe.syntax.EncoderOps

trait EventMetricsQuery {

def history(project: ProjectRef, id: Iri): IO[SearchResults[JsonObject]]

}

object EventMetricsQuery {

def apply(client: ElasticSearchClient, prefix: String): EventMetricsQuery = new EventMetricsQuery {

val index = eventMetricsIndex(prefix)

private def searchQuery(project: ProjectRef, id: Iri) =
json"""{
"query": {
"bool": {
"must": [
{
"term": {
"project": ${project.asJson}
}
},
{
"term": {
"@id": ${id.asJson}
}
}
]
}
},
"size": 2000,
"from": 0,
"sort": [
{ "rev": { "order" : "asc" } }
]
}
""".asObject.toRight(new IllegalStateException("Should not happen, an es query is an object"))

override def history(project: ProjectRef, id: Iri): IO[SearchResults[JsonObject]] = {
for {
jsonQuery <- IO.fromEither(searchQuery(project, id))
queryBuilder = QueryBuilder.unsafe(jsonQuery)
results <- client.search(queryBuilder, Set(index.value), Uri.Query.Empty)
} yield results
}
}

}
@@ -0,0 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel

package object metrics {

val eventMetricsIndex: String => IndexLabel = prefix => IndexLabel.unsafe(s"${prefix}_project_metrics")
}
@@ -0,0 +1,42 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes

import akka.http.scaladsl.server.Route
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.EventMetricsQuery
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query.ElasticSearchQueryError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, projectRef}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.iriSegment
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults.searchResultsEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources.{read => Read}
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}

class ElasticSearchHistoryRoutes(identities: Identities, aclCheck: AclCheck, metricsQuery: EventMetricsQuery)(implicit
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {
implicit private val searchEncoder: Encoder.AsObject[SearchResults[JsonObject]] = searchResultsEncoder(_ => None)

def routes: Route =
pathPrefix("history") {
pathPrefix("resources") {
extractCaller { implicit caller =>
projectRef.apply { project =>
authorizeFor(project, Read).apply {
(get & iriSegment & pathEndOrSingleSlash) { id =>
emit(metricsQuery.history(project, id).map(_.asJson).attemptNarrow[ElasticSearchQueryError])
}
}
}
}
}
}
}
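
For readers unfamiliar with the Nexus SDK directives used above (projectRef, iriSegment, authorizeFor), here is a simplified sketch of the same path shape using plain akka-http directives. It omits authentication, the resources/read permission check, and the Elasticsearch-backed response; the object name and placeholder response are illustrative only.

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object HistoryRouteShapeSketch {
  // GET history/resources/{org}/{project}/{URL-encoded resource IRI}
  val route: Route =
    pathPrefix("history" / "resources" / Segment / Segment / Segment) { (org, project, id) =>
      (get & pathEndOrSingleSlash) {
        complete(s"history of $id in $org/$project")
      }
    }
}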
@@ -2,7 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion

import akka.http.scaladsl.model.Uri.Query
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchAction, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchClientSetup, EventMetricsProjection, Fixtures}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.eventMetricsIndex
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchClientSetup, Fixtures}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
@@ -23,7 +24,7 @@ class EventMetricsDeletionTaskSuite

test("Delete all entries for a given project") {
val prefix = "test"
val index = EventMetricsProjection.eventMetricsIndex(prefix)
val index = eventMetricsIndex(prefix)
val projectToDelete = ProjectRef.unsafe("org", "marked-for-deletion")
val anotherProject = ProjectRef.unsafe("org", "another")

@@ -1,49 +1,90 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.MetricsStream._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{EventMetricsProjection, Fixtures}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.EventMetricsProjectionSuite.{metric1, metric2}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchClientSetup, Fixtures}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.metrics.ProjectScopedMetricStream
import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.EventMetric.{Created, ProjectScopedMetric, Updated}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink, ProjectionProgress, SupervisorSetup}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionProgress, SupervisorSetup}
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig
import io.circe.Json
import io.circe.syntax.EncoderOps
import io.circe.syntax.{EncoderOps, KeyOps}
import io.circe.{Json, JsonObject}
import munit.AnyFixture

import java.time.Instant
import scala.concurrent.duration.DurationInt

class EventMetricsProjectionSuite extends NexusSuite with SupervisorSetup.Fixture with Fixtures {
class EventMetricsProjectionSuite
extends NexusSuite
with SupervisorSetup.Fixture
with ElasticSearchClientSetup.Fixture
with Fixtures {

override def munitFixtures: Seq[AnyFixture[_]] = List(supervisor)
override def munitFixtures: Seq[AnyFixture[_]] = List(supervisor, esClient)

implicit private val patienceConfig: PatienceConfig = PatienceConfig(2.seconds, 10.millis)

private lazy val sv = supervisor().supervisor
private val sink = CacheSink.events[Json]
private val index = eventMetricsIndex("nexus")

test("Start the metrics projection") {
private lazy val sv = supervisor().supervisor
private lazy val client = esClient()
private lazy val sink = ElasticSearchSink.events(client, 2, 50.millis, index, Refresh.True)

test("Start the metrics projection and index metrics") {
def createIndex = client
.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value))
.assertEquals(true)
for {
_ <- EventMetricsProjection(
sink,
sv,
_ => metricsStream.take(2),
IO.unit
)
_ <- EventMetricsProjection(sink, sv, _ => EventMetricsProjectionSuite.stream, createIndex)
_ <- sv.describe(EventMetricsProjection.projectionMetadata.name)
.map(_.map(_.progress))
.assertEquals(Some(ProjectionProgress(Offset.at(2L), Instant.EPOCH, 2, 0, 0)))
.eventually
_ <- client.count(index.value).assertEquals(2L)
// Asserting the sources
_ <- client.getSource[Json](index, metric1.eventId).assertEquals(metric1.asJson)
_ <- client.getSource[Json](index, metric2.eventId).assertEquals(metric2.asJson)
} yield ()
}
}

test("Sink has the correct metrics") {
assertEquals(sink.successes.size, 2)
assert(sink.dropped.isEmpty)
assert(sink.failed.isEmpty)
assert(sink.successes.values.toSet.contains(metric1.asJson))
assert(sink.successes.values.toSet.contains(metric2.asJson))
}
object EventMetricsProjectionSuite {
private val org = Label.unsafe("org")
private val proj1 = Label.unsafe("proj1")
private val projectRef1: ProjectRef = ProjectRef(org, proj1)

private val metric1: ProjectScopedMetric = ProjectScopedMetric(
Instant.EPOCH,
Anonymous,
1,
Set(Created),
projectRef1,
org,
iri"http://bbp.epfl.ch/file1",
Set(nxv + "Resource1", nxv + "Resource2"),
JsonObject("extraField" := "extraValue")
)
private val metric2: ProjectScopedMetric = ProjectScopedMetric(
Instant.EPOCH,
Anonymous,
2,
Set(Updated),
projectRef1,
org,
iri"http://bbp.epfl.ch/file1",
Set(nxv + "Resource1", nxv + "Resource3"),
JsonObject(
"extraField" := "extraValue",
"extraField2" := 42
)
)

private val stream = ProjectScopedMetricStream(EntityType("entity"), metric1, metric2)
}