diff --git a/Cargo.toml b/Cargo.toml
index efff593b3..71809fdb7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,12 @@
 [workspace]
 resolver = "2"
 members = [
-  "crates/catalog/*",
-  "crates/examples",
-  "crates/iceberg",
-  "crates/integrations/*",
-  "crates/test_utils",
+    "crates/catalog/*",
+    "crates/examples",
+    "crates/iceberg",
+    "crates/integration_tests",
+    "crates/integrations/*",
+    "crates/test_utils",
 ]
 exclude = ["bindings/python"]
 
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 6905e17c7..fce5fe2be 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1376,7 +1376,7 @@ mod tests {
             .with_schema_id(0)
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter([
+                additional_properties: HashMap::from_iter([
                     ("spark.app.id", "local-1646787004168"),
                     ("added-data-files", "1"),
                     ("added-records", "1"),
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 9430447de..b897d1574 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -1423,7 +1423,7 @@ mod tests {
                 .with_schema_id(1)
                 .with_summary(Summary {
                     operation: Operation::Append,
-                    other: HashMap::default(),
+                    additional_properties: HashMap::default(),
                 })
                 .build(),
         };
@@ -1457,7 +1457,7 @@ mod tests {
                 .with_manifest_list("s3://a/b/2.avro")
                 .with_summary(Summary {
                     operation: Operation::Append,
-                    other: HashMap::default(),
+                    additional_properties: HashMap::default(),
                 })
                 .build(),
         };
diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
index 35b6a2c94..88e2d0e2d 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -185,7 +185,7 @@ mod tests {
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+        ManifestWriter, Struct, TableMetadata,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -293,9 +293,7 @@ mod tests {
                     .new_output(current_snapshot.manifest_list())
                     .unwrap(),
                 current_snapshot.snapshot_id(),
-                current_snapshot
-                    .parent_snapshot_id()
-                    .unwrap_or(EMPTY_SNAPSHOT_ID),
+                current_snapshot.parent_snapshot_id(),
                 current_snapshot.sequence_number(),
             );
             manifest_list_write
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 89e8846f0..8f0bc38f6 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -977,7 +977,6 @@ mod tests {
         DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
-        EMPTY_SNAPSHOT_ID,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -1124,9 +1123,7 @@ mod tests {
                     .new_output(current_snapshot.manifest_list())
                     .unwrap(),
                 current_snapshot.snapshot_id(),
-                current_snapshot
-                    .parent_snapshot_id()
-                    .unwrap_or(EMPTY_SNAPSHOT_ID),
+                current_snapshot.parent_snapshot_id(),
                 current_snapshot.sequence_number(),
             );
             manifest_list_write
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index a0bb89b23..a868c7b11 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -900,6 +900,12 @@ impl ManifestEntry {
         }
     }
 
+    /// Snapshot id
+    #[inline]
+    pub fn snapshot_id(&self) -> Option<i64> {
+        self.snapshot_id
+    }
+
     /// Data sequence number.
     #[inline]
     pub fn sequence_number(&self) -> Option<i64> {
@@ -1328,7 +1334,7 @@ mod _serde {
             Ok(Self {
                 content: value.content as i32,
                 file_path: value.file_path,
-                file_format: value.file_format.to_string(),
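+                // Note: upper-casing here (e.g. "PARQUET") is for compatibility with the
+                // Java implementation, which writes the format name in upper case.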
+                file_format: value.file_format.to_string().to_ascii_uppercase(),
                 partition: RawLiteral::try_from(
                     Literal::Struct(value.partition),
                     &Type::Struct(partition_type.clone()),
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 3aaecf12d..5768b79d5 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -106,15 +106,17 @@ impl std::fmt::Debug for ManifestListWriter {
 
 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
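+        // V1 simply omits the parent-snapshot-id key when there is no parent snapshot.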
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }
 
@@ -122,18 +124,20 @@ impl ManifestListWriter {
     pub fn v2(
         output_file: OutputFile,
         snapshot_id: i64,
-        parent_snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
         sequence_number: i64,
     ) -> Self {
-        let metadata = HashMap::from_iter([
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("sequence-number".to_string(), sequence_number.to_string()),
             ("format-version".to_string(), "2".to_string()),
         ]);
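+        // V2 always writes the parent-snapshot-id key; a missing parent snapshot is
+        // encoded as the literal string "null", matching the Java writer's behavior.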
+        metadata.insert(
+            "parent-snapshot-id".to_string(),
+            parent_snapshot_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
         Self::new(
             FormatVersion::V2,
             output_file,
@@ -580,6 +584,18 @@ pub struct ManifestFile {
     pub key_metadata: Vec<u8>,
 }
 
+impl ManifestFile {
+    /// Checks if the manifest file has any added files.
+    pub fn has_added_files(&self) -> bool {
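+        // `added_files_count` may be absent (it is optional in V1 manifests); treat a
+        // missing count as "may contain added files".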
+        self.added_files_count.map_or(true, |count| count > 0)
+    }
+
+    /// Checks if the manifest file has any existing files.
+    pub fn has_existing_files(&self) -> bool {
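+        // As above, a missing count conservatively means "may contain existing files".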
+        self.existing_files_count.map_or(true, |count| count > 0)
+    }
+}
+
 /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
 #[derive(Debug, PartialEq, Clone, Eq)]
 pub enum ManifestContentType {
@@ -1146,7 +1162,7 @@ mod test {
         let mut writer = ManifestListWriter::v1(
             file_io.new_output(full_path.clone()).unwrap(),
             1646658105718557341,
-            1646658105718557341,
+            Some(1646658105718557341),
         );
 
         writer
@@ -1213,7 +1229,7 @@ mod test {
         let mut writer = ManifestListWriter::v2(
             file_io.new_output(full_path.clone()).unwrap(),
             1646658105718557341,
-            1646658105718557341,
+            Some(1646658105718557341),
             1,
         );
 
@@ -1335,7 +1351,7 @@ mod test {
         let io = FileIOBuilder::new_fs_io().build().unwrap();
         let output_file = io.new_output(path.to_str().unwrap()).unwrap();
 
-        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
+        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
         writer
             .add_manifests(expected_manifest_list.entries.clone().into_iter())
             .unwrap();
@@ -1391,7 +1407,7 @@ mod test {
         let io = FileIOBuilder::new_fs_io().build().unwrap();
         let output_file = io.new_output(path.to_str().unwrap()).unwrap();
 
-        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
+        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
         writer
             .add_manifests(expected_manifest_list.entries.clone().into_iter())
             .unwrap();
@@ -1445,7 +1461,7 @@ mod test {
         let io = FileIOBuilder::new_fs_io().build().unwrap();
         let output_file = io.new_output(path.to_str().unwrap()).unwrap();
 
-        let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
+        let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1);
         writer
             .add_manifests(expected_manifest_list.entries.clone().into_iter())
             .unwrap();
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index b4bb2cd1f..81fd6eae6 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -59,7 +59,7 @@ pub struct Summary {
     pub operation: Operation,
     /// Other summary data.
     #[serde(flatten)]
-    pub other: HashMap<String, String>,
+    pub additional_properties: HashMap<String, String>,
 }
 
 impl Default for Operation {
@@ -291,7 +291,7 @@ pub(super) mod _serde {
                    },
                 summary: v1.summary.unwrap_or(Summary {
                     operation: Operation::default(),
-                    other: HashMap::new(),
+                    additional_properties: HashMap::new(),
                 }),
                 schema_id: v1.schema_id,
             })
@@ -372,6 +372,21 @@ pub enum SnapshotRetention {
     },
 }
 
+impl SnapshotRetention {
+    /// Create a new branch retention policy
+    pub fn branch(
+        min_snapshots_to_keep: Option<i32>,
+        max_snapshot_age_ms: Option<i64>,
+        max_ref_age_ms: Option<i64>,
+    ) -> Self {
+        SnapshotRetention::Branch {
+            min_snapshots_to_keep,
+            max_snapshot_age_ms,
+            max_ref_age_ms,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
@@ -408,7 +423,7 @@ mod tests {
         assert_eq!(
             Summary {
                 operation: Operation::Append,
-                other: HashMap::new()
+                additional_properties: HashMap::new()
             },
             *result.summary()
         );
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 9609d9829..cd242f4b5 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -201,6 +201,18 @@ impl TableMetadata {
         self.last_sequence_number
     }
 
+    /// Returns the next sequence number for the table.
+    ///
+    /// For format version 1, it always returns the initial sequence number (0),
+    /// since V1 tables do not track sequence numbers.
+    /// For other versions, it returns the last sequence number incremented by 1.
+    #[inline]
+    pub fn next_sequence_number(&self) -> i64 {
+        match self.format_version {
+            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
+            _ => self.last_sequence_number + 1,
+        }
+    }
+
     /// Returns last updated time.
     #[inline]
     pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
@@ -1476,7 +1488,7 @@ mod tests {
             .with_sequence_number(0)
             .with_schema_id(0)
             .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
-            .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
+            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
             .build();
 
         let expected = TableMetadata {
@@ -1895,7 +1907,7 @@ mod tests {
             .with_manifest_list("s3://a/b/1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();
 
@@ -1908,7 +1920,7 @@ mod tests {
             .with_manifest_list("s3://a/b/2.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();
 
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs
index 7e60d9b9d..ed4ab7902 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -1818,7 +1818,7 @@ mod tests {
             .with_manifest_list("/snap-1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter(vec![
+                additional_properties: HashMap::from_iter(vec![
                     (
                         "spark.app.id".to_string(),
                         "local-1662532784305".to_string(),
@@ -1881,7 +1881,7 @@ mod tests {
             .with_manifest_list("/snap-1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter(vec![
+                additional_properties: HashMap::from_iter(vec![
                     (
                         "spark.app.id".to_string(),
                         "local-1662532784305".to_string(),
@@ -1901,7 +1901,7 @@ mod tests {
             .with_manifest_list("/snap-1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter(vec![
+                additional_properties: HashMap::from_iter(vec![
                     (
                         "spark.app.id".to_string(),
                         "local-1662532784305".to_string(),
@@ -1949,7 +1949,7 @@ mod tests {
             .with_manifest_list("/snap-1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();
 
@@ -1994,7 +1994,7 @@ mod tests {
             .with_manifest_list("/snap-1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter(vec![
+                additional_properties: HashMap::from_iter(vec![
                     (
                         "spark.app.id".to_string(),
                         "local-1662532784305".to_string(),
@@ -2114,7 +2114,7 @@ mod tests {
             .with_manifest_list("/snap-1")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();
 
@@ -2140,7 +2140,7 @@ mod tests {
             .with_parent_snapshot_id(Some(1))
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();
 
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 552ac497f..6fb070527 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -1537,6 +1537,14 @@ impl Literal {
         })?;
         Ok(Self::decimal(decimal.mantissa()))
     }
+
+    /// Attempts to convert the Literal to a PrimitiveLiteral
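+    ///
+    /// Returns `None` if the literal is not a primitive (i.e. a struct, list, or map).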
+    pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
+        match self {
+            Literal::Primitive(primitive) => Some(primitive.clone()),
+            _ => None,
+        }
+    }
 }
 
 /// The partition struct stores the tuple of partition values for each file.
@@ -1576,6 +1584,11 @@ impl Struct {
     pub fn is_null_at_index(&self, index: usize) -> bool {
         self.null_bitmap[index]
     }
+
+    /// Return fields in the struct.
+    pub fn fields(&self) -> &[Literal] {
+        &self.fields
+    }
 }
 
 impl Index<usize> for Struct {
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index f29cf5122..edf1a8596 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -19,14 +19,26 @@
 
 use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::future::Future;
 use std::mem::discriminant;
+use std::ops::RangeFrom;
+
+use uuid::Uuid;
 
 use crate::error::Result;
-use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform};
+use crate::io::OutputFile;
+use crate::spec::{
+    DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
+    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot,
+    SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType,
+    Summary, Transform, MAIN_BRANCH,
+};
 use crate::table::Table;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
 
+const META_ROOT_PATH: &str = "metadata";
+
 /// Table transaction.
 pub struct Transaction<'a> {
     table: &'a Table,
@@ -96,6 +108,44 @@ impl<'a> Transaction<'a> {
         Ok(self)
     }
 
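+    // Generate a random snapshot id from a v4 UUID, retrying until it does not
+    // collide with any snapshot id already present in the table metadata.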
+    fn generate_unique_snapshot_id(&self) -> i64 {
+        let generate_random_id = || -> i64 {
+            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+            let snapshot_id = (lhs ^ rhs) as i64;
+            if snapshot_id < 0 {
+                -snapshot_id
+            } else {
+                snapshot_id
+            }
+        };
+        let mut snapshot_id = generate_random_id();
+        while self
+            .table
+            .metadata()
+            .snapshots()
+            .any(|s| s.snapshot_id() == snapshot_id)
+        {
+            snapshot_id = generate_random_id();
+        }
+        snapshot_id
+    }
+
+    /// Creates a fast append action.
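+    ///
+    /// A minimal usage sketch (assuming a `table`, a `catalog`, and some `data_files`
+    /// are already at hand):
+    ///
+    /// ```ignore
+    /// let tx = Transaction::new(&table);
+    /// let mut action = tx.fast_append(None, vec![])?;
+    /// action.add_data_files(data_files)?;
+    /// let tx = action.apply().await?;
+    /// let table = tx.commit(&catalog).await?;
+    /// ```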
+    pub fn fast_append(
+        self,
+        commit_uuid: Option<Uuid>,
+        key_metadata: Vec<u8>,
+    ) -> Result<FastAppendAction<'a>> {
+        let snapshot_id = self.generate_unique_snapshot_id();
+        FastAppendAction::new(
+            self,
+            snapshot_id,
+            commit_uuid.unwrap_or_else(Uuid::now_v7),
+            key_metadata,
+            HashMap::new(),
+        )
+    }
+
     /// Creates replace sort order action.
     pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
         ReplaceSortOrderAction {
@@ -122,6 +172,365 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast appending data files to the table.
+pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        commit_uuid: Uuid,
+        key_metadata: Vec<u8>,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
+    /// Finish building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
+            .cloned()
+            .collect())
+    }
+}
+
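+/// Customizes snapshot production, e.g. which delete entries are applied and which
+/// existing manifests are carried over into the new snapshot.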
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+    snapshot_id: i64,
+    key_metadata: Vec<u8>,
+    commit_uuid: Uuid,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        key_metadata: Vec<u8>,
+        commit_uuid: Uuid,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            snapshot_id,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
+            key_metadata,
+        })
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatitable with partition type",
+            ));
+        }
+        for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
+            if !field
+                .field_type
+                .as_primitive_type()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Partition field should only be primitive type.",
+                    )
+                })?
+                .compatible(&value.as_primitive_literal().unwrap())
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Partition value is not compatitable partition type",
+                ));
+            }
+        }
+        Ok(())
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        let data_files: Vec<DataFile> = data_files.into_iter().collect();
+        for data_file in &data_files {
+            if data_file.content_type() != crate::spec::DataContentType::Data {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Only data content type is allowed for fast append",
+                ));
+            }
+            Self::validate_partition_value(
+                data_file.partition(),
+                self.tx
+                    .table
+                    .metadata()
+                    .default_partition_spec()
+                    .partition_type(),
+            )?;
+        }
+        self.added_data_files.extend(data_files);
+        Ok(self)
+    }
+
+    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+        let new_manifest_path = format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.commit_uuid,
+            self.manifest_counter.next().unwrap(),
+            DataFileFormat::Avro
+        );
+        self.tx.table.file_io().new_output(new_manifest_path)
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+        let added_data_files = std::mem::take(&mut self.added_data_files);
+        let manifest_entries = added_data_files
+            .into_iter()
+            .map(|data_file| {
+                let builder = ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .data_file(data_file);
+                if self.tx.table.metadata().format_version() == FormatVersion::V1 {
+                    builder.snapshot_id(self.snapshot_id).build()
+                } else {
+                    // For format version > 1, leave the snapshot id unset so that it is
+                    // inherited when the manifest is read; this avoids rewriting the
+                    // manifest file if the commit fails and is retried.
+                    builder.build()
+                }
+            })
+            .collect();
+        let schema = self.tx.table.metadata().current_schema();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(schema.clone())
+            .schema_id(schema.schema_id())
+            .format_version(self.tx.table.metadata().format_version())
+            .partition_spec(
+                self.tx
+                    .table
+                    .metadata()
+                    .default_partition_spec()
+                    .as_ref()
+                    .clone(),
+            )
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.new_manifest_output()?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        &mut self,
+        snapshot_produce_operation: &OP,
+        manifest_process: &MP,
+    ) -> Result<Vec<ManifestFile>> {
+        let added_manifest = self.write_added_manifest().await?;
+        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
+        // # TODO
+        // Support processing delete entries.
+
+        let mut manifest_files = vec![added_manifest];
+        manifest_files.extend(existing_manifests);
+        let manifest_files = manifest_process.process_manifests(manifest_files);
+        Ok(manifest_files)
+    }
+
+    // # TODO
+    // Populate the remaining summary properties, e.g. "added-data-files" and "added-records".
+    fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
+        Summary {
+            operation: snapshot_produce_operation.operation(),
+            additional_properties: self.snapshot_properties.clone(),
+        }
+    }
+
+    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            attempt,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finish building the action and apply it to the transaction.
+    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        mut self,
+        snapshot_produce_operation: OP,
+        process: MP,
+    ) -> Result<Transaction<'a>> {
+        let new_manifests = self
+            .manifest_file(&snapshot_produce_operation, &process)
+            .await?;
+        let next_seq_num = self.tx.table.metadata().next_sequence_number();
+
+        let summary = self.summary(&snapshot_produce_operation);
+
+        let manifest_list_path = self.generate_manifest_list_file_path(0);
+
+        let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
+            FormatVersion::V1 => ManifestListWriter::v1(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.tx.table.metadata().current_snapshot_id(),
+            ),
+            FormatVersion::V2 => ManifestListWriter::v2(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.tx.table.metadata().current_snapshot_id(),
+                next_seq_num,
+            ),
+        };
+        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.tx.table.metadata().current_schema_id())
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    self.snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
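+        // These requirements give optimistic concurrency: the commit is rejected if the
+        // table UUID changed or another writer advanced the main branch in the meantime.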
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.tx.table.metadata().current_snapshot_id(),
+            },
+        ])?;
+        Ok(self.tx)
+    }
+}
+
 /// Transaction action for replacing sort order.
 pub struct ReplaceSortOrderAction<'a> {
     tx: Transaction<'a>,
@@ -203,10 +612,13 @@ mod tests {
     use std::fs::File;
     use std::io::BufReader;
 
-    use crate::io::FileIO;
-    use crate::spec::{FormatVersion, TableMetadata};
+    use crate::io::FileIOBuilder;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
+        TableMetadata,
+    };
     use crate::table::Table;
-    use crate::transaction::Transaction;
+    use crate::transaction::{Transaction, MAIN_BRANCH};
     use crate::{TableIdent, TableRequirement, TableUpdate};
 
     fn make_v1_table() -> Table {
@@ -223,7 +635,7 @@ mod tests {
             .metadata(resp)
             .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
-            .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
             .build()
             .unwrap()
     }
@@ -242,7 +654,26 @@ mod tests {
             .metadata(resp)
             .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
-            .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
+            .build()
+            .unwrap()
+    }
+
+    fn make_v2_minimal_table() -> Table {
+        let file = File::open(format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2ValidMinimal.json"
+        ))
+        .unwrap();
+        let reader = BufReader::new(file);
+        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+        Table::builder()
+            .metadata(resp)
+            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+            .file_io(FileIOBuilder::new("memory").build().unwrap())
             .build()
             .unwrap()
     }
@@ -347,6 +778,88 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_fast_append_action() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+        let mut action = tx.fast_append(None, vec![]).unwrap();
+
+        // check adding a data file with an incompatible partition value
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/3.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition(Struct::from_iter([Some(Literal::string("test"))]))
+            .build()
+            .unwrap();
+        assert!(action.add_data_files(vec![data_file.clone()]).is_err());
+
+        let data_file = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("test/3.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition(Struct::from_iter([Some(Literal::long(300))]))
+            .build()
+            .unwrap();
+        action.add_data_files(vec![data_file.clone()]).unwrap();
+        let tx = action.apply().await.unwrap();
+
+        // check updates and requirements
+        assert!(
+            matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
+        );
+        assert_eq!(
+            vec![
+                TableRequirement::UuidMatch {
+                    uuid: tx.table.metadata().uuid()
+                },
+                TableRequirement::RefSnapshotIdMatch {
+                    r#ref: MAIN_BRANCH.to_string(),
+                    snapshot_id: tx.table.metadata().current_snapshot_id
+                }
+            ],
+            tx.requirements
+        );
+
+        // check manifest list
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
+            snapshot
+        } else {
+            unreachable!()
+        };
+        let manifest_list = new_snapshot
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+        assert_eq!(1, manifest_list.entries().len());
+        assert_eq!(
+            manifest_list.entries()[0].sequence_number,
+            new_snapshot.sequence_number()
+        );
+
+        // check manifest
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap();
+        assert_eq!(1, manifest.entries().len());
+        assert_eq!(
+            new_snapshot.sequence_number(),
+            manifest.entries()[0]
+                .sequence_number()
+                .expect("Inherit sequence number by load manifest")
+        );
+        assert_eq!(
+            new_snapshot.snapshot_id(),
+            manifest.entries()[0].snapshot_id().unwrap()
+        );
+        assert_eq!(data_file, *manifest.entries()[0].data_file());
+    }
+
     #[test]
     fn test_do_same_update_in_same_transaction() {
         let table = make_v2_table();
diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
new file mode 100644
index 000000000..f9ba9e414
--- /dev/null
+++ b/crates/integration_tests/Cargo.toml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-integration-tests"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+rust-version = { workspace = true }
+
+[dependencies]
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+futures = { workspace = true }
+iceberg = { workspace = true }
+iceberg-catalog-rest = { workspace = true }
+iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
+log = { workspace = true }
+parquet = { workspace = true }
+port_scanner = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs
new file mode 100644
index 000000000..5777a4018
--- /dev/null
+++ b/crates/integration_tests/src/lib.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+pub struct TestFixture {
+    pub _docker_compose: DockerCompose,
+    pub rest_catalog: RestCatalog,
+}
+
+pub async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    // Start docker compose
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s rest catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let container_ip = docker_compose.get_container_ip("minio");
+    let read_port = format!("{}:{}", container_ip, 9000);
+
+    let config = RestCatalogConfig::builder()
+        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+        .props(HashMap::from([
+            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+            (S3_REGION.to_string(), "us-east-1".to_string()),
+        ]))
+        .build();
+    let rest_catalog = RestCatalog::new(config);
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        rest_catalog,
+    }
+}
diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml
new file mode 100644
index 000000000..490f4eb94
--- /dev/null
+++ b/crates/integration_tests/testdata/docker-compose.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+networks:
+  rest_bridge:
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    networks:
+      rest_bridge:
+        aliases:
+          - icebergdata.minio
+    ports:
+      - 8181:8181
+    expose:
+      - 8181
+
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+      - MINIO_DEFAULT_BUCKETS=icebergdata
+    hostname: icebergdata.minio
+    networks:
+      rest_bridge:
+    ports:
+      - 9001:9001
+    expose:
+      - 9001
+      - 9000
+    command: ["server", "/data", "--console-address", ":9001"]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null "
+    networks:
+      rest_bridge:
diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs
new file mode 100644
index 000000000..87e805c23
--- /dev/null
+++ b/crates/integration_tests/tests/append_data_file_test.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_integration_tests::set_test_fixture;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::properties::WriterProperties;
+
+#[tokio::test]
+async fn test_append_data_file() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
+    let mut data_file_writer = data_file_writer_builder
+        .build(DataFileWriterConfig::new(None))
+        .await
+        .unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // check parquet file schema
+    let content = table
+        .file_io()
+        .new_input(data_file[0].file_path())
+        .unwrap()
+        .read()
+        .await
+        .unwrap();
+    let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
+        &content,
+        ArrowReaderOptions::default(),
+    )
+    .unwrap();
+    let field_ids: Vec<i32> = parquet_reader
+        .parquet_schema()
+        .columns()
+        .iter()
+        .map(|col| col.self_type().get_basic_info().id())
+        .collect();
+    assert_eq!(field_ids, vec![1, 2, 3]);
+
+    // commit result
+    let tx = Transaction::new(&table);
+    let mut append_action = tx.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx = append_action.apply().await.unwrap();
+    let table = tx.commit(&fixture.rest_catalog).await.unwrap();
+
+    // check result
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // commit result again
+    let tx = Transaction::new(&table);
+    let mut append_action = tx.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx = append_action.apply().await.unwrap();
+    let table = tx.commit(&fixture.rest_catalog).await.unwrap();
+
+    // check result again
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 2);
+    assert_eq!(batches[0], batch);
+    assert_eq!(batches[1], batch);
+}
diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs
new file mode 100644
index 000000000..f3dd70f16
--- /dev/null
+++ b/crates/integration_tests/tests/conflict_commit_test.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_integration_tests::set_test_fixture;
+use parquet::file::properties::WriterProperties;
+
+#[tokio::test]
+async fn test_append_data_file_conflict() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
+    let mut data_file_writer = data_file_writer_builder
+        .build(DataFileWriterConfig::new(None))
+        .await
+        .unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // start two transactions and commit one of them
+    let tx1 = Transaction::new(&table);
+    let mut append_action = tx1.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx1 = append_action.apply().await.unwrap();
+    let tx2 = Transaction::new(&table);
+    let mut append_action = tx2.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx2 = append_action.apply().await.unwrap();
+    let table = tx2
+        .commit(&fixture.rest_catalog)
+        .await
+        .expect("The first commit should not fail.");
+
+    // check result
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // another commit should fail
+    assert!(tx1.commit(&fixture.rest_catalog).await.is_err());
+}