From 9059bce977cee98e2d65769622c46a1941c563dd Mon Sep 17 00:00:00 2001
From: hongdd
Date: Wed, 29 Apr 2020 18:47:27 +0800
Subject: [PATCH] [HUDI-702] Add test for HoodieLogFileCommand (#1522)

---
 .../hudi/cli/HoodieTableHeaderFields.java     |  12 +-
 .../cli/commands/HoodieLogFileCommand.java    |  19 +-
 .../commands/TestHoodieLogFileCommand.java    | 222 ++++++++++++++++++
 3 files changed, 246 insertions(+), 7 deletions(-)
 create mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 92564937380d..3b398e3b9a2f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -26,15 +26,25 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " Path";
   public static final String HEADER_FILE_ID = "FileId";
   public static final String HEADER_BASE_INSTANT = "Base-Instant";
-
+  public static final String HEADER_INSTANT_TIME = "InstantTime";
   public static final String HEADER_CLEAN_TIME = "CleanTime";
   public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained";
   public static final String HEADER_CLEANING_POLICY = "Cleaning policy";
 
+  public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted";
   public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted";
   public static final String HEADER_TOTAL_FAILED_DELETIONS = "Total Failed Deletions";
   public static final String HEADER_TOTAL_TIME_TAKEN = "Total Time Taken";
 
+  /**
+   * Fields of log file.
+   */
+  public static final String HEADER_RECORDS = "Records";
+  public static final String HEADER_RECORD_COUNT = "RecordCount";
+  public static final String HEADER_BLOCK_TYPE = "BlockType";
+  public static final String HEADER_HEADER_METADATA = "HeaderMetadata";
+  public static final String HEADER_FOOTER_METADATA = "FooterMetadata";
+
   /**
    * Fields of data header.
    */
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index db1ab164fb1a..f7c4d653d07e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -53,6 +54,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -134,7 +136,6 @@ public String showLogFileCommits(
       reader.close();
     }
     List<Comparable[]> rows = new ArrayList<>();
-    int i = 0;
     ObjectMapper objectMapper = new ObjectMapper();
     for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
         .entrySet()) {
@@ -148,12 +149,14 @@
         output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
         output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
         rows.add(output);
-        i++;
       }
     }
 
-    TableHeader header = new TableHeader().addTableHeaderField("InstantTime").addTableHeaderField("RecordCount")
-        .addTableHeaderField("BlockType").addTableHeaderField("HeaderMetadata").addTableHeaderField("FooterMetadata");
+    TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
 
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
@@ -173,7 +176,11 @@ public String showLogFileRecords(
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     FileSystem fs = client.getFs();
     List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
-        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+        .map(status -> status.getPath().toString()).sorted(Comparator.reverseOrder())
+        .collect(Collectors.toList());
+
+    // logFilePaths size must be > 0
+    assert logFilePaths.size() > 0 : "There is no log file";
 
     // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
     AvroSchemaConverter converter = new AvroSchemaConverter();
@@ -232,6 +239,6 @@ public String showLogFileRecords(
       rows[i] = data;
       i++;
     }
-    return HoodiePrintHelper.print(new String[] {"Records"}, rows);
+    return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
   }
 }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
new file mode 100644
index 000000000000..b0d2504193a5
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.common.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Cases for {@link HoodieLogFileCommand}.
+ */
+public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
+
+  private String partitionPath;
+  private HoodieAvroDataBlock dataBlock;
+  private String tablePath;
+
+  private static final String INSTANT_TIME = "100";
+
+  @BeforeEach
+  public void init() throws IOException, InterruptedException, URISyntaxException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+
+    // Create table and connect
+    String tableName = "test_table";
+    tablePath = basePath + File.separator + tableName;
+    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    Files.createDirectories(Paths.get(partitionPath));
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      writer =
+          HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+              .withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build();
+
+      // write data to file
+      List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+      dataBlock = new HoodieAvroDataBlock(records, header);
+      writer = writer.appendBlock(dataBlock);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * Test case for 'show logfile metadata'.
+   */
+  @Test
+  public void testShowLogFileCommits() throws JsonProcessingException {
+    CommandResult cr = getShell().executeCommand("show logfile metadata --logFilePathPattern " + partitionPath + "/*");
+    assertTrue(cr.isSuccess());
+
+    TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_RECORD_COUNT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BLOCK_TYPE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_HEADER_METADATA)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FOOTER_METADATA);
+
+    // construct the expected result, there is only 1 line.
+    List<Comparable[]> rows = new ArrayList<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    String headerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockHeader());
+    String footerStr = objectMapper.writeValueAsString(dataBlock.getLogBlockFooter());
+    Comparable[] output = new Comparable[]{INSTANT_TIME, 100, dataBlock.getBlockType(), headerStr, footerStr};
+    rows.add(output);
+
+    String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'show logfile records'.
+   */
+  @Test
+  public void testShowLogFileRecords() throws IOException, URISyntaxException {
+    CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern " + partitionPath + "/*");
+    assertTrue(cr.isSuccess());
+
+    // construct the expected result, get 10 records.
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 10);
+    String[][] rows = records.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for 'show logfile records' with merge.
+   */
+  @Test
+  public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedException, URISyntaxException {
+    // create commit instant
+    HoodieTestCommitMetadataGenerator.createCommitFile(tablePath, INSTANT_TIME, HoodieCLI.conf);
+
+    // write to path '2015/03/16'.
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    Files.createDirectories(Paths.get(partitionPath));
+
+    HoodieLogFormat.Writer writer = null;
+    try {
+      // set a small size threshold to split the file.
+      writer =
+          HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionPath))
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+              .withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
+
+      List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+      writer = writer.appendBlock(dataBlock);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+
+    CommandResult cr = getShell().executeCommand("show logfile records --logFilePathPattern "
+        + partitionPath + "/* --mergeRecords true");
+    assertTrue(cr.isSuccess());
+
+    // get the expected result of 10 records.
+    List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
+        .map(status -> status.getPath().toString()).collect(Collectors.toList());
+    HoodieMergedLogRecordScanner scanner =
+        new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
+            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+            Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+            HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
+            HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
+    int num = 0;
+    int maxSize = 10;
+    List<IndexedRecord> indexRecords = new ArrayList<>();
+    while (records.hasNext() && num < maxSize) {
+      Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
+      indexRecords.add(hoodieRecord.get());
+      num++;
+    }
+    String[][] rows = indexRecords.stream().map(r -> new String[]{r.toString()}).toArray(String[][]::new);
+    assertNotNull(rows);
+
+    String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_RECORDS}, rows);
+
+    assertEquals(expected, cr.getResult().toString());
+  }
+}
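
Note (not part of the patch): the new test drives 'show logfile metadata' and 'show logfile records' end to end through the Spring Shell harness. For readers who want to inspect a log file outside the CLI, the following is a minimal, illustrative sketch of the same block iteration that HoodieLogFileCommand performs, written against the HoodieLogFormat reader API as it looks at this commit. The fs, logFilePath, and readerSchema values are assumed to be supplied by the caller, and LogBlockWalkExample is a hypothetical class name introduced only for this example.

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

public class LogBlockWalkExample {

  // Sketch only: walk the blocks of one log file and print a per-block summary,
  // roughly what 'show logfile metadata' aggregates into its table.
  public static void printBlockSummary(FileSystem fs, String logFilePath, Schema readerSchema) throws Exception {
    try (HoodieLogFormat.Reader reader =
        HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePath)), readerSchema)) {
      while (reader.hasNext()) {
        HoodieLogBlock block = reader.next();
        int recordCount = 0;
        if (block instanceof HoodieAvroDataBlock) {
          // data blocks carry the records; command/delete blocks do not
          // (getRecords() is the accessor available at this commit)
          recordCount = ((HoodieAvroDataBlock) block).getRecords().size();
        }
        System.out.println(block.getBlockType() + " -> " + recordCount + " records, header="
            + block.getLogBlockHeader());
      }
    }
  }
}

A caller would pass the same kind of glob-resolved path the test uses (partitionPath + "/*", expanded via fs.globStatus) and the schema stored in the block's SCHEMA header.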