Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store and return source file IRIs in the API when copying #4618

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion delta/plugins/storage/src/main/resources/contexts/files.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"_storage": {
"@id": "https://bluebrain.github.io/nexus/vocabulary/storage",
"@type": "@id"
}
},
"_sourceFile": "https://bluebrain.github.io/nexus/vocabulary/sourceFile"
},
"@id": "https://bluebrain.github.io/nexus/contexts/files.json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ final class Files(
for {
pc <- fetchContext.onCreate(projectRef)
iri <- generateId(pc)
_ <- test(CreateFile(iri, projectRef, testStorageRef, testStorageType, testAttributes, caller.subject, tag))
_ <- test(CreateFile(iri, projectRef, testStorageRef, testStorageType, testAttributes, caller.subject, tag, None))
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, projectRef, pc)
attributes <- extractFileAttributes(iri, entity, storage)
res <- eval(CreateFile(iri, projectRef, storageRef, storage.tpe, attributes, caller.subject, tag))
res <- eval(CreateFile(iri, projectRef, storageRef, storage.tpe, attributes, caller.subject, tag, None))
} yield res
}.span("createFile")

Expand All @@ -126,10 +126,10 @@ final class Files(
)(implicit caller: Caller): IO[FileResource] = {
for {
(iri, pc) <- id.expandIri(fetchContext.onCreate)
_ <- test(CreateFile(iri, id.project, testStorageRef, testStorageType, testAttributes, caller.subject, tag))
_ <- test(CreateFile(iri, id.project, testStorageRef, testStorageType, testAttributes, caller.subject, tag, None))
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc)
attributes <- extractFileAttributes(iri, entity, storage)
res <- eval(CreateFile(iri, id.project, storageRef, storage.tpe, attributes, caller.subject, tag))
res <- eval(CreateFile(iri, id.project, storageRef, storage.tpe, attributes, caller.subject, tag, None))
} yield res
}.span("createFile")

Expand Down Expand Up @@ -405,12 +405,12 @@ final class Files(
tag: Option[UserTag]
)(implicit caller: Caller): IO[FileResource] =
for {
_ <- test(CreateFile(iri, ref, testStorageRef, testStorageType, testAttributes, caller.subject, tag))
_ <- test(CreateFile(iri, ref, testStorageRef, testStorageType, testAttributes, caller.subject, tag, None))
(storageRef, storage) <- fetchAndValidateActiveStorage(storageId, ref, pc)
resolvedFilename <- IO.fromOption(filename.orElse(path.lastSegment))(InvalidFileLink(iri))
description <- FileDescription(resolvedFilename, mediaType)
attributes <- linkFile(storage, path, description, iri)
res <- eval(CreateFile(iri, ref, storageRef, storage.tpe, attributes, caller.subject, tag))
res <- eval(CreateFile(iri, ref, storageRef, storage.tpe, attributes, caller.subject, tag, None))
} yield res

