Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
Expand Down Expand Up @@ -45,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -590,43 +592,86 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
record match {
case carrierRow: BatchCarrierRow =>
carrierRow match {
case placeholderRow: PlaceholderRow =>
case _: PlaceholderRow =>
// Do nothing.
case terminalRow: TerminalRow =>
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(
terminalRow.batch(),
partitionColIndice,
isBucketed)
val iter = blockStripes.iterator()
while (iter.hasNext) {
val blockStripe = iter.next()
val headingRow = blockStripe.getHeadingRow
beforeWrite(headingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
assert(numRowsOfCurrentColumnarBatch > 0)
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
currentWriter.write(currentTerminalRow)
statsTrackers.foreach {
tracker =>
tracker.newRow(currentWriter.path, currentTerminalRow)
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
tracker.newRow(currentWriter.path, new PlaceholderRow())
}
}
currentColumnBatch.close()
}
blockStripes.release()
recordsInFile += numRows
}
writePartitionedBatch(terminalRow)
}
case _ =>
beforeWrite(record)
writeRecord(record)
}
}

/**
 * Writes the batch carried by `terminalRow` through the current writer, notifies every
 * stats tracker of the written row, and bumps the per-file record counter by `rowCount`.
 */
private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
  assert(rowCount > 0)
  currentWriter.write(terminalRow)
  // All trackers observe the same output path for this write.
  statsTrackers.foreach { tracker =>
    tracker.newRow(currentWriter.path, terminalRow)
  }
  recordsInFile += rowCount
}

/**
 * Writes `columnBatch` while honoring `description.maxRecordsPerFile`: the batch is written
 * in one or more chunks so no single output file exceeds the configured record limit.
 * When the limit is hit the current writer is rolled over to a fresh file before the next
 * chunk. Chunks other than the full batch are produced with VeloxColumnarBatches.slice and
 * closed here; the caller retains ownership of (and must close) the original `columnBatch`.
 */
private def writeCurrentBatchWithMaxRecords(
    terminalRow: TerminalRow,
    columnBatch: ColumnarBatch): Unit = {
  val numRows = columnBatch.numRows()
  var offset = 0
  while (offset < numRows) {
    val rowsRemaining = numRows - offset
    val rowsToWrite = if (description.maxRecordsPerFile > 0) {
      // Current file is full: roll over to a new writer before computing the chunk size.
      // NOTE(review): assumes renewCurrentWriterIfTooManyRecords resets recordsInFile,
      // otherwise the min() below could be <= 0 and trip the assert — confirm.
      if (recordsInFile >= description.maxRecordsPerFile) {
        renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
      }
      // Write at most the remaining capacity of the (possibly renewed) current file.
      math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
    } else {
      // No limit configured: write everything that is left in one chunk.
      rowsRemaining
    }

    assert(rowsToWrite > 0)
    val batchToWrite =
      if (offset == 0 && rowsToWrite == numRows) {
        // Whole batch fits — avoid an unnecessary slice/copy.
        columnBatch
      } else {
        VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
      }
    try {
      writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
    } finally {
      // Only close batches we created here; the caller owns the original.
      if (batchToWrite ne columnBatch) {
        batchToWrite.close()
      }
    }
    offset += rowsToWrite
  }
}

/**
 * Writes a single partition/bucket stripe produced by the row splitter. The stripe's
 * heading row drives the pre-write bookkeeping (partition/bucket switch), and the stripe's
 * columnar batch is always closed once written, even on failure.
 */
private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
  beforeWrite(blockStripe.getHeadingRow)
  val stripeBatch = blockStripe.getColumnarBatch
  try {
    // The splitter must never hand back an empty stripe.
    assert(stripeBatch.numRows() > 0)
    writeCurrentBatchWithMaxRecords(terminalRow, stripeBatch)
  } finally {
    stripeBatch.close()
  }
}

/**
 * Splits the batch carried by `terminalRow` into per-partition / per-bucket stripes and
 * writes each one. Empty batches are skipped. The native stripe set is always released,
 * even if a stripe write fails.
 */
