Skip to content

Commit 11e2819

Browse files
authored
Revert "PendingSegment" (#2710)
Reverts #2699 Going to take a different approach
1 parent ef8538d commit 11e2819

File tree

20 files changed

+88
-142
lines changed

20 files changed

+88
-142
lines changed

vortex-error/src/lib.rs

+1-20
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::error::Error;
1616
use std::fmt::{Debug, Display, Formatter};
1717
use std::num::TryFromIntError;
1818
use std::ops::Deref;
19-
use std::sync::{Arc, PoisonError};
19+
use std::sync::PoisonError;
2020
use std::{env, fmt, io};
2121

2222
pub use ext::*;
@@ -100,11 +100,6 @@ pub enum VortexError {
100100
/// A wrapper for other errors, carrying additional context.
101101
#[error("{0}: {1}")]
102102
Context(ErrString, #[source] Box<VortexError>),
103-
/// A wrapper for errors that need to be shared or cloneable.
104-
/// This is useful when the same underlying [`VortexError`] needs to be returned to multiple
105-
/// receivers.
106-
#[error(transparent)]
107-
Shared(Arc<VortexError>),
108103
/// A wrapper for errors from the Arrow library.
109104
#[error("{0}\nBacktrace:\n{1}")]
110105
ArrowError(arrow_schema::ArrowError, Backtrace),
@@ -247,20 +242,6 @@ impl Debug for VortexError {
247242
/// A type alias for Results that return VortexErrors as their error type.
248243
pub type VortexResult<T> = Result<T, VortexError>;
249244

250-
/// A vortex result that can be shared or cloned.
251-
pub type SharedVortexResult<T> = Result<T, Arc<VortexError>>;
252-
253-
impl From<&Arc<VortexError>> for VortexError {
254-
fn from(e: &Arc<VortexError>) -> Self {
255-
if let VortexError::Shared(e_inner) = e.as_ref() {
256-
// don't re-wrap
257-
VortexError::Shared(Arc::clone(e_inner))
258-
} else {
259-
VortexError::Shared(Arc::clone(e))
260-
}
261-
}
262-
}
263-
264245
/// A trait for unwrapping a VortexResult.
265246
pub trait VortexUnwrap {
266247
/// The type of the value being unwrapped.

vortex-file/src/generic.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
1616
use vortex_io::VortexReadAt;
1717
use vortex_layout::instrument;
1818
use vortex_layout::scan::ScanDriver;
19-
use vortex_layout::segments::{SegmentEvent, SegmentId, SegmentReader, SegmentStream};
19+
use vortex_layout::segments::{AsyncSegmentReader, SegmentEvent, SegmentId, SegmentStream};
2020
use vortex_metrics::{Counter, VortexMetrics};
2121

2222
use crate::footer::{Footer, Segment};
@@ -157,7 +157,7 @@ impl InflightSegments {
157157
impl<R: VortexReadAt> GenericScanDriver<R> {}
158158

159159
impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
160-
fn segment_reader(&self) -> Arc<dyn SegmentReader> {
160+
fn segment_reader(&self) -> Arc<dyn AsyncSegmentReader> {
161161
self.segment_channel.reader()
162162
}
163163

vortex-file/src/memory.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::sync::Arc;
22

3+
use async_trait::async_trait;
34
use futures::{Stream, stream};
45
use vortex_buffer::ByteBuffer;
56
use vortex_error::{VortexResult, vortex_err};
67
use vortex_layout::scan::ScanDriver;
7-
use vortex_layout::segments::{PendingSegment, SegmentId, SegmentReader, SegmentStream};
8+
use vortex_layout::segments::{AsyncSegmentReader, SegmentId, SegmentStream};
89
use vortex_metrics::VortexMetrics;
910

1011
use crate::segments::SegmentCache;
@@ -47,7 +48,7 @@ impl FileType for InMemoryVortexFile {
4748
}
4849

4950
impl ScanDriver for InMemoryVortexFile {
50-
fn segment_reader(&self) -> Arc<dyn SegmentReader> {
51+
fn segment_reader(&self) -> Arc<dyn AsyncSegmentReader> {
5152
Arc::new(self.clone())
5253
}
5354

@@ -56,8 +57,9 @@ impl ScanDriver for InMemoryVortexFile {
5657
}
5758
}
5859

59-
impl SegmentReader for InMemoryVortexFile {
60-
fn get(&self, id: SegmentId) -> VortexResult<Arc<dyn PendingSegment>> {
60+
#[async_trait]
61+
impl AsyncSegmentReader for InMemoryVortexFile {
62+
async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
6163
let segment: &Segment = self
6264
.footer
6365
.segment_map()
@@ -67,6 +69,6 @@ impl SegmentReader for InMemoryVortexFile {
6769
let start = usize::try_from(segment.offset).map_err(|_| vortex_err!("offset too large"))?;
6870
let end = start + segment.length as usize;
6971

70-
Ok(Arc::new(self.buffer.slice(start..end)))
72+
Ok(self.buffer.slice(start..end))
7173
}
7274
}

vortex-file/src/segments/channel.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::sync::Arc;
22

3+
use async_trait::async_trait;
34
use futures::channel::mpsc;
4-
use futures::{FutureExt, Stream};
5+
use futures::{SinkExt, Stream};
56
use oneshot;
7+
use vortex_buffer::ByteBuffer;
68
use vortex_error::{VortexResult, vortex_err};
7-
use vortex_layout::segments::{PendingSegment, SegmentId, SegmentReader, SharedPendingSegment};
9+
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
810

911
use crate::segments::SegmentRequest;
1012

@@ -29,7 +31,7 @@ impl SegmentChannel {
2931
}
3032

3133
/// Returns a reader for the segment cache.
32-
pub fn reader(&self) -> Arc<dyn SegmentReader + 'static> {
34+
pub fn reader(&self) -> Arc<dyn AsyncSegmentReader + 'static> {
3335
Arc::new(SegmentChannelReader(self.request_send.clone()))
3436
}
3537

@@ -41,8 +43,9 @@ impl SegmentChannel {
4143

4244
struct SegmentChannelReader(mpsc::UnboundedSender<SegmentRequest>);
4345

44-
impl SegmentReader for SegmentChannelReader {
45-
fn get(&self, id: SegmentId) -> VortexResult<Arc<dyn PendingSegment>> {
46+
#[async_trait]
47+
impl AsyncSegmentReader for SegmentChannelReader {
48+
async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
4649
// Set up a channel to send the segment back to the caller.
4750
let (send, recv) = oneshot::channel();
4851

@@ -52,12 +55,17 @@ impl SegmentReader for SegmentChannelReader {
5255
// Send a request to the segment channel.
5356
self.0
5457
.clone()
55-
.unbounded_send(SegmentRequest { id, callback: send })
58+
.send(SegmentRequest { id, callback: send })
59+
.await
5660
.map_err(|e| vortex_err!("Failed to request segment {} {:?}", id, e))?;
5761

5862
// Await the callback
59-
Ok(Arc::new(SharedPendingSegment::new(recv.map(|r| {
60-
r.unwrap_or_else(|_recv| Err(vortex_err!("Segment request handler was dropped")))
61-
}))))
63+
match recv.await {
64+
Ok(result) => result,
65+
Err(_canceled) => {
66+
// The sender was dropped before returning a result to us
67+
Err(vortex_err!("Segment request handler was dropped {}", id,))
68+
}
69+
}
6270
}
6371
}

vortex-layout/src/data.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
1212
use crate::LayoutId;
1313
use crate::context::LayoutContext;
1414
use crate::reader::LayoutReader;
15-
use crate::segments::{SegmentCollector, SegmentId, SegmentReader};
15+
use crate::segments::{AsyncSegmentReader, SegmentCollector, SegmentId};
1616
use crate::vtable::LayoutVTableRef;
1717

1818
/// [`Layout`] is the lazy equivalent to [`vortex_array::ArrayRef`], providing a hierarchical
@@ -258,7 +258,7 @@ impl Layout {
258258
/// Create a reader for this layout.
259259
pub fn reader(
260260
&self,
261-
segment_reader: Arc<dyn SegmentReader>,
261+
segment_reader: Arc<dyn AsyncSegmentReader>,
262262
ctx: ArrayContext,
263263
) -> VortexResult<Arc<dyn LayoutReader>> {
264264
self.vtable().reader(self.clone(), ctx, segment_reader)

vortex-layout/src/layouts/chunked/eval_expr.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ mod test {
8080
use vortex_expr::Identity;
8181

8282
use crate::layouts::chunked::writer::ChunkedLayoutWriter;
83-
use crate::segments::SegmentReader;
83+
use crate::segments::AsyncSegmentReader;
8484
use crate::segments::test::TestSegments;
8585
use crate::writer::LayoutWriterExt;
8686
use crate::{Layout, RowMask};
8787

8888
#[fixture]
8989
/// Create a chunked layout with three chunks of primitive arrays.
90-
fn chunked_layout() -> (ArrayContext, Arc<dyn SegmentReader>, Layout) {
90+
fn chunked_layout() -> (ArrayContext, Arc<dyn AsyncSegmentReader>, Layout) {
9191
let ctx = ArrayContext::empty();
9292
let mut segments = TestSegments::default();
9393
let layout = ChunkedLayoutWriter::new(
@@ -111,7 +111,7 @@ mod test {
111111
fn test_chunked_evaluator(
112112
#[from(chunked_layout)] (ctx, segments, layout): (
113113
ArrayContext,
114-
Arc<dyn SegmentReader>,
114+
Arc<dyn AsyncSegmentReader>,
115115
Layout,
116116
),
117117
) {

vortex-layout/src/layouts/chunked/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_error::VortexResult;
1212
use crate::data::Layout;
1313
use crate::layouts::chunked::reader::ChunkedReader;
1414
use crate::reader::{LayoutReader, LayoutReaderExt};
15-
use crate::segments::{SegmentCollector, SegmentReader};
15+
use crate::segments::{AsyncSegmentReader, SegmentCollector};
1616
use crate::vtable::LayoutVTable;
1717
use crate::{CHUNKED_LAYOUT_ID, LayoutId};
1818

@@ -29,7 +29,7 @@ impl LayoutVTable for ChunkedLayout {
2929
&self,
3030
layout: Layout,
3131
ctx: ArrayContext,
32-
segment_reader: Arc<dyn SegmentReader>,
32+
segment_reader: Arc<dyn AsyncSegmentReader>,
3333
) -> VortexResult<Arc<dyn LayoutReader>> {
3434
Ok(ChunkedReader::try_new(layout, ctx, segment_reader)?.into_arc())
3535
}

vortex-layout/src/layouts/chunked/reader.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ use vortex_error::{VortexResult, vortex_panic};
77

88
use crate::layouts::chunked::ChunkedLayout;
99
use crate::reader::LayoutReader;
10-
use crate::segments::SegmentReader;
10+
use crate::segments::AsyncSegmentReader;
1111
use crate::{Layout, LayoutVTable};
1212

1313
#[derive(Clone)]
1414
pub struct ChunkedReader {
1515
layout: Layout,
1616
ctx: ArrayContext,
17-
segment_reader: Arc<dyn SegmentReader>,
17+
segment_reader: Arc<dyn AsyncSegmentReader>,
1818

1919
/// Shared lazy chunk scanners
2020
chunk_readers: Arc<[OnceLock<Arc<dyn LayoutReader>>]>,
@@ -26,7 +26,7 @@ impl ChunkedReader {
2626
pub(super) fn try_new(
2727
layout: Layout,
2828
ctx: ArrayContext,
29-
segment_reader: Arc<dyn SegmentReader>,
29+
segment_reader: Arc<dyn AsyncSegmentReader>,
3030
) -> VortexResult<Self> {
3131
if layout.vtable().id() != ChunkedLayout.id() {
3232
vortex_panic!("Mismatched layout ID")

vortex-layout/src/layouts/flat/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vortex_error::VortexResult;
1111

1212
use crate::layouts::flat::reader::FlatReader;
1313
use crate::reader::{LayoutReader, LayoutReaderExt};
14-
use crate::segments::{SegmentCollector, SegmentReader};
14+
use crate::segments::{AsyncSegmentReader, SegmentCollector};
1515
use crate::vtable::LayoutVTable;
1616
use crate::{FLAT_LAYOUT_ID, Layout, LayoutId};
1717

@@ -27,7 +27,7 @@ impl LayoutVTable for FlatLayout {
2727
&self,
2828
layout: Layout,
2929
ctx: ArrayContext,
30-
segment_reader: Arc<dyn SegmentReader>,
30+
segment_reader: Arc<dyn AsyncSegmentReader>,
3131
) -> VortexResult<Arc<dyn LayoutReader>> {
3232
Ok(FlatReader::try_new(layout, ctx, segment_reader)?.into_arc())
3333
}

vortex-layout/src/layouts/flat/reader.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
77

88
use crate::layouts::flat::FlatLayout;
99
use crate::reader::LayoutReader;
10-
use crate::segments::SegmentReader;
10+
use crate::segments::AsyncSegmentReader;
1111
use crate::{Layout, LayoutReaderExt, LayoutVTable, instrument};
1212

1313
pub struct FlatReader {
1414
layout: Layout,
1515
ctx: ArrayContext,
16-
segment_reader: Arc<dyn SegmentReader>,
16+
segment_reader: Arc<dyn AsyncSegmentReader>,
1717
// TODO(ngates): we need to add an invalidate_row_range function to evict these from the
1818
// cache.
1919
array: Arc<OnceCell<ArrayRef>>,
@@ -23,7 +23,7 @@ impl FlatReader {
2323
pub(crate) fn try_new(
2424
layout: Layout,
2525
ctx: ArrayContext,
26-
segment_reader: Arc<dyn SegmentReader>,
26+
segment_reader: Arc<dyn AsyncSegmentReader>,
2727
) -> VortexResult<Self> {
2828
if layout.vtable().id() != FlatLayout.id() {
2929
vortex_panic!("Mismatched layout ID")
@@ -59,7 +59,7 @@ impl FlatReader {
5959
);
6060

6161
// Fetch all the array segment.
62-
let buffer = self.segment_reader.get(segment_id)?.resolve().await?;
62+
let buffer = self.segment_reader.get(segment_id).await?;
6363
let row_count = usize::try_from(self.layout().row_count())
6464
.vortex_expect("FlatLayout row count does not fit within usize");
6565

vortex-layout/src/layouts/stats/eval_expr.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ mod test {
8282
use crate::layouts::chunked::writer::ChunkedLayoutWriter;
8383
use crate::layouts::flat::FlatLayout;
8484
use crate::layouts::stats::writer::{StatsLayoutOptions, StatsLayoutWriter};
85-
use crate::segments::SegmentReader;
85+
use crate::segments::AsyncSegmentReader;
8686
use crate::segments::test::TestSegments;
8787
use crate::writer::LayoutWriterExt;
8888
use crate::{ExprEvaluator, Layout, RowMask};
8989

9090
#[fixture]
9191
/// Create a stats layout with three chunks of primitive arrays.
92-
fn stats_layout() -> (ArrayContext, Arc<dyn SegmentReader>, Layout) {
92+
fn stats_layout() -> (ArrayContext, Arc<dyn AsyncSegmentReader>, Layout) {
9393
let ctx = ArrayContext::empty();
9494
let mut segments = TestSegments::default();
9595
let layout = StatsLayoutWriter::try_new(
@@ -124,7 +124,7 @@ mod test {
124124
fn test_stats_evaluator(
125125
#[from(stats_layout)] (ctx, segments, layout): (
126126
ArrayContext,
127-
Arc<dyn SegmentReader>,
127+
Arc<dyn AsyncSegmentReader>,
128128
Layout,
129129
),
130130
) {
@@ -150,7 +150,7 @@ mod test {
150150
fn test_stats_pruning_mask(
151151
#[from(stats_layout)] (ctx, segments, layout): (
152152
ArrayContext,
153-
Arc<dyn SegmentReader>,
153+
Arc<dyn AsyncSegmentReader>,
154154
Layout,
155155
),
156156
) {

vortex-layout/src/layouts/stats/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use vortex_error::VortexResult;
1313
use crate::data::Layout;
1414
use crate::layouts::stats::reader::StatsReader;
1515
use crate::reader::{LayoutReader, LayoutReaderExt};
16-
use crate::segments::{RequiredSegmentKind, SegmentCollector, SegmentReader};
16+
use crate::segments::{AsyncSegmentReader, RequiredSegmentKind, SegmentCollector};
1717
use crate::vtable::LayoutVTable;
1818
use crate::{LayoutId, STATS_LAYOUT_ID};
1919

@@ -30,7 +30,7 @@ impl LayoutVTable for StatsLayout {
3030
&self,
3131
layout: Layout,
3232
ctx: ArrayContext,
33-
segment_reader: Arc<dyn SegmentReader>,
33+
segment_reader: Arc<dyn AsyncSegmentReader>,
3434
) -> VortexResult<Arc<dyn LayoutReader>> {
3535
Ok(StatsReader::try_new(layout, ctx, segment_reader)?.into_arc())
3636
}

vortex-layout/src/layouts/stats/reader.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use vortex_mask::Mask;
1414
use crate::layouts::stats::StatsLayout;
1515
use crate::layouts::stats::stats_table::StatsTable;
1616
use crate::reader::LayoutReader;
17-
use crate::segments::SegmentReader;
17+
use crate::segments::AsyncSegmentReader;
1818
use crate::{ExprEvaluator, Layout, LayoutVTable, RowMask};
1919

2020
type PruningCache = Arc<OnceCell<Option<Mask>>>;
@@ -23,7 +23,7 @@ type PruningCache = Arc<OnceCell<Option<Mask>>>;
2323
pub struct StatsReader {
2424
layout: Layout,
2525
ctx: ArrayContext,
26-
segment_reader: Arc<dyn SegmentReader>,
26+
segment_reader: Arc<dyn AsyncSegmentReader>,
2727

2828
/// The number of blocks
2929
nblocks: usize,
@@ -43,7 +43,7 @@ impl StatsReader {
4343
pub(super) fn try_new(
4444
layout: Layout,
4545
ctx: ArrayContext,
46-
segment_reader: Arc<dyn SegmentReader>,
46+
segment_reader: Arc<dyn AsyncSegmentReader>,
4747
) -> VortexResult<Self> {
4848
if layout.vtable().id() != StatsLayout.id() {
4949
vortex_panic!("Mismatched layout ID")

0 commit comments

Comments
 (0)