@@ -121,7 +121,7 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
@Override
public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
int batch_size = SparkOptions.getBatchSize(config);
-      LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(sparkSchema, batch_size);
+      LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(sparkSchema, batch_size, config);
WriteParams params = SparkOptions.genWriteParamsFromConfig(config);
Callable<List<FragmentMetadata>> fragmentCreator =
() -> LanceDatasetAdapter.createFragment(config.getDatasetUri(), arrowWriter, params);
@@ -30,6 +30,8 @@ public class SparkOptions {
private static final String max_bytes_per_file = "max_bytes_per_file";
private static final String batch_size = "batch_size";
private static final String topN_push_down = "topN_push_down";
+  private static final String arrow_max_allocation_bytes = "arrow.max.allocation.bytes";
+  private static final String arrow_var_width_avg_bytes = "arrow.var.width.avg.bytes";

public static ReadOptions genReadOptionFromConfig(LanceConfig config) {
ReadOptions.Builder builder = new ReadOptions.Builder();
@@ -88,4 +90,36 @@ public static boolean enableTopNPushDown(LanceConfig config) {
public static boolean overwrite(LanceConfig config) {
return config.getOptions().getOrDefault(write_mode, "append").equalsIgnoreCase("overwrite");
  }
+
+  /**
+   * Get the maximum allocation size for Arrow buffers in bytes. Default is Long.MAX_VALUE to allow
+   * allocations beyond Integer.MAX_VALUE (2GB). Can be configured to a smaller value if needed to
+   * limit memory usage.
+   *
+   * @param config Lance configuration
+   * @return Maximum allocation size in bytes
+   */
+  public static long getArrowMaxAllocationBytes(LanceConfig config) {
+    Map<String, String> options = config.getOptions();
+    if (options.containsKey(arrow_max_allocation_bytes)) {
+      return Long.parseLong(options.get(arrow_max_allocation_bytes));
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Get the average number of bytes per element for variable-width vectors (strings, binary). This
+   * is used to pre-allocate buffers to avoid frequent reallocations. Default is 64 bytes, which is
+   * a conservative estimate for most workloads.
+   *
+   * @param config Lance configuration
+   * @return Average bytes per variable-width element
+   */
+  public static int getArrowVarWidthAvgBytes(LanceConfig config) {
+    Map<String, String> options = config.getOptions();
+    if (options.containsKey(arrow_var_width_avg_bytes)) {
+      return Integer.parseInt(options.get(arrow_var_width_avg_bytes));
+    }
+    return 64;
+  }
}
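
For reference, a write job would set the two new options as plain strings; the keys come from the constants above. The "lance" format name and the absence of a key prefix are assumptions here; verify against how LanceConfig collects DataFrame options. Note that getArrowMaxAllocationBytes is not consumed in the hunks shown; presumably it caps the connector's BufferAllocator (e.g. as a RootAllocator limit) elsewhere in this change.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class WriteWithArrowTuning {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("lance-write").getOrCreate();
        Dataset<Row> df = spark.range(1000).toDF("id");
        df.write()
            .format("lance") // assumed registration name of the Lance Spark connector
            // Cap Arrow buffer allocations at 4 GiB instead of the Long.MAX_VALUE default.
            .option("arrow.max.allocation.bytes", String.valueOf(4L << 30))
            // Expect ~128 bytes per string/binary value instead of the 64-byte default.
            .option("arrow.var.width.avg.bytes", "128")
            .save("/tmp/example.lance");
        spark.stop();
      }
    }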
@@ -191,12 +191,14 @@ public static FragmentMetadata deleteRows(
}
}

-  public static LanceArrowWriter getArrowWriter(StructType sparkSchema, int batchSize) {
+  public static LanceArrowWriter getArrowWriter(
+      StructType sparkSchema, int batchSize, LanceConfig config) {
return new LanceArrowWriter(
allocator,
LanceArrowUtils.toArrowSchema(sparkSchema, "UTC", false, false),
sparkSchema,
-        batchSize);
+        batchSize,
+        SparkOptions.getArrowVarWidthAvgBytes(config));
}

public static List<FragmentMetadata> createFragment(
@@ -32,6 +32,7 @@ public class LanceArrowWriter extends ArrowReader {
private final Schema schema;
private final StructType sparkSchema;
private final int batchSize;
+  private final int avgBytesPerVarWidthElement;

@GuardedBy("monitor")
private volatile boolean finished = false;
@@ -43,13 +44,19 @@ public class LanceArrowWriter extends ArrowReader {
private final Semaphore loadToken;

public LanceArrowWriter(
-      BufferAllocator allocator, Schema schema, StructType sparkSchema, int batchSize) {
+      BufferAllocator allocator,
+      Schema schema,
+      StructType sparkSchema,
+      int batchSize,
+      int avgBytesPerVarWidthElement) {
super(allocator);
Preconditions.checkNotNull(schema);
Preconditions.checkArgument(batchSize > 0);
+    Preconditions.checkArgument(avgBytesPerVarWidthElement > 0);
this.schema = schema;
this.sparkSchema = sparkSchema;
this.batchSize = batchSize;
+    this.avgBytesPerVarWidthElement = avgBytesPerVarWidthElement;
this.writeToken = new Semaphore(0);
this.loadToken = new Semaphore(0);
}
@@ -79,7 +86,7 @@ public void prepareLoadNextBatch() throws IOException {
super.prepareLoadNextBatch();
arrowWriter =
com.lancedb.lance.spark.arrow.LanceArrowWriter$.MODULE$.create(
-            this.getVectorSchemaRoot(), sparkSchema);
+            this.getVectorSchemaRoot(), sparkSchema, batchSize, avgBytesPerVarWidthElement);
// release batch size token for write
writeToken.release(batchSize);
}
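
As a side note, here is a minimal sketch (not the Lance class itself) of the writer-side flow control that the writeToken semaphore implements: prepareLoadNextBatch grants one permit per row of the upcoming batch, and each row write consumes one, so writers block once a batch is full. The real class pairs this with a loadToken semaphore signalling readiness in the opposite direction.

    import java.util.concurrent.Semaphore;

    class BatchPermitsSketch {
      private final Semaphore writeToken = new Semaphore(0);

      // Reader side: before loading a batch, grant batchSize row permits.
      void prepareLoadNextBatch(int batchSize) {
        writeToken.release(batchSize);
      }

      // Writer side: each row consumes one permit and blocks when the
      // current batch is full, until the reader prepares the next one.
      void writeRow(Runnable appendRow) throws InterruptedException {
        writeToken.acquire();
        appendRow.run();
      }
    }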
@@ -93,7 +93,7 @@ protected WriterFactory(StructType schema, LanceConfig config) {
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
int batch_size = SparkOptions.getBatchSize(config);
-      LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(schema, batch_size);
+      LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(schema, batch_size, config);
WriteParams params = SparkOptions.genWriteParamsFromConfig(config);
Callable<List<FragmentMetadata>> fragmentCreator =
() -> LanceDatasetAdapter.createFragment(config.getDatasetUri(), arrowWriter, params);
@@ -63,6 +63,58 @@ object LanceArrowWriter {
new LanceArrowWriter(root, children.toArray)
}

+  def create(
+      root: VectorSchemaRoot,
+      sparkSchema: StructType,
+      batchSize: Int,
+      avgBytesPerVarWidthElement: Int): LanceArrowWriter = {
+    val children = root.getFieldVectors().asScala.zipWithIndex.map { case (vector, index) =>
+      val sparkField = sparkSchema.fields(index)
+      allocateVectorWithSize(vector, batchSize, sparkField.dataType, avgBytesPerVarWidthElement)
+      createFieldWriter(vector, sparkField.dataType, sparkField.metadata)
+    }
+    new LanceArrowWriter(root, children.toArray)
+  }
+
+  private def allocateVectorWithSize(
+      vector: ValueVector,
+      batchSize: Int,
+      sparkType: DataType,
+      avgBytesPerVarWidthElement: Int): Unit = {
+    vector match {
+      // FixedSizeListVector: exact size = batchSize * listSize * elementSize
+      case fixedSizeList: FixedSizeListVector =>
+        val listSize = fixedSizeList.getListSize()
+        val totalElements = batchSize * listSize
+        val dataVector = fixedSizeList.getDataVector()
+
+        // Allocate the underlying data vector
+        dataVector match {
+          case fwv: FixedWidthVector =>
+            fwv.allocateNew(totalElements)
+          case vwv: BaseVariableWidthVector =>
+            // Variable-width elements in fixed-size lists: configured estimate, widened to Long to avoid Int overflow
+            vwv.allocateNew(totalElements.toLong * avgBytesPerVarWidthElement, totalElements)
+          case _ =>
+            dataVector.allocateNew()
+        }
+        fixedSizeList.setValueCount(batchSize)
+
+      // FixedWidthVectors: allocate the exact value count
+      case fixedWidth: FixedWidthVector =>
+        fixedWidth.allocateNew(batchSize)
+
+      // VariableWidthVectors: allocate with a size estimate
+      case varWidth: BaseVariableWidthVector =>
+        // Configured average bytes per element, widened to Long to avoid Int overflow
+        varWidth.allocateNew(batchSize.toLong * avgBytesPerVarWidthElement, batchSize)
+
+      // Default: use Arrow's default allocation
+      case _ =>
+        vector.allocateNew()
+    }
+  }
+
private[arrow] def createFieldWriter(
vector: ValueVector,
sparkType: DataType,
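
To make the allocation math concrete, here is a standalone sketch using the same Arrow calls the Scala code above relies on: allocateNew(int) for fixed-width vectors and allocateNew(long, int) for variable-width ones. Vector names and sizes are illustrative only.

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VarCharVector;

    public class PreallocateDemo {
      public static void main(String[] args) {
        int batchSize = 1024;
        int avgBytesPerVarWidthElement = 64; // the SparkOptions default above
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
            IntVector ids = new IntVector("id", allocator);
            VarCharVector names = new VarCharVector("name", allocator)) {
          // Fixed-width: one exact allocation, batchSize values of 4 bytes each.
          ids.allocateNew(batchSize);
          // Variable-width: reserve batchSize * avg data bytes up front so a
          // typical batch fills without doubling-and-copying reallocations.
          names.allocateNew((long) batchSize * avgBytesPerVarWidthElement, batchSize);
          System.out.println("id value capacity: " + ids.getValueCapacity());
          System.out.println("name data bytes reserved: " + names.getByteCapacity());
        }
      }
    }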
@@ -53,8 +53,10 @@ public void test() throws Exception {

final int totalRows = 125;
final int batchSize = 34;
+    final int avgBytesPerVarWidthElement = 64;
final LanceArrowWriter arrowWriter =
-        new LanceArrowWriter(allocator, schema, sparkSchema, batchSize);
+        new LanceArrowWriter(
+            allocator, schema, sparkSchema, batchSize, avgBytesPerVarWidthElement);

AtomicInteger rowsWritten = new AtomicInteger(0);
AtomicInteger rowsRead = new AtomicInteger(0);