Commit 179d78c

Author: Devdutt Shenoi
Merge branch 'main' into fix-timepartition
2 parents: 96af702 + d9da876

5 files changed: +216 -144

src/event/format/mod.rs (-1)

@@ -171,7 +171,6 @@ pub trait EventFormat: Sized {
         update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);

         let rb = Self::decode(data, new_schema.clone())?;
-
         let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;

         Ok((rb, is_first))

src/handlers/http/ingest.rs (+8 -46)

@@ -395,7 +395,6 @@ mod tests {
     use crate::{
         event::format::{json, EventFormat},
         metadata::SchemaVersion,
-        utils::json::{convert_array_to_object, flatten::convert_to_array},
     };

     trait TestExt {

@@ -560,21 +559,6 @@ mod tests {
         assert_eq!(rb.num_columns(), 1);
     }

-    #[test]
-    fn non_object_arr_is_err() {
-        let json = json!([1]);
-
-        assert!(convert_array_to_object(
-            json,
-            None,
-            None,
-            None,
-            SchemaVersion::V0,
-            &crate::event::format::LogSource::default()
-        )
-        .is_err())
-    }
-
     #[test]
     fn array_into_recordbatch_inffered_schema() {
         let json = json!([

@@ -770,28 +754,17 @@
             {
                 "a": 1,
                 "b": "hello",
-                "c": [{"a": 1}]
+                "c_a": [1],
             },
             {
                 "a": 1,
                 "b": "hello",
-                "c": [{"a": 1, "b": 2}]
+                "c_a": [1],
+                "c_b": [2],
             },
         ]);
-        let flattened_json = convert_to_array(
-            convert_array_to_object(
-                json,
-                None,
-                None,
-                None,
-                SchemaVersion::V0,
-                &crate::event::format::LogSource::default(),
-            )
-            .unwrap(),
-        )
-        .unwrap();

-        let (rb, _) = json::Event::new(flattened_json)
+        let (rb, _) = json::Event::new(json)
             .into_recordbatch(
                 &HashMap::default(),
                 false,

@@ -859,28 +832,17 @@
             {
                 "a": 1,
                 "b": "hello",
-                "c": [{"a": 1}]
+                "c_a": 1,
             },
             {
                 "a": 1,
                 "b": "hello",
-                "c": [{"a": 1, "b": 2}]
+                "c_a": 1,
+                "c_b": 2,
             },
         ]);
-        let flattened_json = convert_to_array(
-            convert_array_to_object(
-                json,
-                None,
-                None,
-                None,
-                SchemaVersion::V1,
-                &crate::event::format::LogSource::default(),
-            )
-            .unwrap(),
-        )
-        .unwrap();

-        let (rb, _) = json::Event::new(flattened_json)
+        let (rb, _) = json::Event::new(json)
            .into_recordbatch(
                 &HashMap::default(),
                 false,
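Note on the two test rewrites above: the deleted `convert_to_array(convert_array_to_object(..))` calls performed key flattening at runtime; the tests now hand `json::Event::new` the already-flattened shape. A minimal illustration of the shapes involved, lifted from the diff itself (only the data shapes matter here, the helper names are gone):

use serde_json::json;

fn main() {
    // Shape the old tests flattened at runtime:
    let nested = json!([{"a": 1, "b": "hello", "c": [{"a": 1, "b": 2}]}]);
    // What the SchemaVersion::V0 test now passes in directly:
    // nested keys joined with `_`, the object array transposed into per-key arrays.
    let v0 = json!([{"a": 1, "b": "hello", "c_a": [1], "c_b": [2]}]);
    // What the SchemaVersion::V1 test now passes in directly:
    // same key-joining, with singleton values unwrapped to scalars.
    let v1 = json!([{"a": 1, "b": "hello", "c_a": 1, "c_b": 2}]);
    println!("{nested}\n{v0}\n{v1}");
}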

src/parseable/streams.rs (+71 -15)

@@ -67,6 +67,18 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };

+/// Returns the filename for parquet if provided arrows file path is valid as per our expectation
+fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
+    let filename = path.file_stem()?.to_str()?;
+    let (_, front) = filename.split_once('.')?;
+    assert!(front.contains('.'), "contains the delim `.`");
+    let filename_with_random_number = format!("{front}.{random_string}.parquet");
+    let mut parquet_path = path.to_owned();
+    parquet_path.set_file_name(filename_with_random_number);
+
+    Some(parquet_path)
+}
+
 #[derive(Debug, thiserror::Error)]
 #[error("Stream not found: {0}")]
 pub struct StreamNotFound(pub String);
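A worked example of the renaming rule this helper encodes, as a standalone sketch (the staging filename here is hypothetical and shorter than the real ones exercised in the tests below): `file_stem` drops the `.arrows` extension, `split_once('.')` discards everything up to the first dot, and the remainder gains the random string plus the `.parquet` extension. Returning `Option` instead of unwrapping lets the caller warn on malformed filenames rather than panic, as the next hunks show.

use std::path::{Path, PathBuf};

fn main() {
    // Hypothetical staging file: "<prefix>.<partition segments>.data.arrows"
    let path = Path::new("abc123&key=value.date=2020-01-21.hour=10.data.arrows");

    // file_stem() strips ".arrows"; split_once('.') drops the "abc123&key=value" prefix.
    let stem = path.file_stem().unwrap().to_str().unwrap();
    let (_, front) = stem.split_once('.').unwrap();
    assert_eq!(front, "date=2020-01-21.hour=10.data");

    // Append the random string and the parquet extension, as the helper does.
    let mut parquet_path = path.to_owned();
    parquet_path.set_file_name(format!("{front}.rand123.parquet"));
    assert_eq!(
        parquet_path,
        PathBuf::from("date=2020-01-21.hour=10.data.rand123.parquet")
    );
}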
@@ -182,7 +194,10 @@ impl Stream {
         let paths = dir
             .flatten()
             .map(|file| file.path())
-            .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
+            .filter(|file| {
+                file.extension()
+                    .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION))
+            })
             .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
             .collect();

@@ -225,12 +240,13 @@
                     &arrow_file_path, self.stream_name
                 );
                 remove_file(&arrow_file_path).unwrap();
-            } else {
-                let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string);
+            } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) {
                 grouped_arrow_file
                     .entry(key)
                     .or_default()
                     .push(arrow_file_path);
+            } else {
+                warn!("Unexpected arrows file: {}", arrow_file_path.display());
             }
         }
         grouped_arrow_file
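The rewritten branch above groups staged arrows files by their target parquet path and logs anything unparseable instead of panicking. A minimal sketch of that `entry().or_default().push()` grouping, with made-up paths:

use std::collections::HashMap;
use std::path::PathBuf;

fn main() {
    let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();

    // Two staged files mapping to the same parquet target share one entry.
    for (key, arrows_file) in [
        ("a.rand.parquet", "x.a.data.arrows"),
        ("a.rand.parquet", "y.a.data.arrows"),
    ] {
        grouped
            .entry(PathBuf::from(key))
            .or_default()
            .push(PathBuf::from(arrows_file));
    }

    assert_eq!(grouped[&PathBuf::from("a.rand.parquet")].len(), 2);
}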
@@ -289,17 +305,6 @@
         }
     }

-    fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
-        let filename = path.file_stem().unwrap().to_str().unwrap();
-        let (_, filename) = filename.split_once('.').unwrap();
-        assert!(filename.contains('.'), "contains the delim `.`");
-        let filename_with_random_number = format!("{filename}.{random_string}.arrows");
-        let mut parquet_path = path.to_owned();
-        parquet_path.set_file_name(filename_with_random_number);
-        parquet_path.set_extension("parquet");
-        parquet_path
-    }
-
     /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
     pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         info!(
@@ -831,7 +836,7 @@ impl Streams {

 #[cfg(test)]
 mod tests {
-    use std::{sync::Barrier, thread::spawn, time::Duration};
+    use std::{io::Write, sync::Barrier, thread::spawn, time::Duration};

     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
@@ -1207,6 +1212,57 @@
         assert_eq!(staging.arrow_files().len(), 1);
     }

+    fn create_test_file(dir: &TempDir, filename: &str) -> PathBuf {
+        let file_path = dir.path().join(filename);
+        let mut file = File::create(&file_path).expect("Failed to create test file");
+        // Write some dummy content
+        file.write_all(b"test content")
+            .expect("Failed to write to test file");
+        file_path
+    }
+
+    #[test]
+    fn test_valid_arrow_path_conversion() {
+        let temp_dir = TempDir::new().expect("Failed to create temp dir");
+        let filename = "12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows";
+        let file_path = create_test_file(&temp_dir, filename);
+        let random_string = "random123";
+
+        let result = arrow_path_to_parquet(&file_path, random_string);
+
+        assert!(result.is_some());
+        let parquet_path = result.unwrap();
+        assert_eq!(
+            parquet_path.file_name().unwrap().to_str().unwrap(),
+            "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet"
+        );
+    }
+
+    #[test]
+    fn test_complex_path() {
+        let temp_dir = TempDir::new().expect("Failed to create temp dir");
+        let nested_dir = temp_dir.path().join("nested/directory/structure");
+        std::fs::create_dir_all(&nested_dir).expect("Failed to create nested directories");
+
+        let filename = "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows";
+        let file_path = nested_dir.join(filename);
+
+        let mut file = File::create(&file_path).expect("Failed to create test file");
+        file.write_all(b"test content")
+            .expect("Failed to write to test file");
+
+        let random_string = "random456";
+
+        let result = arrow_path_to_parquet(&file_path, random_string);
+
+        assert!(result.is_some());
+        let parquet_path = result.unwrap();
+        assert_eq!(
+            parquet_path.file_name().unwrap().to_str().unwrap(),
+            "date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet"
+        );
+    }
+
     #[test]
     fn get_or_create_returns_existing_stream() {
         let streams = Streams::default();

src/utils/arrow/mod.rs (+3 -82)

@@ -17,37 +17,12 @@
  *
  */

-//! example function for concat recordbatch(may not work)
-//! ```rust
-//! # use arrow::record_batch::RecordBatch;
-//! # use arrow::error::Result;
-//!
-//! fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result<RecordBatch> {
-//!     let schema = batch1.schema();
-//!     let columns = schema
-//!         .fields()
-//!         .iter()
-//!         .enumerate()
-//!         .map(|(i, _)| -> Result<_> {
-//!             let array1 = batch1.column(i);
-//!             let array2 = batch2.column(i);
-//!             let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?;
-//!             Ok(array)
-//!         })
-//!         .collect::<Result<Vec<_>>>()?;
-//!
-//!     RecordBatch::try_new(schema.clone(), columns)
-//! }
-//! ```
-
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };

-use arrow_array::{
-    Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array,
-};
+use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array};
 use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
 use arrow_select::take::take;
 use chrono::{DateTime, Utc};
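An aside on the deleted module doc: it hand-rolled batch concatenation column by column and admitted it "may not work". The arrow crates ship this operation as `concat_batches` (in `arrow-select`, also re-exported as `arrow::compute::concat_batches`), which is presumably why the sketch could go. A minimal sketch, assuming the crate versions in use here expose that function:

use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::concat::concat_batches;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch1 =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
    let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![3]))])?;

    // One call replaces the column-by-column concat loop from the deleted doc example.
    let merged = concat_batches(&schema, [&batch1, &batch2])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}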
@@ -62,31 +37,6 @@ use serde_json::{Map, Value};

 use crate::event::DEFAULT_TIMESTAMP_KEY;

-/// Replaces columns in a record batch with new arrays.
-///
-/// # Arguments
-///
-/// * `schema` - The schema of the record batch.
-/// * `batch` - The record batch to modify.
-/// * `indexes` - The indexes of the columns to replace.
-/// * `arrays` - The new arrays to replace the columns with.
-///
-/// # Returns
-///
-/// The modified record batch with the columns replaced.
-pub fn replace_columns(
-    schema: Arc<Schema>,
-    batch: &RecordBatch,
-    indexes: &[usize],
-    arrays: &[Arc<dyn Array + 'static>],
-) -> RecordBatch {
-    let mut batch_arrays = batch.columns().iter().map(Arc::clone).collect_vec();
-    for (&index, arr) in indexes.iter().zip(arrays.iter()) {
-        batch_arrays[index] = Arc::clone(arr);
-    }
-    RecordBatch::try_new(schema, batch_arrays).unwrap()
-}
-
 /// Converts a slice of record batches to JSON.
 ///
 /// # Arguments
@@ -213,40 +163,11 @@ pub fn reverse(rb: &RecordBatch) -> RecordBatch {
 mod tests {
     use std::sync::Arc;

-    use arrow_array::{Array, Int32Array, RecordBatch};
-    use arrow_schema::{DataType, Field, Schema};
+    use arrow_array::RecordBatch;
+    use arrow_schema::Schema;

     use super::*;

-    #[test]
-    fn check_replace() {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, false),
-        ]);
-
-        let schema_ref = Arc::new(schema);
-
-        let rb = RecordBatch::try_new(
-            schema_ref.clone(),
-            vec![
-                Arc::new(Int32Array::from_value(0, 3)),
-                Arc::new(Int32Array::from_value(0, 3)),
-                Arc::new(Int32Array::from_value(0, 3)),
-            ],
-        )
-        .unwrap();
-
-        let arr: Arc<dyn Array + 'static> = Arc::new(Int32Array::from_value(0, 3));
-
-        let new_rb = replace_columns(schema_ref.clone(), &rb, &[2], &[arr]);
-
-        assert_eq!(new_rb.schema(), schema_ref);
-        assert_eq!(new_rb.num_columns(), 3);
-        assert_eq!(new_rb.num_rows(), 3)
-    }
-
     #[test]
     fn check_empty_json_to_record_batches() {
         let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
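Last, a note on the removed `replace_columns` helper and its `check_replace` test: the helper only cloned the column vector, swapped the targeted indexes, and rebuilt the batch, so any remaining caller can inline it. A sketch of that inline equivalent (the bindings are hypothetical, modeled on the deleted test):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_value(0, 3))],
    )
    .unwrap();

    // Inline equivalent of the deleted replace_columns(schema, &batch, &[0], &[arr]):
    // clone the columns, swap index 0, rebuild the batch.
    let replacement: ArrayRef = Arc::new(Int32Array::from_value(1, 3));
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    columns[0] = replacement;
    let new_batch = RecordBatch::try_new(schema, columns).unwrap();

    assert_eq!(new_batch.num_rows(), 3);
}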