private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
  if (terminalRow.batch().numRows() > 0) {
    val stripes = GlutenFormatFactory.rowSplitter.splitBlockByPartitionAndBucket(
      terminalRow.batch(),
      partitionColIndice,
      isBucketed)
    try {
      val stripeIter = stripes.iterator()
      while (stripeIter.hasNext) {
        writePartitionStripe(terminalRow, stripeIter.next())
      }
    } finally {
      stripes.release()
    }
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
Expand Down Expand Up @@ -46,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -583,42 +585,84 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
record match {
case carrierRow: BatchCarrierRow =>
carrierRow match {
case placeholderRow: PlaceholderRow =>
case _: PlaceholderRow =>
// Do nothing.
case terminalRow: TerminalRow =>
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(terminalRow.batch(), partitionColIndice,
isBucketed)
val iter = blockStripes.iterator()
while (iter.hasNext) {
val blockStripe = iter.next()
val headingRow = blockStripe.getHeadingRow
beforeWrite(headingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
assert(numRowsOfCurrentColumnarBatch > 0)
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
currentWriter.write(currentTerminalRow)
statsTrackers.foreach {
tracker =>
tracker.newRow(currentWriter.path, currentTerminalRow)
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
tracker.newRow(currentWriter.path, new PlaceholderRow())
}
}
currentColumnBatch.close()
}
blockStripes.release()
recordsInFile += numRows
}
writePartitionedBatch(terminalRow)
}
case _ =>
beforeWrite(record)
writeRecord(record)
}
}

/**
 * Writes the batch carried by `terminalRow` through the current writer, notifies every
 * stats tracker of the written row, and bumps the per-file record counter by `rowCount`.
 */
private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
  assert(rowCount > 0)
  currentWriter.write(terminalRow)
  // All trackers observe the same output path for this write.
  statsTrackers.foreach { tracker =>
    tracker.newRow(currentWriter.path, terminalRow)
  }
  recordsInFile += rowCount
}

/**
 * Writes `columnBatch` while honoring `description.maxRecordsPerFile`: the batch is written
 * in one or more chunks so no single output file exceeds the configured record limit.
 * When the limit is hit the current writer is rolled over to a fresh file before the next
 * chunk. Chunks other than the full batch are produced with VeloxColumnarBatches.slice and
 * closed here; the caller retains ownership of (and must close) the original `columnBatch`.
 */
private def writeCurrentBatchWithMaxRecords(
    terminalRow: TerminalRow,
    columnBatch: ColumnarBatch): Unit = {
  val numRows = columnBatch.numRows()
  var offset = 0
  while (offset < numRows) {
    val rowsRemaining = numRows - offset
    val rowsToWrite = if (description.maxRecordsPerFile > 0) {
      // Current file is full: roll over to a new writer before computing the chunk size.
      // NOTE(review): assumes renewCurrentWriterIfTooManyRecords resets recordsInFile,
      // otherwise the min() below could be <= 0 and trip the assert — confirm.
      if (recordsInFile >= description.maxRecordsPerFile) {
        renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
      }
      // Write at most the remaining capacity of the (possibly renewed) current file.
      math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
    } else {
      // No limit configured: write everything that is left in one chunk.
      rowsRemaining
    }

    assert(rowsToWrite > 0)
    val batchToWrite =
      if (offset == 0 && rowsToWrite == numRows) {
        // Whole batch fits — avoid an unnecessary slice/copy.
        columnBatch
      } else {
        VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
      }
    try {
      writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
    } finally {
      // Only close batches we created here; the caller owns the original.
      if (batchToWrite ne columnBatch) {
        batchToWrite.close()
      }
    }
    offset += rowsToWrite
  }
}

/**
 * Writes a single partition/bucket stripe produced by the row splitter. The stripe's
 * heading row drives the pre-write bookkeeping (partition/bucket switch), and the stripe's
 * columnar batch is always closed once written, even on failure.
 */
private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
  beforeWrite(blockStripe.getHeadingRow)
  val stripeBatch = blockStripe.getColumnarBatch
  try {
    // The splitter must never hand back an empty stripe.
    assert(stripeBatch.numRows() > 0)
    writeCurrentBatchWithMaxRecords(terminalRow, stripeBatch)
  } finally {
    stripeBatch.close()
  }
}

/**
 * Splits the batch carried by `terminalRow` into per-partition / per-bucket stripes and
 * writes each one. Empty batches are skipped. The native stripe set is always released,
 * even if a stripe write fails.
 */
private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
  if (terminalRow.batch().numRows() > 0) {
    val stripes = GlutenFormatFactory.rowSplitter.splitBlockByPartitionAndBucket(
      terminalRow.batch(),
      partitionColIndice,
      isBucketed)
    try {
      val stripeIter = stripes.iterator()
      while (stripeIter.hasNext) {
        writePartitionStripe(terminalRow, stripeIter.next())
      }
    } finally {
      stripes.release()
    }
  }
}
}
}
// spotless:on
Loading
Loading