Skip to content

Commit 6e927e2

Browse files
committed
paimon: expanded tests for stats extractor
1 parent 203b651 commit 6e927e2

File tree

4 files changed

+240
-82
lines changed

4 files changed

+240
-82
lines changed

xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.Snapshot;
2828
import org.apache.paimon.data.BinaryArray;
2929
import org.apache.paimon.data.BinaryRow;
30+
import org.apache.paimon.data.Timestamp;
3031
import org.apache.paimon.io.DataFileMeta;
3132
import org.apache.paimon.manifest.ManifestEntry;
3233
import org.apache.paimon.stats.SimpleStats;
@@ -62,13 +63,11 @@ public List<InternalDataFile> toInternalDataFiles(
6263
while (manifestEntryIterator.hasNext()) {
6364
result.add(toInternalDataFile(table, manifestEntryIterator.next(), internalSchema));
6465
}
65-
log.info("PaimonPartitionExtractor: Returning " + result.size() + " data files for " + table.name());
6666
return result;
6767
}
6868

6969
private InternalDataFile toInternalDataFile(
7070
FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) {
71-
// log.info("Adding manifest entry {}", entry.fileName());
7271
return InternalDataFile.builder()
7372
.physicalPath(toFullPhysicalPath(table, entry))
7473
.fileSizeBytes(entry.file().fileSize())
@@ -102,9 +101,7 @@ private List<ColumnStat> toColumnStats(DataFileMeta file, InternalSchema interna
102101
// stats for all columns are present in valueStats, we can safely ignore file.keyStats() - TODO: validate this assumption
103102
SimpleStats valueStats = file.valueStats();
104103
if (valueStats != null) {
105-
// log.info("Processing valueStats: {}", valueStats.toRow());
106104
List<String> colNames = file.valueStatsCols();
107-
// log.info("valueStatsCols: {}", colNames);
108105
if (colNames == null || colNames.isEmpty()) {
109106
// if column names are not present, we assume all columns in the schema are present in the same order as the schema - TODO: validate this assumption
110107
colNames =
@@ -137,11 +134,6 @@ private void extractStats(
137134
BinaryRow maxValues = stats.maxValues();
138135
BinaryArray nullCounts = stats.nullCounts();
139136

140-
// log.info("Extracting stats for columns: {}", colNames);
141-
// log.info("minValues: arity={}, {}", minValues.getFieldCount(), minValues);
142-
// log.info("maxValues: arity={}, {}", maxValues.getFieldCount(), maxValues);
143-
// log.info("fieldMap: {}", fieldMap.toString());
144-
145137
for (int i = 0; i < colNames.size(); i++) {
146138
String colName = colNames.get(i);
147139
InternalField field = fieldMap.get(colName);
@@ -161,14 +153,6 @@ private void extractStats(
161153
Object max = getValue(maxValues, i, type, field.getSchema());
162154
Long nullCount = (nullCounts != null && i < nullCounts.size()) ? nullCounts.getLong(i) : 0L;
163155

164-
// log.info(
165-
// "Column: {}, Index: {}, Min: {}, Max: {}, NullCount: {}",
166-
// colName,
167-
// i,
168-
// min,
169-
// max,
170-
// nullCount);
171-
172156
columnStats.add(
173157
ColumnStat.builder()
174158
.field(field)
@@ -210,16 +194,14 @@ private Object getValue(BinaryRow row, int index, InternalType type, InternalSch
210194
fieldSchema.getName());
211195
tsPrecision = TimestampType.DEFAULT_PRECISION;
212196
}
213-
// TODO: BinaryRow.getTimestamp().toInstant() is deprecated (use LocalZoneTimestamp), but BinaryRow does not have a method to get LocalZoneTimestamp?
214-
Instant timestamp = row.getTimestamp(index, tsPrecision).toInstant();
215-
long tsMillis = timestamp.toEpochMilli();
197+
Timestamp ts = row.getTimestamp(index, tsPrecision);
216198

217199
// according to docs for org.apache.xtable.model.stat.Range, timestamp is stored as millis
218200
// or micros - even if precision is higher than micros, return micros
219201
if (tsPrecisionEnum == InternalSchema.MetadataValue.MILLIS) {
220-
return tsMillis;
202+
return ts.getMillisecond();
221203
} else {
222-
return tsMillis * 1000 + timestamp.getNano() / 1000L;
204+
return ts.toMicros();
223205
}
224206
case FLOAT:
225207
return row.getFloat(index);

xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,23 @@ public static GenericTable<GenericRow, String> createTable(
7070
Path tempDir,
7171
Configuration hadoopConf,
7272
boolean additionalColumns) {
73+
74+
Schema schema = buildGenericSchema(partitionField, additionalColumns);
75+
return createTable(
76+
tableName, partitionField, tempDir, hadoopConf, additionalColumns, schema);
77+
}
78+
79+
public static GenericTable<GenericRow, String> createTable(
80+
String tableName,
81+
String partitionField,
82+
Path tempDir,
83+
Configuration hadoopConf,
84+
boolean additionalColumns,
85+
Schema schema) {
7386
String basePath = initBasePath(tempDir, tableName);
7487
Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
75-
FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
88+
FileStoreTable paimonTable =
89+
createTable(catalog, tableName, schema);
7690

7791
System.out.println(
7892
"Initialized Paimon test table at base path: "
@@ -91,19 +105,20 @@ public static Catalog createFilesystemCatalog(String basePath, Configuration had
91105
}
92106

93107
public static FileStoreTable createTable(
94-
Catalog catalog, String partitionField, boolean additionalColumns) {
108+
Catalog catalog,
109+
String tableName,
110+
Schema schema) {
95111
try {
96112
catalog.createDatabase("test_db", true);
97-
Identifier identifier = Identifier.create("test_db", "test_table");
98-
Schema schema = buildSchema(partitionField, additionalColumns);
113+
Identifier identifier = Identifier.create("test_db", tableName);
99114
catalog.createTable(identifier, schema, true);
100115
return (FileStoreTable) catalog.getTable(identifier);
101116
} catch (Exception e) {
102117
throw new RuntimeException(e);
103118
}
104119
}
105120

106-
private static Schema buildSchema(String partitionField, boolean additionalColumns) {
121+
private static Schema buildGenericSchema(String partitionField, boolean additionalColumns) {
107122
Schema.Builder builder =
108123
Schema.newBuilder()
109124
.primaryKey("id")
@@ -179,20 +194,12 @@ public List<GenericRow> insertRecordsForSpecialPartition(int numRows) {
179194
}
180195

181196
private List<GenericRow> insertRecordsToPartition(int numRows, String partitionValue) {
182-
BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
183-
try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
184-
List<GenericRow> rows = new ArrayList<>(numRows);
185-
for (int i = 0; i < numRows; i++) {
186-
GenericRow row = buildGenericRow(i, paimonTable.schema(), partitionValue);
187-
writer.write(row);
188-
rows.add(row);
189-
}
190-
commitWrites(batchWriteBuilder, writer);
191-
compactTable();
192-
return rows;
193-
} catch (Exception e) {
194-
throw new RuntimeException("Failed to insert rows into Paimon table", e);
197+
List<GenericRow> rows = new ArrayList<>(numRows);
198+
for (int i = 0; i < numRows; i++) {
199+
rows.add(buildGenericRow(i, paimonTable.schema(), partitionValue));
195200
}
201+
writeRows(paimonTable, rows);
202+
return rows;
196203
}
197204

198205
@Override
@@ -225,8 +232,12 @@ public void deleteRows(List<GenericRow> rows) {
225232
}
226233

227234
private void compactTable() {
228-
BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
229-
SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
235+
compactTable(paimonTable);
236+
}
237+
238+
public static void compactTable(FileStoreTable table) {
239+
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
240+
SnapshotReader snapshotReader = table.newSnapshotReader();
230241
try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
231242
for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
232243
writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
@@ -237,6 +248,19 @@ private void compactTable() {
237248
}
238249
}
239250

251+
public static void writeRows(FileStoreTable table, List<GenericRow> rows) {
252+
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
253+
try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
254+
for (GenericRow row : rows) {
255+
writer.write(row);
256+
}
257+
commitWrites(batchWriteBuilder, writer);
258+
compactTable(table);
259+
} catch (Exception e) {
260+
throw new RuntimeException("Failed to write rows into Paimon table", e);
261+
}
262+
}
263+
240264
private static void commitWrites(BatchWriteBuilder batchWriteBuilder, BatchTableWrite writer)
241265
throws Exception {
242266
BatchTableCommit commit = batchWriteBuilder.newCommit();

xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void testGetTableWithUnpartitionedTable() {
9898
InternalTable result = unpartitionedSource.getTable(snapshot);
9999

100100
assertNotNull(result);
101-
assertEquals("test_table", result.getName());
101+
assertEquals("unpartitioned_table", result.getName());
102102
assertEquals(TableFormat.PAIMON, result.getTableFormat());
103103
assertNotNull(result.getReadSchema());
104104
assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());

0 commit comments

Comments (0)