Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package io.trino.operator;

import io.trino.spi.Page;
import io.trino.spi.PageBlockUtil;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.block.RunLengthEncodedBlock;

import java.util.ArrayList;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -67,26 +67,24 @@ public Page transformPage(Page inputPage)
// TODO: Check with Karol to see if we can get empty pages
checkArgument(positionCount > 0, "positionCount should be > 0, but is %s", positionCount);

ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
checkArgument(!mergeRow.mayHaveNull(), "The mergeRow may not have null rows");

// We've verified that the mergeRow block has no null rows, so it's okay to get the field blocks

List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 3);

Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
List<Block> fields = getRowFieldsFromBlock(mergeRow);
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
for (int channel : dataColumnChannels) {
builder.add(mergeRow.getField(channel));
builder.add(fields.get(channel));
}
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
Block operationChannelBlock = fields.get(fields.size() - 2);
builder.add(operationChannelBlock);
Block caseNumberChannelBlock = fields.get(fields.size() - 1);
builder.add(caseNumberChannelBlock);
builder.add(inputPage.getBlock(rowIdChannel));
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));

Page result = new Page(builder.toArray(Block[]::new));

int defaultCaseCount = 0;
for (int position = 0; position < positionCount; position++) {
if (TINYINT.getByte(operationChannelBlock, position) == DEFAULT_CASE_OPERATION_NUMBER) {
if (mergeRow.isNull(position)) {
defaultCaseCount++;
}
}
Expand All @@ -97,14 +95,14 @@ public Page transformPage(Page inputPage)
int usedCases = 0;
int[] positions = new int[positionCount - defaultCaseCount];
for (int position = 0; position < positionCount; position++) {
if (TINYINT.getByte(operationChannelBlock, position) != DEFAULT_CASE_OPERATION_NUMBER) {
if (!mergeRow.isNull(position)) {
positions[usedCases] = position;
usedCases++;
}
}

checkArgument(usedCases + defaultCaseCount == positionCount, "usedCases (%s) + defaultCaseCount (%s) != positionCount (%s)", usedCases, defaultCaseCount, positionCount);

return result.getPositions(positions, 0, usedCases);
return PageBlockUtil.getPositions(result, positions, 0, usedCases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.type.Type;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
import static io.trino.spi.connector.ConnectorMergeSink.DELETE_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.INSERT_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_DELETE_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_INSERT_OPERATION_NUMBER;
import static io.trino.spi.connector.ConnectorMergeSink.UPDATE_OPERATION_NUMBER;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -100,8 +100,9 @@ public Page transformPage(Page inputPage)
int originalPositionCount = inputPage.getPositionCount();
checkArgument(originalPositionCount > 0, "originalPositionCount should be > 0, but is %s", originalPositionCount);

ColumnarRow mergeRow = toColumnarRow(inputPage.getBlock(mergeRowChannel));
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
Block mergeRow = inputPage.getBlock(mergeRowChannel);
List<Block> fields = getRowFieldsFromBlock(mergeRow);
Block operationChannelBlock = fields.get(fields.size() - 2);

int updatePositions = 0;
int insertPositions = 0;
Expand All @@ -123,6 +124,7 @@ public Page transformPage(Page inputPage)
List<Type> pageTypes = ImmutableList.<Type>builder()
.addAll(dataColumnTypes)
.add(TINYINT)
.add(INTEGER)
.add(rowIdType)
.add(TINYINT)
.build();
Expand All @@ -137,7 +139,7 @@ public Page transformPage(Page inputPage)
}
// Insert and update because both create an insert row
if (operation == INSERT_OPERATION_NUMBER || operation == UPDATE_OPERATION_NUMBER) {
addInsertRow(pageBuilder, mergeRow, position, operation != INSERT_OPERATION_NUMBER);
addInsertRow(pageBuilder, fields, position, operation != INSERT_OPERATION_NUMBER);
}
}
}
Expand Down Expand Up @@ -170,33 +172,39 @@ private void addDeleteRow(PageBuilder pageBuilder, Page originalPage, int positi
// Add the operation column == deleted
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_DELETE_OPERATION_NUMBER : DELETE_OPERATION_NUMBER);

// Add the dummy case number, delete and insert won't use it, use -1 to mark it shouldn't be used
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), -1);

// Copy row ID column
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1));
rowIdType.appendTo(originalPage.getBlock(rowIdChannel), position, pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2));

// Write 0, meaning this row is not an insert derived from an update
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), 0);
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), 0);

pageBuilder.declarePosition();
}

private void addInsertRow(PageBuilder pageBuilder, ColumnarRow mergeCaseBlock, int position, boolean causedByUpdate)
private void addInsertRow(PageBuilder pageBuilder, List<Block> fields, int position, boolean causedByUpdate)
{
// Copy the values from the merge block
for (int targetChannel : dataColumnChannels) {
Type columnType = dataColumnTypes.get(targetChannel);
BlockBuilder targetBlock = pageBuilder.getBlockBuilder(targetChannel);
// The value comes from that column of the page
columnType.appendTo(mergeCaseBlock.getField(targetChannel), position, targetBlock);
columnType.appendTo(fields.get(targetChannel), position, targetBlock);
}

// Add the operation column == insert
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size()), causedByUpdate ? UPDATE_INSERT_OPERATION_NUMBER : INSERT_OPERATION_NUMBER);

