
Commit e123048

Merge branch 'main' into sequence-numbers
2 parents d371c70 + 69f6c58, commit e123048

File tree: 5 files changed, +85 -33 lines changed

Cargo.lock (+1)

Some generated files are not rendered by default.

Cargo.toml (+1)

@@ -20,6 +20,7 @@ iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "e7008f399
 log = "0.4"
 native-tls = "0.2.11"
 object_store = { version = "0.11", features = ["aws"] }
+opendal = { version = "0.50" }
 parquet = { version = "53" }
 postgres = { version = "0.19.7", git = "https://github.com/splitgraph/rust-postgres", rev = "88c2c7714a4558aed6a63e2e2b140a8359568858" }
 postgres-native-tls = { version = "0.5.0", git = "https://github.com/splitgraph/rust-postgres", rev = "88c2c7714a4558aed6a63e2e2b140a8359568858" }
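Note: opendal is added as a direct dependency, apparently so that src/iceberg_destination.rs (below) can name opendal::Error and opendal::ErrorKind when classifying write failures. One caveat: std's downcast_ref only matches if this direct dependency resolves to the same opendal version that iceberg-rust itself links against, since two versions of a crate produce distinct types. A quick way to confirm there is a single copy in the dependency graph:

    cargo tree -i opendal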

src/error.rs (+2)

@@ -18,4 +18,6 @@ pub enum DataLoadingError {
     ObjectStoreError(#[from] object_store::Error),
     #[error("join error")]
     JoinError(#[from] tokio::task::JoinError),
+    #[error("optimistic concurrency error")]
+    OptimisticConcurrencyError(),
 }
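The #[error(...)] attributes indicate a thiserror-derived enum. For context, a minimal sketch of how the new variant slots in (the derive line and the message on ObjectStoreError are assumptions; the hunk does not show them):

    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum DataLoadingError {
        #[error("object store error")] // message assumed, not shown in the hunk
        ObjectStoreError(#[from] object_store::Error),
        #[error("join error")]
        JoinError(#[from] tokio::task::JoinError),
        // New: a conditional metadata write lost the race with another
        // writer; callers can treat this as retryable.
        #[error("optimistic concurrency error")]
        OptimisticConcurrencyError(),
    }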

src/iceberg_destination.rs (+23 -12)

@@ -1,5 +1,6 @@
 use core::str;
 use std::collections::HashMap;
+use std::error::Error;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -119,24 +120,24 @@ pub async fn record_batches_to_iceberg(
                 "Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
             )));
         }
-        let x = version_hint_input.read().await?;
-        let y: String = String::from_utf8(x.to_vec()).map_err(|_| {
-            DataLoadingError::IcebergError(iceberg::Error::new(
-                iceberg::ErrorKind::DataInvalid,
-                "Could not parse UTF-8 in version-hint.text",
-            ))
-        })?;
-        let z = y.trim().parse::<u64>().map_err(|_| {
+        let version_hint_bytes = version_hint_input.read().await?;
+        let version_hint_string: String =
+            String::from_utf8(version_hint_bytes.to_vec()).map_err(|_| {
+                DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    "Could not parse UTF-8 in version-hint.text",
+                ))
+            })?;
+        let version_hint_u64 = version_hint_string.trim().parse::<u64>().map_err(|_| {
             DataLoadingError::IcebergError(iceberg::Error::new(
                 iceberg::ErrorKind::DataInvalid,
                 "Could not parse integer version in version-hint.text",
             ))
         })?;
-        Some(z)
+        Some(version_hint_u64)
     } else {
         None
     };
-
     let (previous_metadata, previous_metadata_location) = match old_version_hint {
         Some(version_hint) => {
             let old_metadata_location =
@@ -275,10 +276,20 @@ pub async fn record_batches_to_iceberg(
         target_url, new_version_hint
     );

-    file_io
+    if let Err(iceberg_error) = file_io
         .new_output(&new_metadata_location)?
         .write_exclusive(serde_json::to_vec(&new_metadata).unwrap().into())
-        .await?;
+        .await
+    {
+        if let Some(iceberg_error_source) = iceberg_error.source() {
+            if let Some(opendal_error) = iceberg_error_source.downcast_ref::<opendal::Error>() {
+                if opendal_error.kind() == opendal::ErrorKind::ConditionNotMatch {
+                    return Err(DataLoadingError::OptimisticConcurrencyError());
+                }
+            }
+        }
+        return Err(iceberg_error.into());
+    };
     info!("Wrote new metadata: {:?}", new_metadata_location);

     file_io
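The key pattern in the final hunk: write_exclusive appears to be a conditional, fail-if-exists write, and a lost race surfaces as an opendal ConditionNotMatch error wrapped inside the iceberg error. A standalone sketch of that classification, slightly generalized to walk the whole source() chain rather than only the immediate cause as the hunk does (the function name is illustrative, not from the repo):

    use std::error::Error;

    // True if any cause in the error chain is opendal's "precondition failed",
    // i.e. another writer created the metadata object first.
    fn is_write_conflict(err: &(dyn Error + 'static)) -> bool {
        let mut cause = err.source();
        while let Some(inner) = cause {
            if let Some(opendal_err) = inner.downcast_ref::<opendal::Error>() {
                return opendal_err.kind() == opendal::ErrorKind::ConditionNotMatch;
            }
            cause = inner.source();
        }
        false
    }

Mapping this case to the dedicated DataLoadingError::OptimisticConcurrencyError() variant, instead of bubbling up the raw iceberg error, is what lets the retry loops in src/lib.rs match the failure precisely.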

src/lib.rs (+58 -21)

@@ -75,6 +75,8 @@ enum Commands {
     },
 }

+const OPTIMISTIC_CONCURRENCY_RETRIES: u32 = 3;
+
 pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
     match args.command {
         Commands::ParquetToDelta {
@@ -117,20 +119,35 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             target_url,
             overwrite,
         } => {
-            let file = tokio::fs::File::open(source_file).await?;
-            let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
-                .await?
-                .build()
-                .unwrap();
-            let schema = record_batch_reader.schema().clone();
-            info!("File schema: {}", schema);
-            record_batches_to_iceberg(
-                record_batch_reader.map_err(DataLoadingError::ParquetError),
-                schema,
-                target_url,
-                overwrite,
-            )
-            .await
+            for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
+                let file = tokio::fs::File::open(&source_file).await?;
+                let record_batch_reader = ParquetRecordBatchStreamBuilder::new(file)
+                    .await?
+                    .build()
+                    .unwrap();
+                let arrow_schema = record_batch_reader.schema().clone();
+                info!("File schema: {}", arrow_schema);
+                match record_batches_to_iceberg(
+                    record_batch_reader.map_err(DataLoadingError::ParquetError),
+                    arrow_schema,
+                    target_url.clone(),
+                    overwrite,
+                )
+                .await
+                {
+                    Err(DataLoadingError::OptimisticConcurrencyError()) => {
+                        info!("Optimistic concurrency error. Retrying");
+                        continue;
+                    }
+                    Err(e) => {
+                        return Err(e);
+                    }
+                    Ok(_) => {
+                        break;
+                    }
+                }
+            }
+            Ok(())
         }
         Commands::PgToIceberg {
             connection_string,
@@ -139,14 +156,34 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
             overwrite,
             batch_size,
         } => {
-            let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
-                .await
-                .map_err(DataLoadingError::PostgresError)?;
-            let arrow_schema = source.get_arrow_schema();
-            let record_batch_stream = source.get_record_batch_stream();
-            info!("Rowset schema: {}", arrow_schema);
-            record_batches_to_iceberg(record_batch_stream, arrow_schema, target_url, overwrite)
+            for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
+                let mut source = PgArrowSource::new(connection_string.as_ref(), &query, batch_size)
+                    .await
+                    .map_err(DataLoadingError::PostgresError)?;
+                let arrow_schema = source.get_arrow_schema();
+                let record_batch_stream = source.get_record_batch_stream();
+                info!("Rowset schema: {}", arrow_schema);
+                match record_batches_to_iceberg(
+                    record_batch_stream,
+                    arrow_schema,
+                    target_url.clone(),
+                    overwrite,
+                )
                 .await
+                {
+                    Err(DataLoadingError::OptimisticConcurrencyError()) => {
+                        info!("Optimistic concurrency error. Retrying");
+                        continue;
+                    }
+                    Err(e) => {
+                        return Err(e);
+                    }
+                    Ok(_) => {
+                        break;
+                    }
+                }
+            }
+            Ok(())
         }
     }
     // TODO
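Both commands now share the same shape: a bounded loop that rebuilds the source on each attempt (the file handle and the Postgres stream are consumed per attempt), retries on OptimisticConcurrencyError, and bails on any other error. One detail worth noting: if every attempt loses the race, the loop falls through to Ok(()), so an exhausted retry budget is currently silent. A sketch of an alternative shape that surfaces the final conflict instead (helper name and signature are illustrative, not from the repo):

    use std::future::Future;

    // Run `op` up to `attempts` times, retrying only on optimistic-concurrency
    // conflicts; success or any other error returns immediately. If every
    // attempt conflicts, the conflict is surfaced rather than swallowed.
    async fn with_occ_retries<F, Fut>(attempts: u32, mut op: F) -> Result<(), DataLoadingError>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<(), DataLoadingError>>,
    {
        for _ in 0..attempts {
            match op().await {
                Err(DataLoadingError::OptimisticConcurrencyError()) => continue,
                other => return other,
            }
        }
        Err(DataLoadingError::OptimisticConcurrencyError())
    }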
