Skip to content

Commit

Permalink
feat: Spawn layout evaluation (#2348)
Browse files Browse the repository at this point in the history
Spawn split evaluation on a dedicated runtime
---------

Co-authored-by: Nicholas Gates <[email protected]>
  • Loading branch information
AdamGS and gatesn authored Feb 18, 2025
1 parent 80ec2f6 commit 661f6cd
Show file tree
Hide file tree
Showing 26 changed files with 338 additions and 364 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-layout = { workspace = true }
vortex-layout = { workspace = true, features = ["tokio"] }
[features]
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]

Expand Down
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener
use datafusion_common::Result as DFResult;
use futures::{FutureExt as _, StreamExt};
use object_store::{ObjectStore, ObjectStoreScheme};
use tokio::runtime::Handle;
use vortex_array::{ContextRef, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, VortexExpr};
use vortex_file::executor::{TaskExecutor, TokioExecutor};
use vortex_file::{SplitBy, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

Expand Down Expand Up @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener {
let object_store = self.object_store.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone();
let batch_size = self.batch_size;
let executor = TaskExecutor::Tokio(TokioExecutor::new(Handle::current()));

Ok(async move {
let vxf = VortexOpenOptions::file(read_at)
Expand All @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener {
// but at the moment our scanner has too much overhead to process small
// batches efficiently.
.with_split_by(SplitBy::RowCount(8 * batch_size))
.with_task_executor(executor)
.into_array_stream()?
.map(move |array| {
let st = array?.into_struct()?;
Expand Down
26 changes: 0 additions & 26 deletions vortex-file/src/exec/inline.rs

This file was deleted.

23 changes: 0 additions & 23 deletions vortex-file/src/exec/mod.rs

This file was deleted.

51 changes: 0 additions & 51 deletions vortex-file/src/exec/mode.rs

This file was deleted.

31 changes: 0 additions & 31 deletions vortex-file/src/exec/tokio.rs

This file was deleted.

1 change: 0 additions & 1 deletion vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
//! buffers, and [cloud storage](vortex_io::ObjectStoreReadAt), can be used as the "linear and
//! contiguous memory".
pub mod exec;
mod file;
mod footer;
mod generic;
Expand Down
6 changes: 4 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ bit-vec = { workspace = true }
bytes = { workspace = true }
exponential-decay-histogram = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
flume = { workspace = true }
futures = { workspace = true, features = ["alloc", "executor"] }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["tokio"] }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
tokio = { workspace = true, optional = true }

[dev-dependencies]
futures = { workspace = true, features = ["executor"] }
Expand Down
7 changes: 3 additions & 4 deletions vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use vortex_flatbuffers::{layout as fb, layout, FlatBufferRoot, WriteFlatBuffer};

use crate::context::LayoutContextRef;
use crate::reader::LayoutReader;
use crate::scan::ScanExecutor;
use crate::segments::SegmentId;
use crate::segments::{AsyncSegmentReader, SegmentId};
use crate::vtable::LayoutVTableRef;
use crate::LayoutId;

Expand Down Expand Up @@ -292,10 +291,10 @@ impl Layout {
/// Create a reader for this layout.
pub fn reader(
&self,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
ctx: ContextRef,
) -> VortexResult<Arc<dyn LayoutReader + 'static>> {
self.encoding().reader(self.clone(), ctx, executor)
self.encoding().reader(self.clone(), ctx, segment_reader)
}

/// Register splits for this layout.
Expand Down
23 changes: 13 additions & 10 deletions vortex-layout/src/layouts/chunked/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod test {
use std::sync::Arc;

use futures::executor::block_on;
use rstest::{fixture, rstest};
use vortex_array::array::{BoolArray, ChunkedArray, ConstantArray};
use vortex_array::{IntoArray, IntoArrayVariant};
use vortex_buffer::buffer;
Expand All @@ -154,13 +155,14 @@ mod test {
use vortex_expr::{gt, lit, Identity};

use crate::layouts::chunked::writer::ChunkedLayoutWriter;
use crate::scan::ScanExecutor;
use crate::segments::test::TestSegments;
use crate::segments::AsyncSegmentReader;
use crate::writer::LayoutWriterExt;
use crate::{Layout, RowMask};

#[fixture]
/// Create a chunked layout with three chunks of primitive arrays.
fn chunked_layout() -> (Arc<ScanExecutor>, Layout) {
fn chunked_layout() -> (Arc<dyn AsyncSegmentReader>, Layout) {
let mut segments = TestSegments::default();
let layout = ChunkedLayoutWriter::new(
&DType::Primitive(PType::I32, NonNullable),
Expand All @@ -175,14 +177,14 @@ mod test {
],
)
.unwrap();
(ScanExecutor::inline(Arc::new(segments)), layout)
(Arc::new(segments), layout)
}

#[test]
fn test_chunked_evaluator() {
#[rstest]
fn test_chunked_evaluator(
#[from(chunked_layout)] (segments, layout): (Arc<dyn AsyncSegmentReader>, Layout),
) {
block_on(async {
let (segments, layout) = chunked_layout();

let result = layout
.reader(segments, Default::default())
.unwrap()
Expand All @@ -200,10 +202,11 @@ mod test {
})
}

#[test]
fn test_chunked_pruning_mask() {
#[rstest]
fn test_chunked_pruning_mask(
#[from(chunked_layout)] (segments, layout): (Arc<dyn AsyncSegmentReader>, Layout),
) {
block_on(async {
let (segments, layout) = chunked_layout();
let row_count = layout.row_count();
let reader = layout.reader(segments, Default::default()).unwrap();

Expand Down
6 changes: 3 additions & 3 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vortex_error::VortexResult;
use crate::data::Layout;
use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::{LayoutReader, LayoutReaderExt};
use crate::scan::ScanExecutor;
use crate::segments::AsyncSegmentReader;
use crate::vtable::LayoutVTable;
use crate::{LayoutId, CHUNKED_LAYOUT_ID};

Expand All @@ -33,9 +33,9 @@ impl LayoutVTable for ChunkedLayout {
&self,
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(ChunkedReader::try_new(layout, ctx, executor)?.into_arc())
Ok(ChunkedReader::try_new(layout, ctx, segment_reader)?.into_arc())
}

fn register_splits(
Expand Down
12 changes: 6 additions & 6 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_mask::Mask;
use crate::layouts::chunked::stats_table::StatsTable;
use crate::layouts::chunked::ChunkedLayout;
use crate::reader::LayoutReader;
use crate::scan::ScanExecutor;
use crate::segments::AsyncSegmentReader;
use crate::{ExprEvaluator, Layout, LayoutVTable, RowMask};

type PruningCache = Arc<OnceCell<Option<Mask>>>;
Expand All @@ -23,7 +23,7 @@ type PruningCache = Arc<OnceCell<Option<Mask>>>;
pub struct ChunkedReader {
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,

/// A cache of expr -> optional pruning result (applying the pruning expr to the stats table)
pruning_result: Arc<RwLock<HashMap<ExprRef, PruningCache>>>,
Expand All @@ -39,7 +39,7 @@ impl ChunkedReader {
pub(super) fn try_new(
layout: Layout,
ctx: ContextRef,
executor: Arc<ScanExecutor>,
segment_reader: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Self> {
if layout.encoding().id() != ChunkedLayout.id() {
vortex_panic!("Mismatched layout ID")
Expand Down Expand Up @@ -71,7 +71,7 @@ impl ChunkedReader {
Ok(Self {
layout,
ctx,
executor,
segment_reader,
pruning_result: Arc::new(RwLock::new(HashMap::new())),
stats_table: Arc::new(OnceCell::new()),
chunk_readers,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl ChunkedReader {
let stats_layout = self.layout.child(nchunks, stats_dtype.clone(), "stats")?;

let stats_array = stats_layout
.reader(self.executor.clone(), self.ctx.clone())?
.reader(self.segment_reader.clone(), self.ctx.clone())?
.evaluate_expr(
RowMask::new_valid_between(0, nchunks as u64),
Identity::new_expr(),
Expand Down Expand Up @@ -161,7 +161,7 @@ impl ChunkedReader {
let child_layout =
self.layout
.child(idx, self.layout.dtype().clone(), format!("[{}]", idx))?;
child_layout.reader(self.executor.clone(), self.ctx.clone())
child_layout.reader(self.segment_reader.clone(), self.ctx.clone())
})
}

Expand Down
Loading

0 comments on commit 661f6cd

Please sign in to comment.