Skip to content

Commit dcc4584

Browse files
committed
Address code review comments
1 parent 6111a90 commit dcc4584

File tree

1 file changed

+19
-75
lines changed

1 file changed

+19
-75
lines changed

crates/iceberg/src/puffin/writer.rs

Lines changed: 19 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,18 @@
1818
use std::collections::{HashMap, HashSet};
1919

2020
use bytes::Bytes;
21-
use futures::lock::Mutex;
2221

2322
use crate::io::{FileWrite, OutputFile};
2423
use crate::puffin::blob::Blob;
2524
use crate::puffin::compression::CompressionCodec;
2625
use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
2726
use crate::Result;
2827

29-
struct WriterState {
28+
/// Puffin writer
29+
pub(crate) struct PuffinWriter {
3030
writer: Box<dyn FileWrite>,
3131
is_header_written: bool,
3232
num_bytes_written: u64,
33-
}
34-
35-
/// Puffin writer
36-
pub(crate) struct PuffinWriter {
37-
writer_state: Mutex<WriterState>,
3833
written_blobs_metadata: Vec<BlobMetadata>,
3934
properties: HashMap<String, String>,
4035
footer_compression_codec: CompressionCodec,
@@ -56,14 +51,10 @@ impl PuffinWriter {
5651
CompressionCodec::None
5752
};
5853

59-
let initial_state = WriterState {
54+
Ok(Self {
6055
writer: output_file.writer().await?,
6156
is_header_written: false,
6257
num_bytes_written: 0,
63-
};
64-
65-
Ok(Self {
66-
writer_state: Mutex::new(initial_state),
6758
written_blobs_metadata: Vec::new(),
6859
properties,
6960
footer_compression_codec,
@@ -77,14 +68,12 @@ impl PuffinWriter {
7768
blob: Blob,
7869
compression_codec: CompressionCodec,
7970
) -> Result<()> {
80-
let mut writer_state = self.writer_state.lock().await;
81-
82-
PuffinWriter::write_header_once(&mut writer_state).await?;
71+
self.write_header_once().await?;
8372

84-
let offset = writer_state.num_bytes_written;
73+
let offset = self.num_bytes_written;
8574
let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
8675
let length = compressed_bytes.len().try_into()?;
87-
PuffinWriter::write(&mut writer_state, compressed_bytes).await?;
76+
self.write(compressed_bytes).await?;
8877
self.written_blobs_metadata.push(BlobMetadata {
8978
r#type: blob.r#type,
9079
fields: blob.fields,
@@ -100,26 +89,25 @@ impl PuffinWriter {
10089
}
10190

10291
/// Finalizes the Puffin file
103-
pub(crate) async fn close(&mut self) -> Result<()> {
104-
let mut writer_state = self.writer_state.lock().await;
105-
PuffinWriter::write_header_once(&mut writer_state).await?;
106-
self.write_footer(&mut writer_state).await?;
107-
writer_state.writer.close().await?;
92+
pub(crate) async fn close(mut self) -> Result<()> {
93+
self.write_header_once().await?;
94+
self.write_footer().await?;
95+
self.writer.close().await?;
10896
Ok(())
10997
}
11098

111-
async fn write(writer_state: &mut WriterState, bytes: Bytes) -> Result<()> {
99+
async fn write(&mut self, bytes: Bytes) -> Result<()> {
112100
let length = bytes.len();
113-
writer_state.writer.write(bytes).await?;
114-
writer_state.num_bytes_written += length as u64;
101+
self.writer.write(bytes).await?;
102+
self.num_bytes_written += length as u64;
115103
Ok(())
116104
}
117105

118-
async fn write_header_once(writer_state: &mut WriterState) -> Result<()> {
119-
if !writer_state.is_header_written {
106+
async fn write_header_once(&mut self) -> Result<()> {
107+
if !self.is_header_written {
120108
let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
121-
PuffinWriter::write(writer_state, bytes).await?;
122-
writer_state.is_header_written = true;
109+
self.write(bytes).await?;
110+
self.is_header_written = true;
123111
}
124112
Ok(())
125113
}
@@ -143,7 +131,7 @@ impl PuffinWriter {
143131
result
144132
}
145133

146-
async fn write_footer(&self, writer_state: &mut WriterState) -> Result<()> {
134+
async fn write_footer(&mut self) -> Result<()> {
147135
let mut footer_payload_bytes = self.footer_payload_bytes()?;
148136
let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
149137

@@ -154,7 +142,7 @@ impl PuffinWriter {
154142
footer_bytes.extend(self.flags_bytes());
155143
footer_bytes.extend(&FileMetadata::MAGIC);
156144

157-
PuffinWriter::write(writer_state, footer_bytes.into()).await
145+
self.write(footer_bytes.into()).await
158146
}
159147
}
160148

@@ -178,50 +166,6 @@ mod tests {
178166
use crate::puffin::writer::PuffinWriter;
179167
use crate::Result;
180168

181-
#[tokio::test]
182-
async fn test_throws_error_if_attempt_to_add_blob_after_closing() {
183-
let temp_dir = TempDir::new().unwrap();
184-
185-
let file_name = "temp_puffin.bin";
186-
let full_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), file_name);
187-
188-
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
189-
let output_file = file_io.new_output(full_path).unwrap();
190-
let mut writer = PuffinWriter::new(&output_file, HashMap::new(), false)
191-
.await
192-
.unwrap();
193-
writer.close().await.unwrap();
194-
195-
assert_eq!(
196-
writer
197-
.add(blob_0(), CompressionCodec::None)
198-
.await
199-
.unwrap_err()
200-
.to_string(),
201-
"Unexpected => Failure in doing io operation, source: Unexpected (persistent) at => writer has been closed or aborted",
202-
)
203-
}
204-
205-
#[tokio::test]
206-
async fn test_throws_error_if_attempt_to_close_multiple_times() {
207-
let temp_dir = TempDir::new().unwrap();
208-
209-
let file_name = "temp_puffin.bin";
210-
let full_path = format!("{}/{}", temp_dir.path().to_str().unwrap(), file_name);
211-
212-
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
213-
let output_file = file_io.new_output(full_path).unwrap();
214-
let mut writer = PuffinWriter::new(&output_file, HashMap::new(), false)
215-
.await
216-
.unwrap();
217-
writer.close().await.unwrap();
218-
219-
assert_eq!(
220-
writer.close().await.unwrap_err().to_string(),
221-
"Unexpected => Failure in doing io operation, source: Unexpected (persistent) at => writer has been closed or aborted",
222-
)
223-
}
224-
225169
async fn write_puffin_file(
226170
temp_dir: &TempDir,
227171
blobs: Vec<(Blob, CompressionCodec)>,

0 commit comments

Comments
 (0)