10 changes: 9 additions & 1 deletion build.sbt
@@ -10,6 +10,7 @@ val supportedScalaVersions = List(scala212, scala213)
 val sparkVersion = "3.3.3"
 val circeVersion = "0.14.6"
 val sttpVersion = "3.5.2"
+val natchezVersion = "0.3.1"
 val Specs2Version = "4.20.3"
 val cogniteSdkVersion = "2.13.778"
 
@@ -20,6 +21,8 @@ sonatypeProfileName := "com.cognite" // default is same as organization and lead
 
 lazy val gpgPass = Option(System.getenv("GPG_KEY_PASSWORD"))
 
+addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)
+
 ThisBuild / scalafixDependencies += "org.typelevel" %% "typelevel-scalafix" % "0.1.4"
 
 lazy val patchVersion = scala.io.Source.fromFile("patch_version.txt").mkString.trim
@@ -155,7 +158,12 @@ lazy val library = (project in file("."))
         exclude("org.glassfish.hk2.external", "javax.inject"),
       "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
         exclude("org.glassfish.hk2.external", "javax.inject"),
-      "org.log4s" %% "log4s" % log4sVersion
+      "org.log4s" %% "log4s" % log4sVersion,
+      "org.tpolecat" %% "natchez-core" % natchezVersion,
+      "org.tpolecat" %% "natchez-noop" % natchezVersion,
+      "org.tpolecat" %% "natchez-opentelemetry" % natchezVersion,
+      "io.opentelemetry" % "opentelemetry-sdk" % "1.23.0",
+      "com.lightstep.opentelemetry" % "opentelemetry-launcher" % "1.22.0"
     ),
     coverageExcludedPackages := "com.cognite.data.*",
     buildInfoKeys := Seq[BuildInfoKey](organization, version, organizationName),
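Note on the new dependencies: natchez-core provides the tracing abstraction, natchez-noop a tracer that discards spans, and natchez-opentelemetry the bridge to an OpenTelemetry SDK (exported here via the Lightstep launcher). kind-projector is presumably added because the rest of the PR works with a partially applied effect type such as Kleisli[IO, Span[IO], *]. A minimal sketch of how these pieces compose, assuming TracedIO follows the usual natchez Kleisli pattern (the actual alias is defined elsewhere in this PR, and the names below are illustrative):

import cats.data.Kleisli
import cats.effect.IO
import natchez.{EntryPoint, Span, Trace}

object TracingSketch {
  // Assumed alias; with kind-projector it could be written inline as
  // Kleisli[IO, Span[IO], *].
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  // natchez derives Trace for Kleisli over a Span, so any TracedIO step
  // can open a child span around its work:
  def step(implicit trace: Trace[TracedIO]): TracedIO[Unit] =
    trace.span("write-assets")(Kleisli.liftF(IO.unit))

  // Running a TracedIO needs a root span from an EntryPoint; natchez-noop
  // supplies an EntryPoint that drops all spans, while the OpenTelemetry
  // backend exports them to a collector.
  def run[A](entryPoint: EntryPoint[IO], program: TracedIO[A]): IO[A] =
    entryPoint.root("spark-datasource-job").use(program.run)
}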
44 changes: 20 additions & 24 deletions src/main/scala/cognite/spark/v1/AssetHierarchyBuilder.scala
@@ -1,6 +1,5 @@
 package cognite.spark.v1
 
-import cats.effect.IO
 import cats.implicits._
 import cognite.spark.v1.PushdownUtilities.stringSeqToCogniteExternalIdSeq
 import cognite.spark.compiletime.macros.SparkSchemaHelper.{fromRow, structType}
@@ -76,19 +75,17 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
 
   import cognite.spark.compiletime.macros.StructTypeEncoderMacro._
 
-  import CdpConnector.ioRuntime
-
-  def delete(data: DataFrame): Unit = {
+  def delete(data: DataFrame): TracedIO[Unit] = {
     val partitionedData = if (config.enableSinglePartitionDeleteAssetHierarchy) {
       data.repartition(numPartitions = 1)
     } else {
       data
     }
 
-    partitionedData.foreachPartition((rows: Iterator[Row]) => {
+    SparkF(partitionedData).foreachPartition((rows: Iterator[Row]) => {
       val deletes = rows.map(r => fromRow[DeleteItemByCogniteId](r))
       Stream
-        .fromIterator[IO](deletes, chunkSize = batchSize)
+        .fromIterator[TracedIO](deletes, chunkSize = batchSize)
         .chunks
         .parEvalMapUnordered(config.parallelismPerPartition) { chunk =>
           client.assets
@@ -100,24 +97,22 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
         }
         .compile
         .drain
-        .unsafeRunSync()
     })
   }
 
-  def buildFromDf(data: DataFrame): Unit =
+  def buildFromDf(data: DataFrame): TracedIO[Unit] =
     // Do not use .collect to run the builder on one of the executors and not on the driver
-    data
-      .repartition(numPartitions = 1)
+    SparkF(data.repartition(numPartitions = 1))
       .foreachPartition((rows: Iterator[Row]) => {
-        build(rows).unsafeRunSync()
+        build(rows)
       })
 
   private val batchSize = config.batchSize.getOrElse(Constants.DefaultBatchSize)
 
   private val deleteMissingAssets = config.deleteMissingAssets
   private val subtreeMode = config.subtrees
 
-  def build(data: Iterator[Row]): IO[Unit] = {
+  def build(data: Iterator[Row]): TracedIO[Unit] = {
     val sourceTree = data.map(r => fromRow[AssetsIngestSchema](r)).toArray
 
     val subtrees =
@@ -153,7 +148,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
     } yield ()
   }
 
-  def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): IO[Unit] =
+  def validateSubtreeRoots(roots: Vector[AssetsIngestSchema]): TracedIO[Unit] =
     // check that all `parentExternalId`s exist
     batchedOperation[String, Asset](
       roots.map(_.parentExternalId).filter(_.nonEmpty).distinct,
@@ -173,7 +168,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
       })
     ).void
 
-  def buildSubtrees(trees: Vector[AssetSubtree]): IO[Unit] =
+  def buildSubtrees(trees: Vector[AssetSubtree]): TracedIO[Unit] =
     for {
       // fetch existing roots and update or insert them first
       cdfRoots <- fetchCdfAssets(trees.map(_.root.externalId))
@@ -191,7 +186,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
       _ <- update(toUpdate)
     } yield ()
 
-  def insert(toInsert: Seq[AssetsIngestSchema]): IO[Vector[Asset]] = {
+  def insert(toInsert: Seq[AssetsIngestSchema]): TracedIO[Vector[Asset]] = {
     val assetCreatesToInsert = toInsert.map(toAssetCreate)
     // Traverse batches in order to ensure writing parents first
     assetCreatesToInsert
@@ -205,7 +200,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
       .map(_.flatten)
   }
 
-  def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): IO[Unit] =
+  def deleteMissingChildren(trees: Vector[AssetSubtree], deleteMissingAssets: Boolean): TracedIO[Unit] =
     if (deleteMissingAssets) {
       val ingestedNodeSet = trees.flatMap(_.allNodes).map(_.externalId).toSet
       // list all subtrees of the tree root and filter those which are not in the ingested set
@@ -219,6 +214,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
           .map(asset => CogniteInternalId(asset.id))
           .compile
           .toVector
+          .map(x => x)
       )
       _ <- batchedOperation[CogniteInternalId, Nothing](
         idsToDelete,
@@ -230,10 +226,10 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
         )
       } yield ()
     } else {
-      IO.unit
+      TracedIO.unit
     }
 
-  def update(toUpdate: Vector[AssetsIngestSchema]): IO[Unit] =
+  def update(toUpdate: Vector[AssetsIngestSchema]): TracedIO[Unit] =
     batchedOperation[AssetsIngestSchema, Asset](
       toUpdate,
       updateBatch => {
@@ -243,7 +239,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
       }
     ).void
 
-  def fetchCdfAssets(sourceRootExternalIds: Vector[String]): IO[Map[String, Asset]] =
+  def fetchCdfAssets(sourceRootExternalIds: Vector[String]): TracedIO[Map[String, Asset]] =
     batchedOperation[String, Asset](
       sourceRootExternalIds,
       batch =>
@@ -253,7 +249,7 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
 
   def upsertRoots( // scalastyle:off
       newRoots: Vector[AssetsIngestSchema],
-      sourceRoots: Map[String, Asset]): IO[Vector[Asset]] = {
+      sourceRoots: Map[String, Asset]): TracedIO[Vector[Asset]] = {
 
     // Assets without corresponding source root will be created
     val (toCreate, toUpdate, toIgnore) = nodesToInsertUpdate(newRoots, sourceRoots)
@@ -403,18 +399,18 @@ class AssetHierarchyBuilder(config: RelationConfig)(val sqlContext: SQLContext)
 
   private def batchedOperation[I, R](
       list: Vector[I],
-      op: Vector[I] => IO[Seq[R]],
-      batchSize: Int = this.batchSize): IO[Vector[R]] =
+      op: Vector[I] => TracedIO[Seq[R]],
+      batchSize: Int = this.batchSize): TracedIO[Vector[R]] =
     if (list.nonEmpty) {
       Stream
-        .fromIterator[IO](list.iterator, chunkSize = batchSize)
+        .fromIterator[TracedIO](list.iterator, chunkSize = batchSize)
         .chunks
         .parEvalMapUnordered(config.parallelismPerPartition)(chunk => op(chunk.toVector).map(Chunk.seq))
         .flatMap(Stream.chunk)
         .compile
         .toVector
     } else {
-      IO.pure(Vector.empty)
+      TracedIO.pure(Vector.empty)
     }
 }
 
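The pattern across this file: every operation now returns a TracedIO instead of an IO, the unsafeRunSync() calls inside foreachPartition closures are gone (SparkF presumably discharges the effect in a single, well-defined place), and the call sites lean on a few companion helpers: TracedIO.unit, TracedIO.pure, TracedIO.liftIO. A sketch of what those helpers amount to under the Kleisli assumption above; the PR's real definitions may differ:

import cats.data.Kleisli
import cats.effect.IO
import natchez.Span

object TracedIOHelpers {
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  // Minimal companion-style helpers matching the call sites in this diff.
  object TracedIO {
    val unit: TracedIO[Unit] = Kleisli.pure(())                // TracedIO.unit
    def pure[A](a: A): TracedIO[A] = Kleisli.pure(a)           // TracedIO.pure(Vector.empty)
    def liftIO[A](io: IO[A]): TracedIO[A] = Kleisli.liftF(io)  // TracedIO.liftIO(IO(...))
  }
}

The seemingly redundant .map(x => x) added in deleteMissingChildren is discussed in the note after the CdfSparkAuth.scala diff below.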
22 changes: 11 additions & 11 deletions src/main/scala/cognite/spark/v1/AssetsRelation.scala
@@ -1,6 +1,6 @@
 package cognite.spark.v1
 
-import cats.effect.IO
+import cats.implicits._
 import cognite.spark.v1.PushdownUtilities._
 import cognite.spark.compiletime.macros.SparkSchemaHelper._
 import com.cognite.sdk.scala.common._
@@ -24,7 +24,7 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]
 
   Array("name", "source", "dataSetId", "labels", "id", "externalId", "externalIdPrefix")
   override def getStreams(sparkFilters: Array[Filter])(
-      client: GenericClient[IO]): Seq[Stream[IO, AssetsReadSchema]] = {
+      client: GenericClient[TracedIO]): Seq[Stream[TracedIO, AssetsReadSchema]] = {
     val (ids, filters) =
       pushdownToFilters(
         sparkFilters,
@@ -61,48 +61,48 @@ class AssetsRelation(config: RelationConfig, subtreeIds: Option[List[CogniteId]]
       assetSubtreeIds = subtreeIds
     )
 
-  override def insert(rows: Seq[Row]): IO[Unit] = {
+  override def insert(rows: Seq[Row]): TracedIO[Unit] = {
     val assetsInsertions = rows.map(fromRow[AssetsInsertSchema](_))
     val assets = assetsInsertions.map(
       _.into[AssetCreate]
         .withFieldComputed(_.labels, u => stringSeqToCogniteExternalIdSeq(u.labels))
         .transform)
     client.assets
       .create(assets)
-      .flatTap(_ => incMetrics(itemsCreated, assets.size)) *> IO.unit
+      .flatTap(_ => incMetrics(itemsCreated, assets.size)) *> TracedIO.unit
   }
 
   private def isUpdateEmpty(u: AssetUpdate): Boolean = u == AssetUpdate()
 
-  override def update(rows: Seq[Row]): IO[Unit] = {
+  override def update(rows: Seq[Row]): TracedIO[Unit] = {
     val assetUpdates = rows.map(r => fromRow[AssetsUpsertSchema](r))
 
-    updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[IO], Asset](
+    updateByIdOrExternalId[AssetsUpsertSchema, AssetUpdate, Assets[TracedIO], Asset](
       assetUpdates,
       client.assets,
       isUpdateEmpty
     )
   }
 
-  override def delete(rows: Seq[Row]): IO[Unit] = {
+  override def delete(rows: Seq[Row]): TracedIO[Unit] = {
     val deletes = rows.map(fromRow[DeleteItemByCogniteId](_))
     deleteWithIgnoreUnknownIds(client.assets, deletes.map(_.toCogniteId), config.ignoreUnknownIds)
   }
 
-  override def upsert(rows: Seq[Row]): IO[Unit] = {
+  override def upsert(rows: Seq[Row]): TracedIO[Unit] = {
     val assets = rows.map(fromRow[AssetsUpsertSchema](_))
-    genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[IO]](
+    genericUpsert[Asset, AssetsUpsertSchema, AssetCreate, AssetUpdate, Assets[TracedIO]](
       assets,
       isUpdateEmpty,
       client.assets,
       mustBeUpdate = r => r.name.isEmpty && r.getExternalId.nonEmpty
     )
   }
 
-  override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): IO[Unit] = {
+  override def getFromRowsAndCreate(rows: Seq[Row], @unused doUpsert: Boolean = true): TracedIO[Unit] = {
     val assetsUpserts = rows.map(fromRow[AssetsUpsertSchema](_))
     val assets = assetsUpserts.map(_.transformInto[AssetCreate])
-    createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[IO]](
+    createOrUpdateByExternalId[Asset, AssetUpdate, AssetCreate, AssetCreate, Option, Assets[TracedIO]](
       Set.empty,
       assets,
       client.assets,
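Because fs2 and the SDK's GenericClient are polymorphic in the effect type, these method bodies barely change: only the F in the signatures moves from IO to TracedIO, and the required type-class instances come for free on a Kleisli over IO. A small self-contained illustration of the same swap (names here are illustrative, not from the PR):

import cats.data.Kleisli
import cats.effect.IO
import fs2.Stream
import natchez.Span

object EffectPolymorphism {
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  // The same pipeline compiles for IO and for TracedIO: Stream.fromIterator
  // needs Sync[F] and parEvalMapUnordered needs Concurrent[F], and both
  // instances exist for Kleisli[IO, Span[IO], *].
  def process(items: Iterator[Int]): TracedIO[Vector[Int]] =
    Stream
      .fromIterator[TracedIO](items, chunkSize = 100)
      .parEvalMapUnordered(maxConcurrent = 4)(i => Kleisli.liftF(IO.pure(i + 1)))
      .compile
      .toVector
}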
17 changes: 9 additions & 8 deletions src/main/scala/cognite/spark/v1/CdfRelation.scala
@@ -24,18 +24,19 @@ abstract class CdfRelation(config: RelationConfig, shortNameStr: String)
   @transient lazy protected val itemsUpserted: Counter =
     MetricsSource.getOrCreateCounter(config.metricsPrefix, s"$shortName.upserted")
 
-  @transient lazy val client: GenericClient[IO] =
+  @transient lazy val client: GenericClient[TracedIO] =
     CdpConnector.clientFromConfig(config)
 
-  @transient lazy val alphaClient: GenericClient[IO] =
+  @transient lazy val alphaClient: GenericClient[TracedIO] =
     CdpConnector.clientFromConfig(config, Some("alpha"))
 
-  def incMetrics(counter: Counter, count: Int): IO[Unit] =
-    IO(
-      if (config.collectMetrics) {
-        counter.inc(count.toLong)
-      }
-    )
+  def incMetrics(counter: Counter, count: Int): TracedIO[Unit] =
+    TracedIO.liftIO(
+      IO(
+        if (config.collectMetrics) {
+          counter.inc(count.toLong)
+        }
+      ))
 
   // Needed for labels property when transforming from UpsertSchema to Update
   implicit def seqStrToCogIdSetter
32 changes: 19 additions & 13 deletions src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
@@ -5,35 +5,41 @@ import com.cognite.sdk.scala.common.{Auth, AuthProvider, OAuth2}
 import sttp.client3.SttpBackend
 
 sealed trait CdfSparkAuth extends Serializable {
-  def provider(implicit clock: Clock[IO], sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]]
+  def provider(
+      implicit clock: Clock[TracedIO],
+      sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]]
 }
 
 object CdfSparkAuth {
   final case class Static(auth: Auth) extends CdfSparkAuth {
     override def provider(
-        implicit clock: Clock[IO],
-        sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
-      IO(AuthProvider(auth))
+        implicit clock: Clock[TracedIO],
+        sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
+      TracedIO.liftIO(IO(AuthProvider[TracedIO](auth)))
   }
 
   final case class OAuth2ClientCredentials(credentials: OAuth2.ClientCredentials) extends CdfSparkAuth {
 
     override def provider(
-        implicit clock: Clock[IO],
-        sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
-      OAuth2.ClientCredentialsProvider[IO](credentials)
+        implicit clock: Clock[TracedIO],
+        sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
+      OAuth2
+        .ClientCredentialsProvider[TracedIO](credentials)
+        .map(x => x)
   }
 
   final case class OAuth2Sessions(session: OAuth2.Session) extends CdfSparkAuth {
 
     private val refreshSecondsBeforeExpiration = 300L
 
     override def provider(
-        implicit clock: Clock[IO],
-        sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
-      OAuth2.SessionProvider[IO](
-        session,
-        refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
-      )
+        implicit clock: Clock[TracedIO],
+        sttpBackend: SttpBackend[TracedIO, Any]): TracedIO[AuthProvider[TracedIO]] =
+      OAuth2
+        .SessionProvider[TracedIO](
+          session,
+          refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
+        )
+        .map(x => x)
  }
 }
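A note on the .map(x => x) calls here (and the one added in AssetHierarchyBuilder): they look like no-ops, but a plausible reading is that they do type-level work. cats' Kleisli is invariant in its result type, so a TracedIO of a concrete provider subtype is not automatically a TracedIO[AuthProvider[TracedIO]]; mapping the identity function makes the compiler re-infer the result at the expected supertype. A toy demonstration of the trick (all names hypothetical):

import cats.data.Kleisli
import cats.effect.IO

object IdentityMapWidening {
  trait Animal
  final class Dog extends Animal

  type CtxIO[A] = Kleisli[IO, String, A]

  val dog: CtxIO[Dog] = Kleisli.liftF(IO(new Dog))

  // val animal: CtxIO[Animal] = dog         // does not compile: Kleisli is invariant in A
  val animal: CtxIO[Animal] = dog.map(x => x) // compiles: map re-infers the result as Animal
}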