// Add the dummy case number, delete and insert won't use it
INTEGER.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1), 0);

// Add null row ID column
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 1).appendNull();
pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2).appendNull();

// Write 1 if this row is an insert derived from an update, 0 otherwise
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 2), causedByUpdate ? 1 : 0);
TINYINT.writeLong(pageBuilder.getBlockBuilder(dataColumnChannels.size() + 3), causedByUpdate ? 1 : 0);

pageBuilder.declarePosition();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
import static io.trino.spi.block.RowBlock.fromFieldBlocks;
import static io.trino.spi.block.RowBlock.fromNotNullSuppressedFieldBlocks;
import static java.util.Objects.requireNonNull;

public class RowPositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = instanceSize(RowPositionsAppender.class);
private final RowType type;
private final PositionsAppender[] fieldAppenders;
private int initialEntryCount;
private boolean initialized;
Expand All @@ -55,11 +56,12 @@ public static RowPositionsAppender createRowAppender(
for (int i = 0; i < fields.length; i++) {
fields[i] = positionsAppenderFactory.create(type.getFields().get(i).getType(), expectedPositions, maxPageSizeInBytes);
}
return new RowPositionsAppender(fields, expectedPositions);
return new RowPositionsAppender(type, fields, expectedPositions);
}

private RowPositionsAppender(PositionsAppender[] fieldAppenders, int expectedPositions)
private RowPositionsAppender(RowType type, PositionsAppender[] fieldAppenders, int expectedPositions)
{
this.type = type;
this.fieldAppenders = requireNonNull(fieldAppenders, "fields is null");
this.initialEntryCount = expectedPositions;
resetSize();
Expand Down Expand Up @@ -88,14 +90,17 @@ public void append(IntArrayList positions, Block block)

List<Block> fieldBlocks = sourceRowBlock.getChildren();
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(nonNullPositions, fieldBlocks.get(i));
fieldAppenders[i].append(positions, fieldBlocks.get(i));
}
}
else if (allPositionsNull(positions, block)) {
// all input positions are null. We can handle that even if block type is not RowBLock.
// append positions.size() nulls
Arrays.fill(rowIsNull, positionCount, positionCount + positions.size(), true);
hasNullRow = true;
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(positions, block);
}
}
else {
throw new IllegalArgumentException("unsupported block type: " + block);
Expand All @@ -113,6 +118,9 @@ public void appendRle(Block value, int rlePositionCount)
if (sourceRowBlock.isNull(0)) {
// append rlePositionCount nulls
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].appendRle(value.getSingleValueBlock(0), rlePositionCount);
}
hasNullRow = true;
}
else {
Expand All @@ -128,6 +136,9 @@ public void appendRle(Block value, int rlePositionCount)
else if (value.isNull(0)) {
// append rlePositionCount nulls
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].appendRle(value.getSingleValueBlock(0), rlePositionCount);
}
hasNullRow = true;
}
else {
Expand All @@ -145,6 +156,9 @@ public void append(int position, Block value)
if (sourceRowBlock.isNull(position)) {
rowIsNull[positionCount] = true;
hasNullRow = true;
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(position, value);
}
}
else {
// append not null row value
Expand All @@ -159,6 +173,9 @@ public void append(int position, Block value)
else if (value.isNull(position)) {
rowIsNull[positionCount] = true;
hasNullRow = true;
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(position, value);
}
}
else {
throw new IllegalArgumentException("unsupported block type: " + value);
Expand All @@ -176,10 +193,10 @@ public Block build()
}
Block result;
if (hasNonNullRow) {
result = fromFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
result = fromNotNullSuppressedFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
}
else {
Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
Block nullRowBlock = type.createNullBlock();
result = RunLengthEncodedBlock.create(nullRowBlock, positionCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ public PlanNode plan(Delete node)
List<Symbol> columnSymbols = columnSymbolsBuilder.build();
Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
assignmentsBuilder.put(operationSymbol, new GenericLiteral("TINYINT", String.valueOf(DELETE_OPERATION_NUMBER)));
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
assignmentsBuilder.put(caseNumberSymbol, new GenericLiteral("INTEGER", String.valueOf(0)));
Symbol projectedRowIdSymbol = symbolAllocator.newSymbol(rowIdSymbol.getName(), rowIdType);
assignmentsBuilder.put(projectedRowIdSymbol, rowIdSymbol.toSymbolReference());
assignmentsBuilder.put(symbolAllocator.newSymbol("insert_from_update", TINYINT), new GenericLiteral("TINYINT", "0"));
Expand Down Expand Up @@ -929,11 +931,13 @@ private MergeWriterNode createMergePipeline(Table table, RelationPlan relationPl
}

Symbol operationSymbol = symbolAllocator.newSymbol("operation", TINYINT);
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
Symbol insertFromUpdateSymbol = symbolAllocator.newSymbol("insert_from_update", TINYINT);

List<Symbol> projectedSymbols = ImmutableList.<Symbol>builder()
.addAll(columnSymbols)
.add(operationSymbol)
.add(caseNumberSymbol)
.add(rowIdSymbol)
.add(insertFromUpdateSymbol)
.build();
Expand Down
Loading