From 99fa47b43c39ed791bd2d75519c00f15cfcb3f36 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Sat, 17 Jul 2021 17:12:59 +0200
Subject: [PATCH 1/9] trying to add some stdlib functions, still working on it

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 157 ++++++++++++++++++
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |  10 ++
 2 files changed, 167 insertions(+)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 2dde48cb..63db2cce 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -185,6 +185,163 @@ private fun <T> kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder
     )
 }
 
+/**
+ * Allows `for (element in dataset)`.
+ */
+operator fun <T> Dataset<T>.iterator(): Iterator<T> = toLocalIterator()
+
+/**
+ * Returns `true` if [element] is found in the collection.
+ */
+operator fun <T> Dataset<T>.contains(element: T): Boolean {
+    return indexOf(element) >= 0L
+}
+
+/**
+ * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this collection.
+ */
+fun <T> Dataset<T>.elementAt(index: Long): T {
+    return elementAtOrElse(index) { throw IndexOutOfBoundsException("Collection doesn't contain element at index $index.") }
+}
+
+/**
+ * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this collection.
+ */
+fun <T> Dataset<T>.elementAtOrElse(index: Long, defaultValue: (Long) -> T): T {
+    if (index < 0L)
+        return defaultValue(index)
+    val iterator = iterator()
+    var count = 0L
+    while (iterator.hasNext()) {
+        val element = iterator.next()
+        if (index == count++)
+            return element
+    }
+    return defaultValue(index)
+}
+
+/**
+ * Returns an element at the given [index] or `null` if the [index] is out of bounds of this collection.
+ */
+fun <T> Dataset<T>.elementAtOrNull(index: Long): T? {
+    if (index < 0L)
+        return null
+    val iterator = iterator()
+    var count = 0L
+    while (iterator.hasNext()) {
+        val element = iterator.next()
+        if (index == count++)
+            return element
+    }
+    return null
+}
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ */
+inline fun <T> Dataset<T>.find(predicate: (T) -> Boolean): T? {
+    return firstOrNull(predicate)
+}
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ */
+inline fun <T> Dataset<T>.findLast(predicate: (T) -> Boolean): T? {
+    return TODO()//lastOrNull(predicate)
+}
+
+/**
+ * Returns the first element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ */
+inline fun <reified T> Dataset<T>.first(predicate: (T) -> Boolean): T {
+    for (element in toLocalIterator()) if (predicate(element)) return element
+    throw NoSuchElementException("Collection contains no element matching the predicate.")
+}
+
+/**
+ * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
+ * or throws [NoSuchElementException] if no non-null value was produced.
+ */
+inline fun <reified T, R : Any> Dataset<T>.firstNotNullOf(transform: (T) -> R?): R {
+    return firstNotNullOfOrNull(transform) ?: throw NoSuchElementException("No element of the collection was transformed to a non-null value.")
+}
+
+/**
+ * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
+ * or `null` if no non-null value was produced.
+ */
+inline fun <reified T, R : Any> Dataset<T>.firstNotNullOfOrNull(transform: (T) -> R?): R? {
+    for (element in this) {
+        val result = transform(element)
+        if (result != null) {
+            return result
+        }
+    }
+    return null
+}
+
+/**
+ * Returns the first element, or `null` if the collection is empty.
+ */
+fun <T> Dataset<T>.firstOrNull(): T? {
+    val iterator = iterator()
+    if (!iterator.hasNext())
+        return null
+    return iterator.next()
+}
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if element was not found.
+ */
+inline fun <T> Dataset<T>.firstOrNull(predicate: (T) -> Boolean): T? {
+    for (element in this) if (predicate(element)) return element
+    return null
+}
+
+/**
+ * Returns first index of [element], or -1 if the collection does not contain element.
+ */
+fun <T> Dataset<T>.indexOf(element: T): Long {
+    var index = 0L
+    for (item in iterator()) {
+        if (element == item)
+            return index
+        index++
+    }
+    return -1L
+}
+
+/**
+ * Returns index of the first element matching the given [predicate], or -1 if the collection does not contain such element.
+ */
+inline fun <T> Dataset<T>.indexOfFirst(predicate: (T) -> Boolean): Long {
+    var index = 0L
+    for (item in this) {
+        if (predicate(item))
+            return index
+        index++
+    }
+    return -1L
+}
+
+/**
+ * Returns index of the last element matching the given [predicate], or -1 if the collection does not contain such element.
+ */
+inline fun <T> Dataset<T>.indexOfLast(predicate: (T) -> Boolean): Long {
+    // TODO might be able to improve
+    var lastIndex = -1L
+    var index = 0L
+    for (item in this) {
+        if (predicate(item))
+            lastIndex = index
+        index++
+    }
+    return lastIndex
+}
+
+
+
 inline fun <reified T, reified R> Dataset<T>.map(noinline func: (T) -> R): Dataset<R> =
     map(MapFunction(func), encoder<R>())
 
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index bff38ac1..5fefc020 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -557,6 +557,16 @@ class ApiTest : ShouldSpec({
                 first.someEnumArray shouldBe arrayOf(SomeEnum.A, SomeEnum.B)
                 first.someOtherArray shouldBe arrayOf(SomeOtherEnum.C, SomeOtherEnum.D)
                 first.enumMap shouldBe mapOf(SomeEnum.A to SomeOtherEnum.C)
+            }
+            should("Have more stdlib functions for Datasets") {
+                val dataset = listOf(1, 2, 3).toDS()
+                (1 in dataset) shouldBe true
+                dataset.indexOf(1) shouldBe 0L
+
+                dataset.first()
+
+
+
             }
         }
     }

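A minimal usage sketch of the helpers introduced above (assuming the library's `withSpark`/`dsOf` session helpers; illustrative only, not part of the patch):

    withSpark {
        val ds = dsOf(1, 2, 3)

        for (element in ds) println(element)  // the new iterator() at work

        println(2 in ds)                      // true, via contains()/indexOf()
        println(ds.elementAt(1L))             // 2
        println(ds.elementAtOrNull(5L))       // null
        println(ds.indexOfLast { it < 3 })    // 1
    }

Every helper in this patch pulls data through toLocalIterator(), i.e. a sequential scan on the driver; the next patch replaces most of them with Spark-native operations.
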
From 2b15645b7be77b6908223145b3caf9dd8e92339d Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Mon, 19 Jul 2021 00:05:57 +0200
Subject: [PATCH 2/9] add more stdlib functions built on Spark functions

---
 .../spark/extensions/KSparkExtensions.scala   |   1 +
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 193 +++++++++---------
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |   4 +-
 3 files changed, 95 insertions(+), 103 deletions(-)

diff --git a/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala b/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
index 6b4935f1..3ee379fc 100644
--- a/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
+++ b/core/3.0/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala
@@ -34,6 +34,7 @@ object KSparkExtensions {
 
   def collectAsList[T](ds: Dataset[T]): util.List[T] = JavaConverters.seqAsJavaList(ds.collect())
 
+  def tailAsList[T](ds: Dataset[T], n: Int): util.List[T] = util.Arrays.asList(ds.tail(n) : _*)
 
   def debugCodegen(df: Dataset[_]): Unit = {
     import org.apache.spark.sql.execution.debug._
diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 63db2cce..d093d624 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -74,6 +74,7 @@ import kotlin.Unit
 import kotlin.also
 import kotlin.apply
 import kotlin.invoke
+import kotlin.random.Random
 import kotlin.reflect.*
 import kotlin.reflect.full.findAnnotation
 import kotlin.reflect.full.isSubclassOf
@@ -188,160 +189,151 @@ private fun <T> kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder
 /**
  * Allows `for (element in dataset)`.
  */
+@Deprecated(
+    message = "Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient.",
+    level = DeprecationLevel.WARNING
+)
 operator fun <T> Dataset<T>.iterator(): Iterator<T> = toLocalIterator()
 
 /**
  * Returns `true` if [element] is found in the collection.
  */
-operator fun <T> Dataset<T>.contains(element: T): Boolean {
-    return indexOf(element) >= 0L
-}
-
-/**
- * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this collection.
- */
-fun <T> Dataset<T>.elementAt(index: Long): T {
-    return elementAtOrElse(index) { throw IndexOutOfBoundsException("Collection doesn't contain element at index $index.") }
-}
-
-/**
- * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this collection.
- */
-fun <T> Dataset<T>.elementAtOrElse(index: Long, defaultValue: (Long) -> T): T {
-    if (index < 0L)
-        return defaultValue(index)
-    val iterator = iterator()
-    var count = 0L
-    while (iterator.hasNext()) {
-        val element = iterator.next()
-        if (index == count++)
-            return element
-    }
-    return defaultValue(index)
-}
-
-/**
- * Returns an element at the given [index] or `null` if the [index] is out of bounds of this collection.
- */
-fun <T> Dataset<T>.elementAtOrNull(index: Long): T? {
-    if (index < 0L)
-        return null
-    val iterator = iterator()
-    var count = 0L
-    while (iterator.hasNext()) {
-        val element = iterator.next()
-        if (index == count++)
-            return element
-    }
-    return null
-}
+inline operator fun <reified T> Dataset<T>.contains(element: T): Boolean =
+    !filter { it == element }.isEmpty
 
 /**
  * Returns the first element matching the given [predicate], or `null` if no such element was found.
  */
-inline fun <T> Dataset<T>.find(predicate: (T) -> Boolean): T? {
+fun <T> Dataset<T>.find(predicate: (T) -> Boolean): T? {
     return firstOrNull(predicate)
 }
 
 /**
  * Returns the last element matching the given [predicate], or `null` if no such element was found.
  */
-inline fun <T> Dataset<T>.findLast(predicate: (T) -> Boolean): T? {
-    return TODO()//lastOrNull(predicate)
+fun <T> Dataset<T>.findLast(predicate: (T) -> Boolean): T? {
+    return lastOrNull(predicate)
 }
 
 /**
  * Returns the first element matching the given [predicate].
  * @throws [NoSuchElementException] if no such element is found.
  */
-inline fun <reified T> Dataset<T>.first(predicate: (T) -> Boolean): T {
-    for (element in toLocalIterator()) if (predicate(element)) return element
-    throw NoSuchElementException("Collection contains no element matching the predicate.")
-}
+fun <T> Dataset<T>.first(predicate: (T) -> Boolean): T =
+    filter(predicate).first()
 
 /**
  * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
  * or throws [NoSuchElementException] if no non-null value was produced.
  */
-inline fun <reified T, R : Any> Dataset<T>.firstNotNullOf(transform: (T) -> R?): R {
-    return firstNotNullOfOrNull(transform) ?: throw NoSuchElementException("No element of the collection was transformed to a non-null value.")
-}
+inline fun <reified T, reified R : Any> Dataset<T>.firstNotNullOf(noinline transform: (T) -> R?): R =
+    map(transform)
+        .filterNotNull()
+        .first()
 
 /**
  * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
  * or `null` if no non-null value was produced.
  */
-inline fun <reified T, R : Any> Dataset<T>.firstNotNullOfOrNull(transform: (T) -> R?): R? {
-    for (element in this) {
-        val result = transform(element)
-        if (result != null) {
-            return result
-        }
-    }
-    return null
-}
+inline fun <reified T, reified R : Any> Dataset<T>.firstNotNullOfOrNull(noinline transform: (T) -> R?): R? =
+    map(transform)
+        .filterNotNull()
+        .firstOrNull()
 
 /**
  * Returns the first element, or `null` if the collection is empty.
  */
-fun <T> Dataset<T>.firstOrNull(): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        return null
-    return iterator.next()
-}
+fun <T> Dataset<T>.firstOrNull(): T? = if (isEmpty) null else first()
 
 /**
  * Returns the first element matching the given [predicate], or `null` if element was not found.
  */
-inline fun <T> Dataset<T>.firstOrNull(predicate: (T) -> Boolean): T? {
-    for (element in this) if (predicate(element)) return element
-    return null
-}
+fun <T> Dataset<T>.firstOrNull(predicate: (T) -> Boolean): T? = filter(predicate).firstOrNull()
 
 /**
- * Returns first index of [element], or -1 if the collection does not contain element.
+ * Returns the last element.
+ *
+ * @throws NoSuchElementException if the collection is empty.
  */
-fun <T> Dataset<T>.indexOf(element: T): Long {
-    var index = 0L
-    for (item in iterator()) {
-        if (element == item)
-            return index
-        index++
-    }
-    return -1L
+fun <T> Dataset<T>.last(): T = tailAsList(1).first()
+
+/**
+ * Returns the last element matching the given [predicate].
+ *
+ * @throws NoSuchElementException if no such element is found.
+ */
+fun <T> Dataset<T>.last(predicate: (T) -> Boolean): T = filter(predicate).last()
+
+/**
+ * Returns the last element, or `null` if the collection is empty.
+ */
+fun <T> Dataset<T>.lastOrNull(): T? = if (isEmpty) null else last()
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ */
+fun <T> Dataset<T>.lastOrNull(predicate: (T) -> Boolean): T? = filter(predicate).lastOrNull()
+
+/**
+ * Returns the last `n` rows in the Dataset as a list.
+ *
+ * Running tail requires moving data into the application's driver process, and doing so with
+ * a very large `n` can crash the driver process with OutOfMemoryError.
+ */
+fun <T> Dataset<T>.tailAsList(n: Int): List<T> = KSparkExtensions.tailAsList(this, n)
+
+/**
+ * Returns a random element from this Dataset using the specified source of randomness.
+ *
+ * @param seed seed for the random number generator
+ *
+ * @throws NoSuchElementException if this collection is empty.
+ */
+fun <T> Dataset<T>.random(seed: Long = Random.nextLong()): T =
+    randomOrNull(seed) ?: throw NoSuchElementException("Collection is empty.")
+
+/**
+ * Returns a random element from this collection using the specified source of randomness, or `null` if this collection is empty.
+ * @param seed seed for the random number generator
+ */
+fun <T> Dataset<T>.randomOrNull(seed: Long = Random.nextLong()): T? {
+    if (isEmpty)
+        return null
+
+    return toJavaRDD()
+        .takeSample(false, 1, seed)
+        .first()
 }
 
 /**
- * Returns index of the first element matching the given [predicate], or -1 if the collection does not contain such element.
+ * Returns the single element, or throws an exception if the Dataset is empty or has more than one element.
  */
-inline fun <T> Dataset<T>.indexOfFirst(predicate: (T) -> Boolean): Long {
-    var index = 0L
-    for (item in this) {
-        if (predicate(item))
-            return index
-        index++
+fun <T> Dataset<T>.single(): T {
+    if (isEmpty)
+        throw NoSuchElementException("Dataset is empty.")
+
+    val firstTwo: List<T> = takeAsList(2) // cheaper than count()
+    return when (firstTwo.size) {
+        1 -> firstTwo.first()
+        else -> throw IllegalArgumentException("Dataset has more than one element.")
     }
-    return -1L
 }
 
 /**
- * Returns index of the last element matching the given [predicate], or -1 if the collection does not contain such element.
+ * Returns the single element, or `null` if the Dataset is empty or has more than one element.
  */
-inline fun <T> Dataset<T>.indexOfLast(predicate: (T) -> Boolean): Long {
-    // TODO might be able to improve
-    var lastIndex = -1L
-    var index = 0L
-    for (item in this) {
-        if (predicate(item))
-            lastIndex = index
-        index++
+fun <T> Dataset<T>.singleOrNull(): T? {
+    if (isEmpty)
+        return null
+
+    val firstTwo: List<T> = takeAsList(2) // cheaper than count()
+    return when (firstTwo.size) {
+        1 -> firstTwo.first()
+        else -> null
     }
-    return lastIndex
 }
 
 
-
 inline fun <reified T, reified R> Dataset<T>.map(noinline func: (T) -> R): Dataset<R> =
     map(MapFunction(func), encoder<R>())
 
@@ -357,7 +349,8 @@ inline fun <T, reified R> Dataset<T>.groupByKey(noinline func: (T) -> R): KeyVal
 inline fun <T, reified R> Dataset<T>.mapPartitions(noinline func: (Iterator<T>) -> Iterator<R>): Dataset<R> =
     mapPartitions(func, encoder<R>())
 
-fun <T> Dataset<T>.filterNotNull() = filter { it != null }
+@Suppress("UNCHECKED_CAST")
+fun <T> Dataset<T?>.filterNotNull(): Dataset<T> = filter { it != null } as Dataset<T>
 
 inline fun <KEY, VALUE, reified R> KeyValueGroupedDataset<KEY, VALUE>.mapValues(noinline func: (VALUE) -> R): KeyValueGroupedDataset<KEY, R> =
     mapValues(MapFunction(func), encoder<R>())
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 5fefc020..d24c3f43 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -561,9 +561,7 @@ class ApiTest : ShouldSpec({
             should("Have more stdlib functions for Datasets") {
                 val dataset = listOf(1, 2, 3).toDS()
                 (1 in dataset) shouldBe true
-                dataset.indexOf(1) shouldBe 0L
-
-                dataset.first()
+                dataset.tailAsList(2) shouldBe listOf(2, 3)
 
 
 

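The driver-side scans from patch 1 are replaced here with Spark-native operations (filter, isEmpty, tail, takeSample); tailAsList goes through a small Scala bridge because Dataset.tail returns a Scala Array, which is awkward to consume from Kotlin. A behavioural sketch (assuming `withSpark`/`dsOf`; illustrative only):

    withSpark {
        val ds = dsOf(1, 2, 3)

        println(ds.last())                    // 3, via tail(1) on the driver
        println(ds.lastOrNull { it < 3 })     // 2
        println(ds.tailAsList(2))             // [2, 3]
        println(ds.randomOrNull(seed = 42L))  // one sampled element, or null when empty
        println(dsOf(1).single())             // 1; empty or multi-element Datasets throw
    }
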
From e0d9d6b4382ff70e38c0b7e19c9fe1353bb6f8ac Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Tue, 27 Jul 2021 23:30:45 +0200
Subject: [PATCH 3/9] working on dropWhile etc

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 96 +++++++++++++++++--
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |  8 +-
 2 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index d093d624..b4695b03 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -28,6 +28,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.*
 import org.apache.spark.sql.Encoders.*
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.functions.*
 import org.apache.spark.sql.streaming.GroupState
 import org.apache.spark.sql.streaming.GroupStateTimeout
 import org.apache.spark.sql.streaming.OutputMode
@@ -188,21 +189,29 @@ private fun <T> kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder
 
 /**
  * Allows `for (element in dataset)`.
+ *
+ * Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient.
+ * TODO: Add plugin inspection hint
  */
-@Deprecated(
-    message = "Note that this creates an iterator which can consume lots of memory. `.forEach {}` might be more efficient.",
-    level = DeprecationLevel.WARNING
-)
 operator fun <T> Dataset<T>.iterator(): Iterator<T> = toLocalIterator()
 
+fun <T> Dataset<T>.toIterable(): Iterable<T> = Iterable<T> { toLocalIterator() }
+
 /**
  * Returns `true` if [element] is found in the collection.
+ *
+ * Note: Converting the dataset to an [Iterable] first might be faster but potentially more
+ * memory-intensive. See [toIterable].
+ * TODO: Add plugin inspection hint
  */
-inline operator fun <reified T> Dataset<T>.contains(element: T): Boolean =
-    !filter { it == element }.isEmpty
+inline operator fun <reified T> Dataset<T>.contains(element: T): Boolean = !filter { it == element }.isEmpty
 
 /**
  * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ *
+ * Note: Converting the dataset to an [Iterable] first might be faster but potentially more
+ * memory-intensive. See [toIterable].
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.find(predicate: (T) -> Boolean): T? {
     return firstOrNull(predicate)
@@ -210,6 +219,7 @@ fun <T> Dataset<T>.find(predicate: (T) -> Boolean): T? {
 
 /**
  * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.findLast(predicate: (T) -> Boolean): T? {
     return lastOrNull(predicate)
@@ -218,6 +228,7 @@ fun <T> Dataset<T>.findLast(predicate: (T) -> Boolean): T? {
 /**
  * Returns the first element matching the given [predicate].
  * @throws [NoSuchElementException] if no such element is found.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.first(predicate: (T) -> Boolean): T =
     filter(predicate).first()
@@ -225,6 +236,7 @@ fun <T> Dataset<T>.first(predicate: (T) -> Boolean): T =
 /**
  * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
  * or throws [NoSuchElementException] if no non-null value was produced.
+ * TODO: Add plugin inspection hint
  */
 inline fun <reified T, reified R : Any> Dataset<T>.firstNotNullOf(noinline transform: (T) -> R?): R =
     map(transform)
@@ -234,6 +246,7 @@ inline fun <reified T, reified R : Any> Dataset<T>.firstNotNullOf(noinline trans
 /**
  * Returns the first non-null value produced by [transform] function being applied to elements of this collection in iteration order,
  * or `null` if no non-null value was produced.
+ * TODO: Add plugin inspection hint
  */
 inline fun <reified T, reified R : Any> Dataset<T>.firstNotNullOfOrNull(noinline transform: (T) -> R?): R? =
     map(transform)
@@ -247,6 +260,7 @@ fun <T> Dataset<T>.firstOrNull(): T? = if (isEmpty) null else first()
 
 /**
  * Returns the first element matching the given [predicate], or `null` if element was not found.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.firstOrNull(predicate: (T) -> Boolean): T? = filter(predicate).firstOrNull()
 
@@ -261,16 +275,19 @@ fun <T> Dataset<T>.last(): T = tailAsList(1).first()
  * Returns the last element matching the given [predicate].
  *
  * @throws NoSuchElementException if no such element is found.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.last(predicate: (T) -> Boolean): T = filter(predicate).last()
 
 /**
  * Returns the last element, or `null` if the collection is empty.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.lastOrNull(): T? = if (isEmpty) null else last()
 
 /**
  * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ * TODO: Add plugin inspection hint
  */
 fun <T> Dataset<T>.lastOrNull(predicate: (T) -> Boolean): T? = filter(predicate).lastOrNull()
 
@@ -334,6 +351,71 @@ fun <T> Dataset<T>.singleOrNull(): T? {
 }
 
 
+fun Dataset<*>.getUniqueNewColumnName(): String {
+    val existingNames = columns()
+    val alphabet = 'a'..'z'
+    var colName = alphabet.random().toString()
+    while (colName in existingNames) colName += alphabet.random()
+
+    return colName
+}
+
+/**
+ * Returns a Dataset containing all elements except first [n] elements.
+ *
+ * @throws IllegalArgumentException if [n] is negative.
+ *
+ * TODO make more efficient
+ */
+inline fun <reified T> Dataset<T>.drop(n: Int): Dataset<T> {
+    require(n >= 0) { "Requested element count $n is less than zero." }
+    val index = getUniqueNewColumnName()
+    return withColumn(index, monotonicallyIncreasingId())
+        .orderBy(desc(index))
+        .dropLast(n)
+        .orderBy(index)
+        .drop(index)
+        .`as`<T>()
+}
+
+/**
+ * Returns a Dataset containing all elements except last [n] elements.
+ *
+ * @throws IllegalArgumentException if [n] is negative.
+ */
+fun <T> Dataset<T>.dropLast(n: Int): Dataset<T> {
+    require(n >= 0) { "Requested element count $n is less than zero." }
+    return limit(
+        (count() - n).toInt().coerceAtLeast(0)
+    )
+}
+
+/**
+ * Returns a Dataset containing all elements except last elements that satisfy the given [predicate].
+ */
+inline fun <T> Dataset<T>.dropLastWhile(predicate: (T) -> Boolean): Dataset<T> {
+    val reversedWithIndex = withColumn("index", monotonicallyIncreasingId())
+        .orderBy(desc("index"))
+
+    TODO()
+}
+
+/**
+ * Returns a Dataset containing all elements except first elements that satisfy the given [predicate].
+ *
+ * TODO Can definitely be made more efficient
+ * TODO Add plugin toIterable warning
+ */
+inline fun <reified T> Dataset<T>.dropWhile(noinline predicate: (T) -> Boolean): Dataset<T> {
+    val dropUntil = map(predicate)
+        .withColumn(getUniqueNewColumnName(), monotonicallyIncreasingId())
+        .firstOrNull { !it.getBoolean(0) }
+        ?.getLong(1)
+        ?: count()
+
+    return drop(dropUntil.toInt())
+}
+
 inline fun <reified T, reified R> Dataset<T>.map(noinline func: (T) -> R): Dataset<R> =
     map(MapFunction(func), encoder<R>())
 
@@ -924,7 +1006,7 @@ inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): Type
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
+    col(column.name).`as`<U>() as TypedColumn<T, U>
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index d24c3f43..3b15069c 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -559,11 +559,9 @@ class ApiTest : ShouldSpec({
                 first.enumMap shouldBe mapOf(SomeEnum.A to SomeOtherEnum.C)
             }
             should("Have more stdlib functions for Datasets") {
-                val dataset = listOf(1, 2, 3).toDS()
-                (1 in dataset) shouldBe true
-                dataset.tailAsList(2) shouldBe listOf(2, 3)
-
-
+                val dataset = listOf(1, 2, 3).toDS().drop(2)
+                dataset.count() shouldBe 1L
+                (3 in dataset) shouldBe true
 
             }
         }

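drop(n) works by attaching a monotonicallyIncreasingId() column, ordering by it descending so that dropLast(n) (a limit(count() - n) underneath) cuts off the first n rows, then restoring the original order and removing the helper column. A sketch of the observable behaviour (assuming `withSpark`/`dsOf`; illustrative only):

    withSpark {
        val ds = dsOf(1, 2, 3, 4)

        ds.drop(2).showDS()               // 3, 4
        ds.dropLast(1).showDS()           // 1, 2, 3
        ds.dropWhile { it < 3 }.showDS()  // 3, 4
    }

Note that monotonicallyIncreasingId() produces ids that are unique and monotonically increasing but not consecutive; the trick relies only on their ordering, never on the values themselves.
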
From d144fd211bbccc20b96064c8ad9c8d604e0f80e7 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Thu, 29 Jul 2021 21:03:56 +0200
Subject: [PATCH 4/9] dropWhile variants, all/any

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 106 +++++++++++++++---
 1 file changed, 90 insertions(+), 16 deletions(-)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index b4695b03..dc08ff6b 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -369,13 +369,20 @@ fun Dataset<*>.getUniqueNewColumnName(): String {
  */
 inline fun <reified T> Dataset<T>.drop(n: Int): Dataset<T> {
     require(n >= 0) { "Requested element count $n is less than zero." }
-    val index = getUniqueNewColumnName()
-    return withColumn(index, monotonicallyIncreasingId())
-        .orderBy(desc(index))
-        .dropLast(n)
-        .orderBy(index)
-        .drop(index)
-        .`as`<T>()
+    return when {
+        isEmpty -> this
+        n >= count() -> limit(0)
+        else -> {
+            val index = getUniqueNewColumnName()
+            withColumn(index, monotonicallyIncreasingId())
+                .orderBy(desc(index))
+                .dropLast(n)
+                .orderBy(index)
+                .drop(index)
+                .`as`<T>()
+        }
+    }
+
 }
 
 /**
@@ -385,19 +392,42 @@ inline fun <reified T> Dataset<T>.drop(n: Int): Dataset<T> {
  */
 fun <T> Dataset<T>.dropLast(n: Int): Dataset<T> {
     require(n >= 0) { "Requested element count $n is less than zero." }
-    return limit(
-        (count() - n).toInt().coerceAtLeast(0)
-    )
+    return when {
+        isEmpty -> this
+        n >= count() -> limit(0)
+        else -> limit(
+            (count() - n).toInt().coerceAtLeast(0)
+        )
+    }
+
 }
 
 /**
  * Returns a Dataset containing all elements except last elements that satisfy the given [predicate].
+ *
+ * TODO Add plugin toIterable warning
  */
-inline fun <T> Dataset<T>.dropLastWhile(predicate: (T) -> Boolean): Dataset<T> {
-    val reversedWithIndex = withColumn("index", monotonicallyIncreasingId())
-        .orderBy(desc("index"))
+inline fun <reified T> Dataset<T>.dropLastWhile(noinline predicate: (T) -> Boolean): Dataset<T> {
+    if (isEmpty) return this
 
-    TODO()
+    val filterApplied = map(predicate)
+        .withColumn(
+            getUniqueNewColumnName(),
+            monotonicallyIncreasingId(),
+        )
+
+    if (filterApplied.all { it.getBoolean(0) })
+        return limit(0)
+
+    if (filterApplied.all { !it.getBoolean(0) })
+        return this
+
+    val dropFrom = filterApplied
+        .lastOrNull { !it.getBoolean(0) }
+        ?.getLong(1)
+        ?: -1L
+
+    return dropLast(count().toInt() - (dropFrom.toInt() + 1))
 }
 
 /**
@@ -407,8 +437,21 @@ inline fun <T> Dataset<T>.dropLastWhile(predicate: (T) -> Boolean): Dataset<T> {
  * TODO Add plugin toIterable warning
  */
 inline fun <reified T> Dataset<T>.dropWhile(noinline predicate: (T) -> Boolean): Dataset<T> {
-    val dropUntil = map(predicate)
-        .withColumn(getUniqueNewColumnName(), monotonicallyIncreasingId())
+    if (isEmpty) return this
+
+    val filterApplied = map(predicate)
+        .withColumn(
+            getUniqueNewColumnName(),
+            monotonicallyIncreasingId(),
+        )
+
+    if (filterApplied.all { it.getBoolean(0) })
+        return limit(0)
+
+    if (filterApplied.all { !it.getBoolean(0) })
+        return this
+
+    val dropUntil = filterApplied
         .firstOrNull { !it.getBoolean(0) }
         ?.getLong(1)
         ?: count()
@@ -416,6 +459,37 @@ inline fun <reified T> Dataset<T>.dropWhile(noinline predicate: (T) -> Boolean):
     return drop(dropUntil.toInt())
 }
 
+
+/**
+ * Returns `true` if collection has at least one element.
+ */
+fun Dataset<*>.any(): Boolean = !isEmpty
+
+/**
+ * Returns `true` if all elements match the given [predicate].
+ *
+ * TODO plugin (!any)
+ */
+inline fun <reified T> Dataset<T>.all(noinline predicate: (T) -> Boolean): Boolean {
+    if (isEmpty) return true
+
+    return map(predicate)
+        .reduceK { a, b -> a && b }
+}
+
+
+/**
+ * Returns `true` if at least one element matches the given [predicate].
+ *
+ * TODO plugin note to make it faster
+ */
+inline fun <reified T> Dataset<T>.any(noinline predicate: (T) -> Boolean): Boolean {
+    if (isEmpty) return false
+
+    return map(predicate)
+        .reduceK { a, b -> a || b }
+}
+
 inline fun <reified T, reified R> Dataset<T>.map(noinline func: (T) -> R): Dataset<R> =
     map(MapFunction(func), encoder<R>())
 

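all and any are expressed as a map to Dataset<Boolean> followed by a reduceK over && or ||, which means every element is evaluated (no short-circuiting across the cluster, hence the TODOs above). A sketch (assuming `withSpark`/`dsOf`; illustrative only):

    withSpark {
        val ds = dsOf(1, 2, 3)

        println(ds.any())           // true: the Dataset is not empty
        println(ds.all { it > 0 })  // true
        println(ds.any { it > 2 })  // true
        println(ds.all { it > 2 })  // false
    }
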
From 475482085b27773523fe47aeec333c424de40351 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Thu, 29 Jul 2021 21:50:46 +0200
Subject: [PATCH 5/9] filterIndexed skeleton, col().`as`<S, T>() and a single-column selectTyped variant

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 38 +++++++++++++++++--
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |  4 +-
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index dc08ff6b..713c051d 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -459,6 +459,23 @@ inline fun <reified T> Dataset<T>.dropWhile(noinline predicate: (T) -> Boolean):
     return drop(dropUntil.toInt() + 1)
 }
 
+/**
+ * Returns a Dataset containing only elements matching the given [predicate].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ */
+inline fun <T> Dataset<T>.filterIndexed(predicate: (index: Int, T) -> Boolean): Dataset<T> {
+    val index = getUniqueNewColumnName()
+
+    val indices = withColumn(index, monotonicallyIncreasingId())
+        .selectTyped(col(index).`as`<Row, Long>())
+
+    TODO()
+//    return filterIndexedTo(ArrayList<T>(), predicate)
+}
+
+
 
 /**
  * Returns `true` if collection has at least one element.
@@ -949,11 +966,17 @@ operator fun Column.get(key: Any): Column = getItem(key)
 fun lit(a: Any) = functions.lit(a)
 
 /**
- * Provides a type hint about the expected return value of this column.  This information can
+ * Provides a type hint about the expected return value of this column. This information can
  * be used by operations such as `select` on a [Dataset] to automatically convert the
  * results into the correct JVM types.
+ *
+ * ```
+ * val df: Dataset<Row> = ...
+ * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Row, Int>() )
+ * ```
  */
-inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
+@Suppress("UNCHECKED_CAST")
+inline fun <S, reified T> Column.`as`(): TypedColumn<S, T> = `as`(encoder<T>()) as TypedColumn<S, T>
 
 /**
  * Alias for [Dataset.joinWith] which passes "left" argument
@@ -1068,7 +1091,7 @@ operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`<U>() as TypedColumn<T, U>
+    col(column.name).`as`()
 
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
@@ -1080,7 +1103,7 @@ inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): Type
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`<U>() as TypedColumn<T, U>
+    col(column.name).`as`()
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
@@ -1108,6 +1131,13 @@ fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): D
  */
 fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) }
 
+/**
+ * Returns a new Dataset by computing the given [Column] expressions for each element.
+ */
+inline fun <reified T, reified U1> Dataset<T>.selectTyped(
+    c1: TypedColumn<T, U1>,
+): Dataset<U1> = select(c1)
+
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 3b15069c..b094ed1a 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -362,7 +362,7 @@ class ApiTest : ShouldSpec({
                     SomeClass(intArrayOf(1, 2, 4), 5),
                 )
 
-                val typedColumnA: TypedColumn<Any, IntArray> = dataset.col("a").`as`(encoder())
+                val typedColumnA: TypedColumn<SomeClass, IntArray> = dataset.col("a").`as`<SomeClass, IntArray>()
 
                 val newDS2 = dataset.selectTyped(
                     col(SomeClass::a), // NOTE: this only works on 3.0, returning a data class with an array in it
@@ -454,7 +454,7 @@ class ApiTest : ShouldSpec({
                 )
                 dataset.show()
 
-                val column = col("b").`as`<IntArray>()
+                val column = col("b").`as`<SomeOtherClass, IntArray>()
 
                 val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
                 b.show()

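With the two-type-parameter `as`, an untyped Column can be lifted straight into the new single-column selectTyped. A sketch against a plain Dataset<Row> (illustrative only; `df("value")` uses the library's invoke operator, and "value" is Spark's default column name for a primitive Dataset):

    withSpark {
        val df: Dataset<Row> = dsOf(1, 2, 3).toDF()

        val values: Dataset<Int> = df.selectTyped(
            df("value").`as`<Row, Int>(),
        )
        values.showDS()
    }
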
From d9b9067ba13a648232b6dfa731a3f5a05db51e55 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Thu, 29 Jul 2021 22:03:41 +0200
Subject: [PATCH 6/9] updated col().`as`() behavior and added single
 selectTyped() variant

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 22 +++++++++++++++----
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    | 15 ++++++++-----
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 21 ++++++++++++++----
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    | 15 ++++++++-----
 4 files changed, 53 insertions(+), 20 deletions(-)

diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 34152494..de9b7fca 100644
--- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -647,11 +647,18 @@ operator fun Column.get(key: Any): Column = getItem(key)
 fun lit(a: Any) = functions.lit(a)
 
 /**
- * Provides a type hint about the expected return value of this column.  This information can
+ * Provides a type hint about the expected return value of this column. This information can
  * be used by operations such as `select` on a [Dataset] to automatically convert the
  * results into the correct JVM types.
+ *
+ * ```
+ * val df: Dataset<Row> = ...
+ * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Row, Int>() )
+ * ```
  */
-inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
+@Suppress("UNCHECKED_CAST")
+inline fun <S, reified T> Column.`as`(): TypedColumn<S, T> = `as`(encoder<T>()) as TypedColumn<S, T>
+
 
 /**
  * Alias for [Dataset.joinWith] which passes "left" argument
@@ -766,7 +773,7 @@ operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`<U>() as TypedColumn<T, U>
+    col(column.name).`as`()
 
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
@@ -778,7 +785,7 @@ inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): Type
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
+    functions.col(column.name).`as`()
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
@@ -806,6 +813,13 @@ fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): D
  */
 fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) }
 
+/**
+ * Returns a new Dataset by computing the given [Column] expressions for each element.
+ */
+inline fun <reified T, reified U1> Dataset<T>.selectTyped(
+    c1: TypedColumn<T, U1>,
+): Dataset<U1> = select(c1)
+
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index bae27b2e..66dbb872 100644
--- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -339,23 +339,26 @@ class ApiTest : ShouldSpec({
                     SomeClass(intArrayOf(1, 2, 4), 5),
                 )
 
-                val typedColumnA: TypedColumn<Any, IntArray> = dataset.col("a").`as`(encoder())
+                val newDS1WithAs: Dataset<Int> = dataset.selectTyped(
+                    col("b").`as`(),
+                )
+                newDS1WithAs.show()
 
-                val newDS2 = dataset.selectTyped(
+                val newDS2: Dataset<Pair<Int, Int>> = dataset.selectTyped(
//                    col(SomeClass::a), NOTE that this doesn't work on 2.4, returning a data class with an array in it
                     col(SomeClass::b),
                     col(SomeClass::b),
                 )
                 newDS2.show()
 
-                val newDS3 = dataset.selectTyped(
+                val newDS3: Dataset<Triple<Int, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::b),
                     col(SomeClass::b),
                     col(SomeClass::b),
                 )
                 newDS3.show()
 
-                val newDS4 = dataset.selectTyped(
+                val newDS4: Dataset<Arity4<Int, Int, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::b),
                     col(SomeClass::b),
                     col(SomeClass::b),
@@ -363,7 +366,7 @@ class ApiTest : ShouldSpec({
                 )
                 newDS4.show()
 
-                val newDS5 = dataset.selectTyped(
+                val newDS5: Dataset<Arity5<Int, Int, Int, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::b),
                     col(SomeClass::b),
                     col(SomeClass::b),
@@ -434,7 +437,7 @@ class ApiTest : ShouldSpec({
                 )
                 dataset.show()
 
-                val column = col("b").`as`<IntArray>()
+                val column = col("b").`as`<SomeOtherClass, IntArray>()
 
                 val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
                 b.show()
diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 2dde48cb..604e98d8 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -643,11 +643,17 @@ operator fun Column.get(key: Any): Column = getItem(key)
 fun lit(a: Any) = functions.lit(a)
 
 /**
- * Provides a type hint about the expected return value of this column.  This information can
+ * Provides a type hint about the expected return value of this column. This information can
  * be used by operations such as `select` on a [Dataset] to automatically convert the
  * results into the correct JVM types.
+ *
+ * ```
+ * val df: Dataset<Row> = ...
+ * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Row, Int>() )
+ * ```
  */
-inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
+@Suppress("UNCHECKED_CAST")
+inline fun <S, reified T> Column.`as`(): TypedColumn<S, T> = `as`(encoder<T>()) as TypedColumn<S, T>
 
 /**
  * Alias for [Dataset.joinWith] which passes "left" argument
@@ -762,7 +768,7 @@ operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`<U>() as TypedColumn<T, U>
+    col(column.name).`as`()
 
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
@@ -774,7 +780,7 @@ inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): Type
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
+    functions.col(column.name).`as`()
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
@@ -802,6 +808,13 @@ fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): D
  */
 fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply { show(numRows, truncate) }
 
+/**
+ * Returns a new Dataset by computing the given [Column] expressions for each element.
+ */
+inline fun <reified T, reified U1> Dataset<T>.selectTyped(
+    c1: TypedColumn<T, U1>,
+): Dataset<U1> = select(c1)
+
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index bff38ac1..a0e5c25f 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -362,22 +362,25 @@ class ApiTest : ShouldSpec({
                     SomeClass(intArrayOf(1, 2, 4), 5),
                 )
 
-                val typedColumnA: TypedColumn<Any, IntArray> = dataset.col("a").`as`(encoder())
+                val newDS1WithAs: Dataset<IntArray> = dataset.selectTyped(
+                    col("a").`as`(),
+                )
+                newDS1WithAs.show()
 
-                val newDS2 = dataset.selectTyped(
+                val newDS2: Dataset<Pair<IntArray, Int>> = dataset.selectTyped(
                     col(SomeClass::a), // NOTE: this only works on 3.0, returning a data class with an array in it
                     col(SomeClass::b),
                 )
                 newDS2.show()
 
-                val newDS3 = dataset.selectTyped(
+                val newDS3: Dataset<Triple<IntArray, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::a),
                     col(SomeClass::b),
                     col(SomeClass::b),
                 )
                 newDS3.show()
 
-                val newDS4 = dataset.selectTyped(
+                val newDS4: Dataset<Arity4<IntArray, Int, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::a),
                     col(SomeClass::b),
                     col(SomeClass::b),
@@ -385,7 +388,7 @@ class ApiTest : ShouldSpec({
                 )
                 newDS4.show()
 
-                val newDS5 = dataset.selectTyped(
+                val newDS5: Dataset<Arity5<IntArray, Int, Int, Int, Int>> = dataset.selectTyped(
                     col(SomeClass::a),
                     col(SomeClass::b),
                     col(SomeClass::b),
@@ -454,7 +457,7 @@ class ApiTest : ShouldSpec({
                 )
                 dataset.show()
 
-                val column = col("b").`as`<IntArray>()
+                val column = col("b").`as`<SomeOtherClass, IntArray>()
 
                 val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
                 b.show()

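Since both type parameters of `as` can now be inferred from the expected type, call sites can often drop the explicit arguments entirely, as the updated test does (sketch):

    val newDS1: Dataset<IntArray> = dataset.selectTyped(
        col("a").`as`(),  // S and T both inferred from the declared target type
    )
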
From eae9cb4a65e47de3236deb1b4683ed37abe84b06 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Thu, 29 Jul 2021 22:36:30 +0200
Subject: [PATCH 7/9] reverted col().`as`<>() behavior for simplicity. Updated
 selectTyped to accept any TypedColumn

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 72 +++++++++++-------
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |  4 +-
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt  | 74 ++++++++++++-------
 .../jetbrains/kotlinx/spark/api/ApiTest.kt    |  4 +-
 4 files changed, 99 insertions(+), 55 deletions(-)

diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index de9b7fca..122e1747 100644
--- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -653,11 +653,11 @@ fun lit(a: Any) = functions.lit(a)
  *
  * ```
  * val df: Dataset<Row> = ...
- * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Row, Int>() )
+ * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Int>() )
  * ```
  */
 @Suppress("UNCHECKED_CAST")
-inline fun <S, reified T> Column.`as`(): TypedColumn<S, T> = `as`(encoder<T>()) as TypedColumn<S, T>
+inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
 
 
 /**
@@ -773,7 +773,7 @@ operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`()
+    col(column.name).`as`<U>() as TypedColumn<T, U>
 
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
@@ -785,7 +785,7 @@ inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): Type
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`()
+    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
@@ -816,52 +816,74 @@ fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply {
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-): Dataset<U1> = select(c1)
+    c1: TypedColumn<out Any, U1>,
+): Dataset<U1> = select(c1 as TypedColumn<T, U1>)
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
 ): Dataset<Pair<U1, U2>> =
-    select(c1, c2).map { Pair(it._1(), it._2()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+    ).map { Pair(it._1(), it._2()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
 ): Dataset<Triple<U1, U2, U3>> =
-    select(c1, c2, c3).map { Triple(it._1(), it._2(), it._3()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+    ).map { Triple(it._1(), it._2(), it._3()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3, reified U4> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
-    c4: TypedColumn<T, U4>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
+    c4: TypedColumn<out Any, U4>,
 ): Dataset<Arity4<U1, U2, U3, U4>> =
-    select(c1, c2, c3, c4).map { Arity4(it._1(), it._2(), it._3(), it._4()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+        c4 as TypedColumn<T, U4>,
+    ).map { Arity4(it._1(), it._2(), it._3(), it._4()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3, reified U4, reified U5> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
-    c4: TypedColumn<T, U4>,
-    c5: TypedColumn<T, U5>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
+    c4: TypedColumn<out Any, U4>,
+    c5: TypedColumn<out Any, U5>,
 ): Dataset<Arity5<U1, U2, U3, U4, U5>> =
-    select(c1, c2, c3, c4, c5).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) }
-
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+        c4 as TypedColumn<T, U4>,
+        c5 as TypedColumn<T, U5>,
+    ).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) }
 
 @OptIn(ExperimentalStdlibApi::class)
 inline fun <reified T> schema(map: Map<String, KType> = mapOf()) = schema(typeOf<T>(), map)
diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 66dbb872..ec8a6e14 100644
--- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -340,7 +340,7 @@ class ApiTest : ShouldSpec({
                 )
 
                 val newDS1WithAs: Dataset<Int> = dataset.selectTyped(
-                    col("b").`as`(),
+                    col("b").`as`<Int>(),
                 )
                 newDS1WithAs.show()
 
@@ -437,7 +437,7 @@ class ApiTest : ShouldSpec({
                 )
                 dataset.show()
 
-                val column = col("b").`as`<SomeOtherClass, IntArray>()
+                val column = col("b").`as`<IntArray>()
 
                 val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
                 b.show()
diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 604e98d8..6b59c272 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -649,11 +649,11 @@ fun lit(a: Any) = functions.lit(a)
  *
  * ```
  * val df: Dataset<Row> = ...
- * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Row, Int>() )
+ * val typedColumn: Dataset<Int> = df.selectTyped( col("a").`as`<Int>() )
  * ```
  */
 @Suppress("UNCHECKED_CAST")
-inline fun <S, reified T> Column.`as`(): TypedColumn<S, T> = `as`(encoder<T>()) as TypedColumn<S, T>
+inline fun <reified T> Column.`as`(): TypedColumn<Any, T> = `as`(encoder<T>())
 
 /**
  * Alias for [Dataset.joinWith] which passes "left" argument
@@ -768,19 +768,18 @@ operator fun <T> Dataset<T>.invoke(colName: String): Column = col(colName)
 
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> Dataset<T>.col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    col(column.name).`as`()
+    col(column.name).`as`<U>() as TypedColumn<T, U>
 
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
  * ```kotlin
  *    val dataset: Dataset<YourClass> = ...
- *    val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( col(YourClass::a), col(YourClass::b) )
+ *    val new: Dataset<Pair<TypeOfA, TypeOfB>> = dataset.selectTyped( col(YourClass::a), col(YourClass::b) )
  * ```
- * TODO: change example to [Pair]s when merged
  */
 @Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
-    functions.col(column.name).`as`()
+    functions.col(column.name).`as`<U>() as TypedColumn<T, U>
 
 /**
  * Helper function to quickly get a [TypedColumn] (or [Column]) from a dataset in a refactor-safe manner.
@@ -811,51 +810,74 @@ fun <T> Dataset<T>.showDS(numRows: Int = 20, truncate: Boolean = true) = apply {
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-): Dataset<U1> = select(c1)
+    c1: TypedColumn<out Any, U1>,
+): Dataset<U1> = select(c1 as TypedColumn<T, U1>)
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
 ): Dataset<Pair<U1, U2>> =
-    select(c1, c2).map { Pair(it._1(), it._2()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+    ).map { Pair(it._1(), it._2()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
 ): Dataset<Triple<U1, U2, U3>> =
-    select(c1, c2, c3).map { Triple(it._1(), it._2(), it._3()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+    ).map { Triple(it._1(), it._2(), it._3()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3, reified U4> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
-    c4: TypedColumn<T, U4>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
+    c4: TypedColumn<out Any, U4>,
 ): Dataset<Arity4<U1, U2, U3, U4>> =
-    select(c1, c2, c3, c4).map { Arity4(it._1(), it._2(), it._3(), it._4()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+        c4 as TypedColumn<T, U4>,
+    ).map { Arity4(it._1(), it._2(), it._3(), it._4()) }
 
 /**
  * Returns a new Dataset by computing the given [Column] expressions for each element.
  */
+@Suppress("UNCHECKED_CAST")
 inline fun <reified T, reified U1, reified U2, reified U3, reified U4, reified U5> Dataset<T>.selectTyped(
-    c1: TypedColumn<T, U1>,
-    c2: TypedColumn<T, U2>,
-    c3: TypedColumn<T, U3>,
-    c4: TypedColumn<T, U4>,
-    c5: TypedColumn<T, U5>,
+    c1: TypedColumn<out Any, U1>,
+    c2: TypedColumn<out Any, U2>,
+    c3: TypedColumn<out Any, U3>,
+    c4: TypedColumn<out Any, U4>,
+    c5: TypedColumn<out Any, U5>,
 ): Dataset<Arity5<U1, U2, U3, U4, U5>> =
-    select(c1, c2, c3, c4, c5).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) }
+    select(
+        c1 as TypedColumn<T, U1>,
+        c2 as TypedColumn<T, U2>,
+        c3 as TypedColumn<T, U3>,
+        c4 as TypedColumn<T, U4>,
+        c5 as TypedColumn<T, U5>,
+    ).map { Arity5(it._1(), it._2(), it._3(), it._4(), it._5()) }
 
 
 @OptIn(ExperimentalStdlibApi::class)
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index a0e5c25f..0b9c01a9 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -363,7 +363,7 @@ class ApiTest : ShouldSpec({
                 )
 
                 val newDS1WithAs: Dataset<IntArray> = dataset.selectTyped(
-                    col("a").`as`(),
+                    col("a").`as`<IntArray>(),
                 )
                 newDS1WithAs.show()
 
@@ -457,7 +457,7 @@ class ApiTest : ShouldSpec({
                 )
                 dataset.show()
 
-                val column = col("b").`as`<SomeOtherClass, IntArray>()
+                val column = col("b").`as`<IntArray>()
 
                 val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
                 b.show()

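After this revert, `as` is single-parameter again and the cast moves into selectTyped, which accepts any TypedColumn<out Any, U> and unifies the owner type internally. That lets one call mix property-based columns with ad-hoc `as` columns (sketch, mirroring the updated tests):

    val mixed: Dataset<Pair<IntArray, Int>> = dataset.selectTyped(
        col(SomeClass::a),     // TypedColumn<SomeClass, IntArray>
        col("b").`as`<Int>(),  // TypedColumn<Any, Int>
    )
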
From 65ecd39f3d0dd5043ed4072b6af0cc97ea5750d7 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Thu, 29 Jul 2021 22:57:25 +0200
Subject: [PATCH 8/9] updated readme

---
 README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/README.md b/README.md
index c6b11f71..3ab69c55 100644
--- a/README.md
+++ b/README.md
@@ -192,6 +192,9 @@ to create `TypedColumn`s and with those a new Dataset from pieces of another usi
 ```kotlin
 val dataset: Dataset<YourClass> = ...
 val newDataset: Dataset<Pair<TypeA, TypeB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))
+
+// Alternatively, for instance when working with a Dataset<Row>
+val typedDataset: Dataset<Pair<String, Int>> = otherDataset.selectTyped(col("a").`as`<String>(), col("b").`as`<Int>())
 ```
 
 ### Overload resolution ambiguity

From 24c48ba3f50365a75cde6aca4bc0248586957753 Mon Sep 17 00:00:00 2001
From: Jolanrensen <j.j.r.rensen@student.tue.nl>
Date: Fri, 30 Jul 2021 14:30:19 +0200
Subject: [PATCH 9/9] working on filterIndexed

---
 .../org/jetbrains/kotlinx/spark/api/ApiV1.kt      | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index c2bbe6a8..695d9c32 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -465,14 +465,15 @@ inline fun <reified T> Dataset<T>.dropWhile(noinline predicate: (T) -> Boolean):
  * and returns the result of predicate evaluation on the element.
  *
  */
-inline fun <T> Dataset<T>.filterIndexed(predicate: (index: Int, T) -> Boolean): Dataset<T> {
-    val index = getUniqueNewColumnName()
-
-    val indices = withColumn(index, monotonicallyIncreasingId())
-        .selectTyped(col(index).`as`<Row, Long>())
-
+inline fun <reified T> Dataset<T>.filterIndexed(crossinline predicate: (index: Long, T) -> Boolean): Dataset<T> {
     TODO()
-//    return filterIndexedTo(ArrayList<T>(), predicate)
+    val indices = selectTyped(monotonicallyIncreasingId().`as`<Long>())
+    // TODO this needs to zip, not join
+    val joined = indices.leftJoin(this, col(indices.columns().first()) neq -1L)
+    val filterResults = joined.map { (index, value) -> predicate(index, value!!) }
+    val filtered = selectTyped(col(filterResults.columns().first()).`as`<T>())
+
+    return filtered
 }
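
The zip mentioned in the TODO can be done one level down, where JavaRDD.zipWithIndex pairs every element with its ordinal. A possible direction (an untested sketch against plain Spark APIs, not part of the patch; note the predicate must be serializable, since it ships to the executors):

    inline fun <reified T> Dataset<T>.filterIndexedSketch(
        crossinline predicate: (index: Long, T) -> Boolean,
    ): Dataset<T> {
        val filtered = toJavaRDD()
            .zipWithIndex()                                  // JavaPairRDD<T, Long>
            .filter { pair -> predicate(pair._2, pair._1) }  // keep matching (value, index) pairs
            .keys()                                          // back to JavaRDD<T>
        return sparkSession().createDataset(filtered.rdd(), encoder())
    }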