Conversation

@snazy snazy commented Dec 12, 2025

API and implementations to perform long-running operations against object stores, mostly to purge files.

Contributor

@adam-christian-software adam-christian-software left a comment


This looks like a fantastic PR! Thanks for pushing forward with this. I had some smaller questions, but this looks great.

Contributor

@flyrain flyrain left a comment


Thanks for the PR! This seems to overlap with the existing TableCleanupTaskHandler. My understanding is that the direction is to invest in a more general task execution framework (e.g., a delegation service outside of Polaris), rather than re-implementing file-level operations in individual features. It would be great to align this work with that direction.


snazy commented Dec 20, 2025

Thanks Adam for the thorough review! Pushed a commit to address the findings.

@snazy snazy changed the title from "Object storage operations (proposal)" to "Object storage operations" Dec 20, 2025
Comment on lines +37 to +50
import org.projectnessie.catalog.formats.iceberg.IcebergSpec;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataContent;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataFile;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergFileFormat;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestContent;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntry;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntryStatus;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFile;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriter;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriterSpec;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriter;
import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriterSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;
Contributor


I do not think Polaris should depend on the Nessie project for Iceberg metadata. What if Iceberg itself changes while Nessie hasn't caught up yet? cc @dennishuo @singhpk234 @adam-christian-software

Contributor


I didn't see this before; it must have changed since I last looked it over. Personally, I'm not 100% sure what these classes do, but I agree that we should not be relying on Project Nessie for Iceberg-native classes. If these classes are needed, I would rather move them into Polaris than keep them in Project Nessie.

Member Author


These classes are only needed to keep the tests simple; with the Iceberg Java code, those tests would either not be possible or only possible with a lot of hacks. These classes would only break if the Iceberg spec changed in a breaking way.

Member Author


it must have changed since I last looked it over.

Nope. It has been in there since the very first commit.

Contributor


I do not fully agree with this.

These classes are not just test helpers. They introduce a parallel Iceberg metadata and behavior model, even if used only in tests today. That means they still encode assumptions about Iceberg semantics, which increases long term maintenance risk.

The idea that they would only break with a breaking Iceberg spec change is also optimistic. Iceberg often evolves through additive or clarifying changes that do not break the spec but can still invalidate assumptions in helper classes like these.

Finally, this creates an indirect dependency where Polaris has to wait for Nessie to catch up whenever Iceberg changes. That coupling feels unnecessary and makes it harder for Polaris to stay closely aligned with upstream Iceberg behavior.

Contributor


@pingtimeout, @snazy, & @flyrain - I'm fine merging this in as-is with a quick follow-up to remove the Nessie dependency. I played around with it, and I have a version of IcebergFixtures that can rely directly on Iceberg without Project Nessie (see below).

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.polaris.storage.files.impl;

import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IcebergBridge;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.types.Types;

public class IcebergFixtures {
  public final Schema schema;
  public final PartitionSpec spec;
  public final TableMetadata tableMetadata;
  public final String tableMetadataString;
  public final byte[] tableMetadataBytes;

  public final String prefix;
  public final int numSnapshots;
  public final int numManifestFiles;
  public final int numDataFiles;

  public IcebergFixtures(String prefix, int numSnapshots, int numManifestFiles, int numDataFiles) {
    this.prefix = prefix;
    this.numSnapshots = numSnapshots;
    this.numManifestFiles = numManifestFiles;
    this.numDataFiles = numDataFiles;

    schema = new Schema(1, Types.NestedField.required(1, "foo", Types.StringType.get()));
    spec = PartitionSpec.unpartitioned();

    var tableMetadataBuilder =
        TableMetadata.buildFrom(
            TableMetadata.newTableMetadata(schema, spec, prefix, Map.of()).withUUID());
    for (var snapshotId = 1; snapshotId <= numSnapshots; snapshotId++) {
      var manifestList = manifestListPath(snapshotId);
      var snapshot =
          IcebergBridge.mockSnapshot(
              snapshotId + 1,
              snapshotId + 1,
              snapshotId > 0 ? (long) snapshotId : null,
              System.currentTimeMillis(),
              "APPEND",
              Map.of(),
              schema.schemaId(),
              manifestList,
              (long) numManifestFiles * numDataFiles); // total data files across all manifests
      tableMetadataBuilder.addSnapshot(snapshot);
    }
    tableMetadata = tableMetadataBuilder.build();

    tableMetadataString = TableMetadataParser.toJson(tableMetadata);
    tableMetadataBytes = tableMetadataString.getBytes(UTF_8);
  }

  public String manifestListPath(int snapshotId) {
    return format("%s%05d/snap-%d.avro", prefix, snapshotId, snapshotId);
  }

  public byte[] serializedManifestList(long snapshotId) {
    var output = new ByteArrayOutputStream();
    OutputFile outputFile = new InMemoryOutputFile(output);

    try (FileAppender<ManifestFile> manifestListWriter =
        Avro.write(outputFile)
            .schema(ManifestFile.schema())
            .named("manifest_file")
            .overwrite()
            .build()) {
      for (int i = 0; i < numManifestFiles; i++) {
        var manifestPath = manifestFilePath(snapshotId, i);

        // Create a manifest file and get its ManifestFile metadata
        var tempManifestOutput = new ByteArrayOutputStream();
        OutputFile tempManifestOutputFile =
            new InMemoryOutputFile(tempManifestOutput, manifestPath);
        ManifestWriter<DataFile> tempWriter =
            org.apache.iceberg.ManifestFiles.write(2, spec, tempManifestOutputFile, snapshotId);
        for (int j = 0; j < numDataFiles; j++) {
          var dataFilePath = format("%s%05d/%05d/%05d/data.parquet", prefix, snapshotId, i, j);
          DataFile dataFile =
              DataFiles.builder(spec)
                  .withFormat(FileFormat.PARQUET)
                  .withPath(dataFilePath)
                  .withFileSizeInBytes(1024)
                  .withRecordCount(1)
                  .build();
          tempWriter.add(dataFile);
        }
        tempWriter.close();
        ManifestFile manifestFile = tempWriter.toManifestFile();

        manifestListWriter.add(manifestFile);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return output.toByteArray();
  }

  public String manifestFilePath(long snapshotId, int file) {
    return format("%s%05d/%05d/xyz-m-manifest.avro", prefix, snapshotId, file);
  }

  public byte[] serializedManifestFile(long snapshotId, int manifest, String path) {
    var output = new ByteArrayOutputStream();
    OutputFile outputFile = new InMemoryOutputFile(output);

    try (ManifestWriter<DataFile> manifestFileWriter =
        org.apache.iceberg.ManifestFiles.write(2, spec, outputFile, snapshotId)) {
      for (int i = 0; i < numDataFiles; i++) {
        var dataFilePath = format("%s%05d/%05d/%05d/data.parquet", prefix, snapshotId, manifest, i);
        DataFile dataFile =
            DataFiles.builder(spec)
                .withFormat(FileFormat.PARQUET)
                .withPath(dataFilePath)
                .withFileSizeInBytes(1024)
                .withRecordCount(1)
                .build();
        manifestFileWriter.add(dataFile);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return output.toByteArray();
  }

  private record InMemoryOutputFile(ByteArrayOutputStream output, String path)
      implements OutputFile {
    InMemoryOutputFile(ByteArrayOutputStream output) {
      this(output, "in-memory");
    }

    @Override
    public PositionOutputStream create() {
      return new ByteArrayPositionOutputStream(output);
    }

    @Override
    public PositionOutputStream createOrOverwrite() {
      output.reset();
      return new ByteArrayPositionOutputStream(output);
    }

    @Override
    public String location() {
      return path;
    }

    @Override
    public InputFile toInputFile() {
      throw new UnsupportedOperationException("Not implemented");
    }
  }

  private static class ByteArrayPositionOutputStream extends PositionOutputStream {
    private final ByteArrayOutputStream output;

    ByteArrayPositionOutputStream(ByteArrayOutputStream output) {
      this.output = output;
    }

    @Override
    public long getPos() {
      return output.size();
    }

    @Override
    public void write(int b) throws IOException {
      output.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      output.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      output.close();
    }
  }
}

Member


I'd have to agree we probably shouldn't use Nessie as a dependency here; if we want to re-use the fixture, we should probably just copy the code into this project. I'm also on board with @adam-christian-software's implementation above.

Long term, it probably makes sense for the Iceberg project to provide something along these lines that downstream projects can use. Another minor note: this only uses V2 Iceberg tables; we should probably be double-checking against V3 as well.

pingtimeout previously approved these changes Jan 6, 2026
Contributor

@pingtimeout pingtimeout left a comment


Thanks @snazy! This is a well-designed PR that introduces valuable functionality for Iceberg table maintenance operations. Apart from a couple of suggestions, the PR LGTM.

I cannot think of any argument to block this PR if it reuses some classes defined in Project Nessie. That said, I am not opposed to pulling said classes into Polaris if that suits the community better. I recommend moving forward with this PR as-is and opening a GitHub issue so that the relevant classes are pulled into Polaris in a follow-up PR, for the sake of keeping this PR's size manageable. Any strong objection? cc @flyrain @dennishuo @singhpk234 @adam-christian-software

@github-project-automation github-project-automation bot moved this from PRs In Progress to Ready to merge in Basic Kanban Board Jan 6, 2026
…es/api/FileOperations.java

Co-authored-by: Pierre Laporte <[email protected]>
Contributor

@dimas-b dimas-b left a comment


I support @pingtimeout's suggestion to unblock this PR if there are no strong objections. Follow-up changes are certainly welcome (including in the areas that currently use Nessie tools for tests).

CC: @dennishuo @flyrain @adam-christian-software @singhpk234

public enum FileType {
UNKNOWN(false, false),
ICEBERG_METADATA(true, false),
ICEBERG_STATISTICS(false, false),
Member

@RussellSpitzer RussellSpitzer Jan 8, 2026


May be better to just say Puffin? (I assume that's what this is covering.) These could be statistics files or could be delete files - and, I guess, theoretically both? Or is it for the Parquet statistics files for partitions?
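To make the ambiguity concrete: the enum quoted above distinguishes metadata from statistics files, but Puffin files can carry both table statistics and (in V3) deletion vectors. A hypothetical classifier by filename convention - the class, enum, and extension mapping here are illustrative assumptions of ours, not from the PR:

```java
/**
 * Hypothetical classifier mapping object-store paths to coarse file kinds by
 * common Iceberg naming conventions. Names and extensions are illustrative only.
 */
public class FileKind {
  public enum Kind { ICEBERG_METADATA, MANIFEST_OR_MANIFEST_LIST, PUFFIN, DATA, UNKNOWN }

  public static Kind classify(String path) {
    if (path.endsWith(".metadata.json")) return Kind.ICEBERG_METADATA;
    // Manifest lists ("snap-*.avro") and manifests share the Avro extension.
    if (path.endsWith(".avro")) return Kind.MANIFEST_OR_MANIFEST_LIST;
    // Puffin files hold table statistics AND V3 deletion vectors, which is
    // why "statistics" alone can be a misleading name for this category.
    if (path.endsWith(".puffin")) return Kind.PUFFIN;
    if (path.endsWith(".parquet") || path.endsWith(".orc")) return Kind.DATA;
    return Kind.UNKNOWN;
  }
}
```

A real implementation would likely need to consult the table metadata rather than rely on extensions, since data and delete files share formats.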

* views. Rate-limiting on a single invocation may not be effective as expected.
*
* @param viewMetadataLocation Iceberg view-metadata location
* @param deduplicate if true, attempt to deduplicate files by their location, adding additional
Member


Again, I'm not sure why we need a deduplicate flag, since I would think we can get the unique listing without any additional processing; I may be missing something here.

PurgeSpec withPurgeIssuedCallback(Consumer<String> purgeIssuedCallback);

/**
* Optional rate-limit on the number of individual file-deletions per second.
Member


"number of individual files deleted per second"?

"file-deletions" just makes it seem like the spec also allows individual file deletions outside of bulk deletions.

return fileSources.stream().flatMap(Function.identity());
}

static Predicate<String> deduplicator(boolean deduplicate) {
Member


I mentioned this above, but I think we can skip the "deduplicator" logic altogether here by using a bit more of the information in the Iceberg table.

Starting with the oldest snapshot, add all manifests and entries, then for all newer snapshots add only manifests which have been marked as added and the entries within those which have been added.

This only gets messed up if there are other snapshots without parents (branches without a common snapshot, or orphaned WAP, though that one is probably not possible?), so we could handle duplicates for that case. But it should only require de-duplicating at the manifest level, I think: we only need to check the manifests added in each snapshot, make sure those are de-duplicated, and then we can use the added/existing status within those manifests.
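The traversal described above can be sketched with plain collections. The records below are toy stand-ins for Iceberg's Snapshot and ManifestFile types (the real `ManifestFile` exposes the adding snapshot via `snapshotId()`); the names and the helper are our assumptions, not code from the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-ins for Iceberg's ManifestFile and Snapshot; illustrative only.
record Manifest(String path, long addedBySnapshotId) {}
record Snapshot(long id, Long parentId, List<Manifest> manifests) {}

public class AncestryWalk {
  /**
   * Walks snapshots oldest-first and keeps, per snapshot, only the manifests
   * that snapshot itself added. Carried-over manifests were already emitted
   * by an ancestor, so de-duplication is only needed at the manifest level
   * (to cover parentless snapshots such as divergent branches).
   */
  static List<String> listManifestPaths(List<Snapshot> snapshotsOldestFirst) {
    Set<String> seen = new HashSet<>();
    List<String> result = new ArrayList<>();
    for (Snapshot snapshot : snapshotsOldestFirst) {
      for (Manifest manifest : snapshot.manifests()) {
        boolean addedHere = manifest.addedBySnapshotId() == snapshot.id();
        boolean isRoot = snapshot.parentId() == null;
        if ((addedHere || isRoot) && seen.add(manifest.path())) {
          result.add(manifest.path());
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Manifest m1 = new Manifest("m1.avro", 1);
    Manifest m2 = new Manifest("m2.avro", 2);
    Snapshot s1 = new Snapshot(1, null, List.of(m1));
    Snapshot s2 = new Snapshot(2, 1L, List.of(m1, m2)); // m1 carried over from s1
    System.out.println(listManifestPaths(List.of(s1, s2))); // [m1.avro, m2.avro]
  }
}
```

Against the real API, the same filter would compare `manifestFile.snapshotId()` with the owning snapshot's id; entry-level filtering by added/existing status would then happen while reading each manifest.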

Member


Not that any of this is important now; I just think it's something that could greatly reduce the amount of de-duplication required.

switch (manifestFile.content()) {
case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator();
case DELETES ->
ManifestFiles.readDeleteManifest(manifestFile, fileIO, specsById).iterator();
Member


V3 makes this one a bit more dupe-filled as well, since delete-vector entries here are not for individual files but for byte offsets within a single shared file. So this would generate duplicates, at least in the current layout.

// Object stores do delete the files that exist, but a BulkDeletionFailureException is
// still being thrown.
// However, not all FileIO implementations behave the same way as some don't throw in the
// non-existent-case.
Member


+1

.build(),
output)) {
for (int i = 0; i < numManifestFiles; i++) {
var manifestPath = manifestFilePath(snapshotId, i);
Member


Should we also be making delete manifests and delete files here for testing? Also possibly branching and WAP; not a high priority, since I think the current snapshot iteration should cover everything.

Member

@RussellSpitzer RussellSpitzer left a comment


Overall looks pretty straightforward to me. I think there are some optimizations we can do for the streaming approach of listing files in an Iceberg table, but they're not blockers for the moment.

I do think the Nessie dependency in testing should probably be removed, but that can happen after this PR is in. As I noted below, I think either copying or reimplementing is probably preferable to depending on another project.

Also added a few notes, since I think we currently aren't testing V3 tables (with Puffin delete-vector files), and it probably would be good to also test on tables with branches or WAP commits.


flyrain commented Jan 8, 2026

Thank you very much for the work you have put into this PR; we really appreciate the effort.

The three of us, Adam, Prashant, and I, had a chance to sync offline and discuss it briefly. Would it be possible to pause for a short while so we can do the following:

  1. Double check whether the underlying issue we are trying to address is indeed valid.
  2. Explore a few additional options, for example whether we could reuse existing Iceberg native libraries instead of reimplementing similar functionality.

Thanks again for your patience and for all the work here.