private def linkFile(storage: Storage, path: Uri.Path, desc: FileDescription, fileId: Iri): IO[FileAttributes] =
Expand Down Expand Up @@ -597,7 +597,7 @@ object Files {
): Option[FileState] = {
// format: off
def created(e: FileCreated): Option[FileState] = Option.when(state.isEmpty) {
FileState(e.id, e.project, e.storage, e.storageType, e.attributes, Tags(e.tag, e.rev), e.rev, deprecated = false, e.instant, e.subject, e.instant, e.subject)
FileState(e.id, e.project, e.storage, e.storageType, e.attributes, e.sourceFile, Tags(e.tag, e.rev), e.rev, deprecated = false, e.instant, e.subject, e.instant, e.subject)
}

def updated(e: FileUpdated): Option[FileState] = state.map { s =>
Expand Down Expand Up @@ -641,7 +641,7 @@ object Files {
def create(c: CreateFile) = state match {
case None =>
clock.realTimeInstant.map(
FileCreated(c.id, c.project, c.storage, c.storageType, c.attributes, 1, _, c.subject, c.tag)
FileCreated(c.id, c.project, c.storage, c.storageType, c.attributes, 1, _, c.subject, c.tag, c.sourceFile)
)
case Some(_) =>
IO.raiseError(ResourceAlreadyExists(c.id, c.project))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.batch

import akka.http.scaladsl.model.Uri
import cats.data.NonEmptyList
import cats.effect.IO
import cats.implicits.toFunctorOps
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.FetchFileResource
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.CopyFileSource
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FetchFileResource, FileResource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.CopyFileRejection
Expand All @@ -20,12 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef
import shapeless.syntax.typeable.typeableOps

/**
  * Copies a batch of files from a source project into a destination storage.
  */
trait BatchCopy {

  /**
    * Copies every file referenced by `source` into `destStorage`.
    *
    * @param source      the project and the references of the files to copy
    * @param destStorage the storage the copies are written to
    * @return the attributes of each newly created file
    */
  // TODO: also return the source file Iri here along with the new file attributes
  def copyFiles(source: CopyFileSource, destStorage: Storage)(implicit c: Caller): IO[NonEmptyList[FileAttributes]]
}

object BatchCopy {
Expand All @@ -49,14 +51,14 @@ object BatchCopy {

private def copyToRemoteStorage(source: CopyFileSource, dest: RemoteDiskStorage)(implicit c: Caller) =
for {
remoteCopyDetails <- source.files.traverse(fetchRemoteCopyDetails(dest, _))
remoteCopyDetails <- source.files.traverse(r => fetchRemoteCopyDetails(dest, FileId(r, source.project)))
_ <- validateFilesForStorage(dest, remoteCopyDetails.map(_.sourceAttributes.bytes))
attributes <- remoteDiskCopy.copyFiles(dest, remoteCopyDetails)
} yield attributes

private def copyToDiskStorage(source: CopyFileSource, dest: DiskStorage)(implicit c: Caller) =
for {
diskCopyDetails <- source.files.traverse(fetchDiskCopyDetails(dest, _))
diskCopyDetails <- source.files.traverse(r => fetchDiskCopyDetails(dest, FileId(r, source.project)))
_ <- validateFilesForStorage(dest, diskCopyDetails.map(_.sourceAttributes.bytes))
attributes <- diskCopy.copyFiles(dest, diskCopyDetails)
} yield attributes
Expand Down Expand Up @@ -112,14 +114,23 @@ object BatchCopy {
private def notEnoughSpace(totalSize: Long, spaceLeft: Long, destStorage: Iri) =
IO.raiseError(TotalCopySizeTooLarge(totalSize, spaceLeft, destStorage))

private def fetchFileAndValidateStorage(id: FileId)(implicit c: Caller) = {
private def fetchFileAndValidateStorage(id: FileId)(implicit c: Caller): IO[(File, Storage)] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to use ResourceRef directly here; I see that FileId uses IdSegmentRef.
Those are meant to be used for segments in the API endpoints, where the Iri can be compacted.
Since the source file is in a payload, you should be able to use ResourceRef directly and make things simpler.

for {
file <- fetchFile.fetch(id)
sourceStorage <- fetchStorage.fetch(file.value.storage, id.project)
perm = sourceStorage.value.storageValue.readPermission
_ <- aclCheck.authorizeForOr(id.project, perm)(AuthorizationFailed(id.project, perm))
} yield (file.value, sourceStorage.value)
}
}

/**
  * Builds the Iri identifying the exact source file that was copied, by appending a
  * `rev` or `tag` query parameter to the file's id.
  *
  * @param file   the fetched source file resource
  * @param fileId the id segment the caller used to look the file up
  */
def extractSourceFileIri(file: FileResource, fileId: FileId): Iri = {
  val query = fileId.id match {
    case IdSegmentRef.Revision(_, rev) => Uri.Query("rev" -> rev.toString)
    case IdSegmentRef.Tag(_, tag)      => Uri.Query("tag" -> tag.value)
    case _: IdSegmentRef.Latest        =>
      // pin the current revision, since the file may be updated after the copy
      Uri.Query("rev" -> file.rev.toString)
  }
  file.id.queryParams(query)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object BatchFiles {
destFilesAttributes <- batchCopy.copyFiles(source, destStorage).adaptError { case e: CopyFileRejection =>
CopyRejection(source.project, dest.project, destStorage.id, e)
}
fileResources <- createFileResources(pc, dest, destStorageRef, destStorage.tpe, destFilesAttributes)
destAttrAndSourceIris = destFilesAttributes.zip(source.files)
fileResources <- createFileResources(pc, dest, destStorageRef, destStorage.tpe, destAttrAndSourceIris)
} yield fileResources
}.span("copyFiles")

Expand All @@ -58,13 +59,22 @@ object BatchFiles {
dest: CopyFileDestination,
destStorageRef: ResourceRef.Revision,
destStorageTpe: StorageType,
destFilesAttributes: NonEmptyList[FileAttributes]
destAttrAndSourceResourceRefs: NonEmptyList[(FileAttributes, ResourceRef)]
)(implicit c: Caller): IO[NonEmptyList[FileResource]] =
destFilesAttributes.traverse { destFileAttributes =>
destAttrAndSourceResourceRefs.traverse { case (destFileAttributes, source) =>
for {
iri <- generateId(pc)
command =
CreateFile(iri, dest.project, destStorageRef, destStorageTpe, destFileAttributes, c.subject, dest.tag)
CreateFile(
iri,
dest.project,
destStorageRef,
destStorageTpe,
destFileAttributes,
c.subject,
dest.tag,
Some(source)
)
resource <- evalCreateCommand(command)
} yield resource
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, Tags}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import io.circe.syntax._
import io.circe.{Encoder, Json}
import io.circe.{Encoder, Json, JsonObject}

/**
* A representation of a file information
Expand All @@ -39,7 +39,8 @@ final case class File(
storage: ResourceRef.Revision,
storageType: StorageType,
attributes: FileAttributes,
tags: Tags
tags: Tags,
sourceFile: Option[ResourceRef]
) {
def metadata: Metadata = Metadata(tags.tags)
}
Expand All @@ -56,7 +57,9 @@ object File {
keywords.tpe -> storageType.iri.asJson,
"_rev" -> file.storage.rev.asJson
)
file.attributes.asJsonObject.add("_storage", storageJson)
val attrJson = file.attributes.asJsonObject
val sourceFileJson = file.sourceFile.fold(JsonObject.empty)(f => JsonObject("_sourceFile" := f.asJson))
sourceFileJson deepMerge attrJson add ("_storage", storageJson)
}

implicit def fileJsonLdEncoder(implicit showLocation: ShowFileLocation): JsonLdEncoder[File] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ object FileCommand {
storageType: StorageType,
attributes: FileAttributes,
subject: Subject,
tag: Option[UserTag]
tag: Option[UserTag],
sourceFile: Option[ResourceRef]
) extends FileCommand {
override def rev: Int = 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ object FileEvent {
rev: Int,
instant: Instant,
subject: Subject,
tag: Option[UserTag]
tag: Option[UserTag],
sourceFile: Option[ResourceRef]
Copy link
Contributor

@imsdu imsdu Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for missing it earlier but we also need the original source project to be able to properly identify the original file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm the field contains source project in the integration test 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh ok, it's because the Iri passed to ResourceRef is already expanded here - is this a bad idea?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imsdu the purpose of this method seems to be exactly what I'm using it for - creating a resource ref that contains the full Iri with project information. It works as expected. I'm not sure I see anything wrong with it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly different: this is meant for expanding segments in the URL.
For example when someone creates a resource:
/resources/org/proj/prefix:suffix/
It relies on the configuration of the org/proj project to expand the iri by replacing the prefix thanks to the api mappings / the base defined in the project.
It is only meant to be used for segments, not in payloads where we can properly use json-ld contexts if we need to expand.
And it does not give information on where the resource lives as it is only the id.
In my example, prefix:suffix can point to a schema in another project.

) extends FileEvent

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
/**
  * A reference to a file (latest, revision or tag) scoped to a project.
  *
  * @param id      the id segment reference of the file
  * @param project the project the file lives in
  */
final case class FileId(id: IdSegmentRef, project: ProjectRef) {

  /** Fetches the project context and expands the id segment into a full Iri. */
  def expandIri(fetchContext: ProjectRef => IO[ProjectContext]): IO[(Iri, ProjectContext)] =
    for {
      pc  <- fetchContext(project)
      iri <- iriExpander(id.value, pc)
    } yield (iri, pc)

  /** Fetches the project context and expands this id into a ResourceRef.
    * NOTE(review): assumes the `iriExpander` overload taking an IdSegmentRef
    * preserves the rev/tag qualifier — confirm against its definition.
    */
  def toResourceRef(fetchContext: ProjectRef => IO[ProjectContext]): IO[ResourceRef] =
    for {
      pc  <- fetchContext(project)
      ref <- iriExpander(id, pc)
    } yield ref
}

object FileId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.model.{ResourceF, ResourceUris, Tags}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.instances._
import ch.epfl.bluebrain.nexus.delta.sdk.circe.dropNullValues
import ch.epfl.bluebrain.nexus.delta.sourcing.Serializer
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState
import io.circe.Codec
import io.circe.{Codec, Encoder}
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import io.circe.generic.extras.semiauto.{deriveConfiguredCodec, deriveConfiguredDecoder, deriveConfiguredEncoder}

import java.time.Instant
import scala.annotation.nowarn
Expand Down Expand Up @@ -52,6 +53,7 @@ final case class FileState(
storage: ResourceRef.Revision,
storageType: StorageType,
attributes: FileAttributes,
sourceFile: Option[ResourceRef],
tags: Tags,
rev: Int,
deprecated: Boolean,
Expand All @@ -73,7 +75,7 @@ final case class FileState(
*/
def types: Set[Iri] = Set(nxvFile)

private def file: File = File(id, project, storage, storageType, attributes, tags)
private def file: File = File(id, project, storage, storageType, attributes, tags, sourceFile)

def toResource: FileResource =
ResourceF(
Expand Down Expand Up @@ -103,7 +105,8 @@ object FileState {
deriveConfiguredCodec[Digest]
implicit val fileAttributesCodec: Codec.AsObject[FileAttributes] =
deriveConfiguredCodec[FileAttributes]
implicit val codec: Codec.AsObject[FileState] = deriveConfiguredCodec[FileState]
implicit val enc: Encoder.AsObject[FileState] = deriveConfiguredEncoder[FileState].mapJsonObject(dropNullValues)
implicit val codec: Codec.AsObject[FileState] = Codec.AsObject.from(deriveConfiguredDecoder[FileState], enc)
Serializer.dropNullsInjectType()
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,15 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes

import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileId
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import io.circe.{Decoder, DecodingFailure, Json}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import io.circe.Decoder

/**
  * The source of a copy-files operation.
  *
  * @param project the project the source files live in
  * @param files   the references (latest, rev or tag) of the files to copy
  */
final case class CopyFileSource(
    project: ProjectRef,
    files: NonEmptyList[ResourceRef]
)

object CopyFileSource {

  /**
    * Decodes a copy-files payload of the shape
    * `{ "sourceProject": <project ref>, "files": [<resource ref>, ...] }`.
    *
    * The previous hand-rolled decoder (parsing `sourceFileId`/`sourceTag`/`sourceRev`
    * into a FileId) was superseded by this one and has been removed: two definitions
    * of `dec` cannot coexist in the same object.
    */
  implicit val dec: Decoder[CopyFileSource] =
    Decoder.forProduct2("sourceProject", "files")(CopyFileSource.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class FilesStmSpec extends CatsEffectSpec with FileFixtures with StorageFixtures
"evaluating an incoming command" should {

"create a new event from a CreateFile command" in {
val createCmd = CreateFile(id, projectRef, storageRef, DiskStorageType, attributes, bob, Some(myTag))
val createCmd = CreateFile(id, projectRef, storageRef, DiskStorageType, attributes, bob, Some(myTag), None)

evaluate(clock)(None, createCmd).accepted shouldEqual
FileCreated(id, projectRef, storageRef, DiskStorageType, attributes, 1, epoch, bob, Some(myTag))
FileCreated(id, projectRef, storageRef, DiskStorageType, attributes, 1, epoch, bob, Some(myTag), None)
}

"create a new event from a UpdateFile command" in {
Expand Down Expand Up @@ -121,7 +121,10 @@ class FilesStmSpec extends CatsEffectSpec with FileFixtures with StorageFixtures

"reject with ResourceAlreadyExists when file already exists" in {
val current = FileGen.state(id, projectRef, storageRef, attributes)
evaluate(clock)(Some(current), CreateFile(id, projectRef, storageRef, DiskStorageType, attributes, bob, None))
evaluate(clock)(
Some(current),
CreateFile(id, projectRef, storageRef, DiskStorageType, attributes, bob, None, None)
)
.rejectedWith[ResourceAlreadyExists]
}

Expand Down Expand Up @@ -179,7 +182,7 @@ class FilesStmSpec extends CatsEffectSpec with FileFixtures with StorageFixtures
"producing next state" should {

"from a new FileCreated event" in {
val event = FileCreated(id, projectRef, storageRef, DiskStorageType, attributes, 1, epoch, bob, None)
val event = FileCreated(id, projectRef, storageRef, DiskStorageType, attributes, 1, epoch, bob, None, None)
val nextState = FileGen.state(id, projectRef, storageRef, attributes, createdBy = bob, updatedBy = bob)

next(None, event).value shouldEqual nextState
Expand Down
Loading
Loading