Skip to content

Commit 76c7d58

Browse files
author
Devdutt Shenoi
committed
fix: .arrows naming convention
1 parent 240eb99 commit 76c7d58

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

src/parseable/staging/writer.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::{
2121
collections::{HashMap, HashSet},
2222
fs::{File, OpenOptions},
2323
io::BufWriter,
24-
path::PathBuf,
2524
sync::Arc,
2625
};
2726

@@ -40,19 +39,18 @@ use super::StagingError;
4039
pub struct DiskWriter<const N: usize> {
4140
inner: FileWriter<BufWriter<File>>,
4241
/// Used to ensure un"finish"ed arrow files are renamed on "finish"
43-
path_prefix: PathBuf,
42+
path_prefix: String,
4443
/// Number of rows written onto disk
4544
count: usize,
4645
/// Denotes distinct files created with similar schema during the same minute by the same ingestor
4746
file_id: usize,
4847
}
4948

5049
impl<const N: usize> DiskWriter<N> {
51-
pub fn new(path_prefix: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
50+
pub fn new(path_prefix: String, schema: &Schema) -> Result<Self, StagingError> {
5251
let file_id = 0;
5352
// Live writes happen into partfile
54-
let mut partfile_path = path_prefix.clone();
55-
partfile_path.set_extension(format!("{file_id}.{ARROW_PART_FILE_EXTENSION}"));
53+
let partfile_path = format!("{path_prefix}.{file_id}.{ARROW_PART_FILE_EXTENSION}");
5654
let file = OpenOptions::new()
5755
.create(true)
5856
.append(true)
@@ -93,11 +91,14 @@ impl<const N: usize> DiskWriter<N> {
9391
pub fn finish(&mut self) -> Result<(), StagingError> {
9492
self.inner.finish()?;
9593

96-
let mut partfile_path = self.path_prefix.clone();
97-
partfile_path.set_extension(format!("{}.{ARROW_PART_FILE_EXTENSION}", self.file_id));
98-
99-
let mut arrows_path = self.path_prefix.clone();
100-
arrows_path.set_extension(format!("{}.{ARROW_FILE_EXTENSION}", self.file_id));
94+
let partfile_path = format!(
95+
"{}.{}.{ARROW_PART_FILE_EXTENSION}",
96+
self.path_prefix, self.file_id
97+
);
98+
let arrows_path = format!(
99+
"{}.{}.{ARROW_FILE_EXTENSION}",
100+
self.path_prefix, self.file_id
101+
);
101102

102103
// Rename from part file to finished arrows file
103104
std::fs::rename(partfile_path, arrows_path)?;

src/parseable/streams.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ impl Stream {
124124
);
125125
std::fs::create_dir_all(&self.data_path)?;
126126

127-
let mut writer = DiskWriter::new(path_prefix, &record.schema())?;
127+
let mut writer =
128+
DiskWriter::new(path_prefix.display().to_string(), &record.schema())?;
128129
writer.write(record)?;
129130

130131
guard.disk.insert(schema_key.to_owned(), writer);

0 commit comments

Comments
 (0)