diff --git a/Cargo.lock b/Cargo.lock index ff59158a99..05b87a60ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5569,11 +5569,13 @@ dependencies = [ "bytes", "exponential-decay-histogram", "flatbuffers 25.2.10", + "flume", "futures", "itertools 0.14.0", "log", "pin-project-lite", "rstest", + "tokio", "vortex-array", "vortex-buffer", "vortex-dtype", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 15cf066482..a7dcac4328 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store", "tokio"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } -vortex-layout = { workspace = true } +vortex-layout = { workspace = true, features = ["tokio"] } [features] tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 38c45d0f3b..82336b3f83 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener use datafusion_common::Result as DFResult; use futures::{FutureExt as _, StreamExt}; use object_store::{ObjectStore, ObjectStoreScheme}; +use tokio::runtime::Handle; use vortex_array::{ContextRef, IntoArrayVariant}; use vortex_error::VortexResult; use vortex_expr::{ExprRef, VortexExpr}; +use vortex_file::executor::{TaskExecutor, TokioExecutor}; use vortex_file::{SplitBy, VortexOpenOptions}; use vortex_io::ObjectStoreReadAt; @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener { let object_store = self.object_store.clone(); let projected_arrow_schema = self.projected_arrow_schema.clone(); let batch_size = self.batch_size; + let executor = TaskExecutor::Tokio(TokioExecutor::new(Handle::current())); Ok(async move { let vxf = VortexOpenOptions::file(read_at) @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener { // but at the moment our scanner has too much overhead to process small // batches efficiently. .with_split_by(SplitBy::RowCount(8 * batch_size)) + .with_task_executor(executor) .into_array_stream()? .map(move |array| { let st = array?.into_struct()?; diff --git a/vortex-file/src/exec/inline.rs b/vortex-file/src/exec/inline.rs deleted file mode 100644 index ee7136f025..0000000000 --- a/vortex-file/src/exec/inline.rs +++ /dev/null @@ -1,26 +0,0 @@ -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use futures::StreamExt; -use vortex_error::VortexResult; - -use crate::exec::ExecDriver; - -/// An [`ExecDriver`] implementation that awaits the futures inline using the caller's runtime. -pub struct InlineDriver { - concurrency: usize, -} - -impl InlineDriver { - pub fn with_concurrency(concurrency: usize) -> Self { - Self { concurrency } - } -} - -impl ExecDriver for InlineDriver { - fn drive( - &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult<()>>>, - ) -> BoxStream<'static, VortexResult<()>> { - stream.buffered(self.concurrency).boxed() - } -} diff --git a/vortex-file/src/exec/mod.rs b/vortex-file/src/exec/mod.rs deleted file mode 100644 index 3e39123b05..0000000000 --- a/vortex-file/src/exec/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -mod mode; -pub use mode::*; -pub mod inline; - -#[cfg(feature = "tokio")] -pub mod tokio; - -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use vortex_error::VortexResult; - -/// An execution driver is used to drive the execution of the scan operation. -/// -/// It is passed a stream of futures that (typically) process a single split of the file. -/// Drivers are able to control the concurrency of the execution with [`futures::StreamExt::buffered`], -/// as well as _where_ the futures are executed by spawning them onto a specific runtime or thread -/// pool. -pub trait ExecDriver: Send + Sync { - fn drive( - &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult<()>>>, - ) -> BoxStream<'static, VortexResult<()>>; -} diff --git a/vortex-file/src/exec/mode.rs b/vortex-file/src/exec/mode.rs deleted file mode 100644 index e57bb22f29..0000000000 --- a/vortex-file/src/exec/mode.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::sync::Arc; - -use crate::exec::inline::InlineDriver; -#[cfg(feature = "tokio")] -use crate::exec::tokio::TokioDriver; -use crate::exec::ExecDriver; - -/// The [`ExecutionMode`] describes how the CPU-bound layout evaluation tasks are executed. -/// Typically, there is one task per file split (row-group). -#[derive(Debug, Clone)] -pub enum ExecutionMode { - /// Executes the tasks inline as part of polling the returned - /// [`vortex_array::stream::ArrayStream`]. In other words, uses the same runtime. - Inline, - /// Spawns the tasks onto a provided Rayon thread pool. - #[cfg(feature = "rayon")] - RayonThreadPool(Arc), - /// Spawns the tasks onto a provided Tokio runtime. - #[cfg(feature = "tokio")] - TokioRuntime(tokio::runtime::Handle), -} - -#[allow(clippy::derivable_impls)] -impl Default for ExecutionMode { - fn default() -> Self { - // Default to tokio-specific behavior if its enabled and there's a runtime running. - #[cfg(feature = "tokio")] - if let Ok(h) = tokio::runtime::Handle::try_current() { - return ExecutionMode::TokioRuntime(h); - } - - ExecutionMode::Inline - } -} - -impl ExecutionMode { - pub fn into_driver(self, concurrency: usize) -> Arc { - match self { - ExecutionMode::Inline => Arc::new(InlineDriver::with_concurrency(concurrency)), - #[cfg(feature = "rayon")] - ExecutionMode::RayonThreadPool(_) => { - todo!() - } - #[cfg(feature = "tokio")] - ExecutionMode::TokioRuntime(handle) => Arc::new(TokioDriver { - handle, - concurrency, - }), - } - } -} diff --git a/vortex-file/src/exec/tokio.rs b/vortex-file/src/exec/tokio.rs deleted file mode 100644 index a77b6bd95c..0000000000 --- a/vortex-file/src/exec/tokio.rs +++ /dev/null @@ -1,31 +0,0 @@ -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use futures::StreamExt; -use tokio::runtime::Handle; -use vortex_error::{vortex_err, VortexResult}; - -use crate::exec::ExecDriver; - -/// An [`ExecDriver`] implementation that spawns the futures onto a Tokio runtime. -pub struct TokioDriver { - pub handle: Handle, - pub concurrency: usize, -} - -impl ExecDriver for TokioDriver { - fn drive( - &self, - stream: BoxStream<'static, BoxFuture<'static, VortexResult<()>>>, - ) -> BoxStream<'static, VortexResult<()>> { - let handle = self.handle.clone(); - - stream - .map(move |future| handle.spawn(future)) - .buffered(self.concurrency) - .map(|result| match result { - Ok(result) => result, - Err(e) => Err(vortex_err!("Failed to join Tokio result {}", e)), - }) - .boxed() - } -} diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index f2c658231b..9147b89179 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -86,7 +86,6 @@ //! buffers, and [cloud storage](vortex_io::ObjectStoreReadAt), can be used as the "linear and //! contiguous memory". -pub mod exec; mod file; mod footer; mod generic; diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 3fa2b55dd7..b6a5abf863 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -21,18 +21,20 @@ bit-vec = { workspace = true } bytes = { workspace = true } exponential-decay-histogram = { workspace = true } flatbuffers = { workspace = true } -futures = { workspace = true, features = ["alloc"] } +flume = { workspace = true } +futures = { workspace = true, features = ["alloc", "executor"] } itertools = { workspace = true } log = { workspace = true } pin-project-lite = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-dtype = { workspace = true } -vortex-error = { workspace = true } +vortex-error = { workspace = true, features = ["tokio"] } vortex-expr = { workspace = true } vortex-flatbuffers = { workspace = true, features = ["layout"] } vortex-mask = { workspace = true } vortex-scalar = { workspace = true } +tokio = { workspace = true, optional = true } [dev-dependencies] futures = { workspace = true, features = ["executor"] } diff --git a/vortex-layout/src/data.rs b/vortex-layout/src/data.rs index 8e478e530c..aac5c75153 100644 --- a/vortex-layout/src/data.rs +++ b/vortex-layout/src/data.rs @@ -12,8 +12,7 @@ use vortex_flatbuffers::{layout as fb, layout, FlatBufferRoot, WriteFlatBuffer}; use crate::context::LayoutContextRef; use crate::reader::LayoutReader; -use crate::scan::ScanExecutor; -use crate::segments::SegmentId; +use crate::segments::{AsyncSegmentReader, SegmentId}; use crate::vtable::LayoutVTableRef; use crate::LayoutId; @@ -292,10 +291,10 @@ impl Layout { /// Create a reader for this layout. pub fn reader( &self, - executor: Arc, + segment_reader: Arc, ctx: ContextRef, ) -> VortexResult> { - self.encoding().reader(self.clone(), ctx, executor) + self.encoding().reader(self.clone(), ctx, segment_reader) } /// Register splits for this layout. diff --git a/vortex-layout/src/layouts/chunked/eval_expr.rs b/vortex-layout/src/layouts/chunked/eval_expr.rs index 7e5badca3c..c7c635fa31 100644 --- a/vortex-layout/src/layouts/chunked/eval_expr.rs +++ b/vortex-layout/src/layouts/chunked/eval_expr.rs @@ -145,6 +145,7 @@ mod test { use std::sync::Arc; use futures::executor::block_on; + use rstest::{fixture, rstest}; use vortex_array::array::{BoolArray, ChunkedArray, ConstantArray}; use vortex_array::{IntoArray, IntoArrayVariant}; use vortex_buffer::buffer; @@ -154,13 +155,14 @@ mod test { use vortex_expr::{gt, lit, Identity}; use crate::layouts::chunked::writer::ChunkedLayoutWriter; - use crate::scan::ScanExecutor; use crate::segments::test::TestSegments; + use crate::segments::AsyncSegmentReader; use crate::writer::LayoutWriterExt; use crate::{Layout, RowMask}; + #[fixture] /// Create a chunked layout with three chunks of primitive arrays. - fn chunked_layout() -> (Arc, Layout) { + fn chunked_layout() -> (Arc, Layout) { let mut segments = TestSegments::default(); let layout = ChunkedLayoutWriter::new( &DType::Primitive(PType::I32, NonNullable), @@ -175,14 +177,14 @@ mod test { ], ) .unwrap(); - (ScanExecutor::inline(Arc::new(segments)), layout) + (Arc::new(segments), layout) } - #[test] - fn test_chunked_evaluator() { + #[rstest] + fn test_chunked_evaluator( + #[from(chunked_layout)] (segments, layout): (Arc, Layout), + ) { block_on(async { - let (segments, layout) = chunked_layout(); - let result = layout .reader(segments, Default::default()) .unwrap() @@ -200,10 +202,11 @@ mod test { }) } - #[test] - fn test_chunked_pruning_mask() { + #[rstest] + fn test_chunked_pruning_mask( + #[from(chunked_layout)] (segments, layout): (Arc, Layout), + ) { block_on(async { - let (segments, layout) = chunked_layout(); let row_count = layout.row_count(); let reader = layout.reader(segments, Default::default()).unwrap(); diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index 3625357e2c..344ec5ed16 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -13,7 +13,7 @@ use vortex_error::VortexResult; use crate::data::Layout; use crate::layouts::chunked::reader::ChunkedReader; use crate::reader::{LayoutReader, LayoutReaderExt}; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::vtable::LayoutVTable; use crate::{LayoutId, CHUNKED_LAYOUT_ID}; @@ -33,9 +33,9 @@ impl LayoutVTable for ChunkedLayout { &self, layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult> { - Ok(ChunkedReader::try_new(layout, ctx, executor)?.into_arc()) + Ok(ChunkedReader::try_new(layout, ctx, segment_reader)?.into_arc()) } fn register_splits( diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index 94422d352d..b68e5b860b 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -14,7 +14,7 @@ use vortex_mask::Mask; use crate::layouts::chunked::stats_table::StatsTable; use crate::layouts::chunked::ChunkedLayout; use crate::reader::LayoutReader; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::{ExprEvaluator, Layout, LayoutVTable, RowMask}; type PruningCache = Arc>>; @@ -23,7 +23,7 @@ type PruningCache = Arc>>; pub struct ChunkedReader { layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, /// A cache of expr -> optional pruning result (applying the pruning expr to the stats table) pruning_result: Arc>>, @@ -39,7 +39,7 @@ impl ChunkedReader { pub(super) fn try_new( layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult { if layout.encoding().id() != ChunkedLayout.id() { vortex_panic!("Mismatched layout ID") @@ -71,7 +71,7 @@ impl ChunkedReader { Ok(Self { layout, ctx, - executor, + segment_reader, pruning_result: Arc::new(RwLock::new(HashMap::new())), stats_table: Arc::new(OnceCell::new()), chunk_readers, @@ -101,7 +101,7 @@ impl ChunkedReader { let stats_layout = self.layout.child(nchunks, stats_dtype.clone(), "stats")?; let stats_array = stats_layout - .reader(self.executor.clone(), self.ctx.clone())? + .reader(self.segment_reader.clone(), self.ctx.clone())? .evaluate_expr( RowMask::new_valid_between(0, nchunks as u64), Identity::new_expr(), @@ -161,7 +161,7 @@ impl ChunkedReader { let child_layout = self.layout .child(idx, self.layout.dtype().clone(), format!("[{}]", idx))?; - child_layout.reader(self.executor.clone(), self.ctx.clone()) + child_layout.reader(self.segment_reader.clone(), self.ctx.clone()) }) } diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index e1c00eaa8a..4ff933de44 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; +use vortex_array::compute::{filter, slice}; use vortex_array::Array; use vortex_error::{VortexExpect, VortexResult}; use vortex_expr::{ExprRef, Identity}; use crate::layouts::flat::reader::FlatReader; -use crate::scan::ScanTask; use crate::{ExprEvaluator, RowMask}; #[async_trait] @@ -12,30 +12,28 @@ impl ExprEvaluator for FlatReader { async fn evaluate_expr(self: &Self, row_mask: RowMask, expr: ExprRef) -> VortexResult { assert!(row_mask.true_count() > 0); - let array = self.array().await?.clone(); + let mut array = self.array().await?.clone(); // TODO(ngates): what's the best order to apply the filter mask / expression? let begin = usize::try_from(row_mask.begin()) .vortex_expect("RowMask begin must fit within FlatLayout size"); - let mut tasks = Vec::with_capacity(3); - // Slice the array based on the row mask. if begin > 0 || (begin + row_mask.len()) < array.len() { - tasks.push(ScanTask::Slice(begin..begin + row_mask.len())); + array = slice(array, begin, begin + row_mask.len())?; } // Filter the array based on the row mask. if !row_mask.filter_mask().all_true() { - tasks.push(ScanTask::Filter(row_mask.filter_mask().clone())); + array = filter(&array, row_mask.filter_mask())?; } // Evaluate the projection expression. if !expr.as_any().is::() { - tasks.push(ScanTask::Expr(expr)); + array = expr.evaluate(&array)?; } - self.executor().evaluate(&array, &tasks).await + Ok(array) } async fn prune_mask(&self, row_mask: RowMask, _expr: ExprRef) -> VortexResult { @@ -57,7 +55,6 @@ mod test { use vortex_expr::{gt, ident, lit, Identity}; use crate::layouts::flat::writer::FlatLayoutWriter; - use crate::scan::ScanExecutor; use crate::segments::test::TestSegments; use crate::writer::LayoutWriterExt; use crate::RowMask; @@ -72,7 +69,7 @@ mod test { .unwrap(); let result = layout - .reader(ScanExecutor::inline(Arc::new(segments)), Default::default()) + .reader(Arc::new(segments), Default::default()) .unwrap() .evaluate_expr( RowMask::new_valid_between(0, layout.row_count()), @@ -98,7 +95,7 @@ mod test { let expr = gt(Identity::new_expr(), lit(3i32)); let result = layout - .reader(ScanExecutor::inline(Arc::new(segments)), Default::default()) + .reader(Arc::new(segments), Default::default()) .unwrap() .evaluate_expr(RowMask::new_valid_between(0, layout.row_count()), expr) .await @@ -123,7 +120,7 @@ mod test { .unwrap(); let result = layout - .reader(ScanExecutor::inline(Arc::new(segments)), Default::default()) + .reader(Arc::new(segments), Default::default()) .unwrap() .evaluate_expr(RowMask::new_valid_between(2, 4), ident()) .await diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index cd9332e568..603a8d7048 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -11,7 +11,7 @@ use vortex_error::VortexResult; use crate::layouts::flat::reader::FlatReader; use crate::reader::{LayoutReader, LayoutReaderExt}; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::vtable::LayoutVTable; use crate::{Layout, LayoutId, FLAT_LAYOUT_ID}; @@ -27,9 +27,9 @@ impl LayoutVTable for FlatLayout { &self, layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult> { - Ok(FlatReader::try_new(layout, ctx, executor)?.into_arc()) + Ok(FlatReader::try_new(layout, ctx, segment_reader)?.into_arc()) } fn register_splits( diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index f8df4919a3..8a104c04eb 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -6,13 +6,13 @@ use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult}; use crate::layouts::flat::FlatLayout; use crate::reader::LayoutReader; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::{Layout, LayoutReaderExt, LayoutVTable}; pub struct FlatReader { layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, // TODO(ngates): we need to add an invalidate_row_range function to evict these from the // cache. array: Arc>, @@ -22,7 +22,7 @@ impl FlatReader { pub(crate) fn try_new( layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult { if layout.encoding().id() != FlatLayout.id() { vortex_panic!("Mismatched layout ID") @@ -31,7 +31,7 @@ impl FlatReader { Ok(Self { layout, ctx, - executor, + segment_reader, array: Arc::new(Default::default()), }) } @@ -40,10 +40,6 @@ impl FlatReader { self.ctx.clone() } - pub(crate) fn executor(&self) -> &ScanExecutor { - self.executor.as_ref() - } - pub(crate) async fn array(&self) -> VortexResult<&Array> { self.array .get_or_try_init(async move { @@ -59,7 +55,7 @@ impl FlatReader { ); // Fetch all the array segment. - let buffer = self.executor().get_segment(segment_id).await?; + let buffer = self.segment_reader.get(segment_id).await?; let row_count = usize::try_from(self.layout().row_count()) .vortex_expect("FlatLayout row count does not fit within usize"); diff --git a/vortex-layout/src/layouts/flat/writer.rs b/vortex-layout/src/layouts/flat/writer.rs index b8d98f2644..b0d439e33c 100644 --- a/vortex-layout/src/layouts/flat/writer.rs +++ b/vortex-layout/src/layouts/flat/writer.rs @@ -102,7 +102,6 @@ mod tests { use vortex_expr::ident; use crate::layouts::flat::writer::FlatLayoutWriter; - use crate::scan::ScanExecutor; use crate::segments::test::TestSegments; use crate::writer::LayoutWriterExt; use crate::RowMask; @@ -119,7 +118,7 @@ mod tests { .unwrap(); let result = layout - .reader(ScanExecutor::inline(Arc::new(segments)), Default::default()) + .reader(Arc::new(segments), Default::default()) .unwrap() .evaluate_expr(RowMask::new_valid_between(0, layout.row_count()), ident()) .await diff --git a/vortex-layout/src/layouts/struct_/eval_expr.rs b/vortex-layout/src/layouts/struct_/eval_expr.rs index 605a336de3..fed0c5c7be 100644 --- a/vortex-layout/src/layouts/struct_/eval_expr.rs +++ b/vortex-layout/src/layouts/struct_/eval_expr.rs @@ -8,7 +8,6 @@ use vortex_error::{VortexExpect, VortexResult}; use vortex_expr::ExprRef; use crate::layouts::struct_::reader::StructReader; -use crate::scan::ScanTask; use crate::{ExprEvaluator, RowMask}; #[async_trait] @@ -52,9 +51,7 @@ impl ExprEvaluator for StructReader { )? .into_array(); - self.executor() - .evaluate(&root_scope, &[ScanTask::Expr(partitioned.root.clone())]) - .await + partitioned.root.evaluate(&root_scope) } async fn prune_mask(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult { @@ -82,6 +79,7 @@ mod tests { use std::sync::Arc; use futures::executor::block_on; + use rstest::{fixture, rstest}; use vortex_array::array::StructArray; use vortex_array::{IntoArray, IntoArrayVariant}; use vortex_buffer::buffer; @@ -92,13 +90,14 @@ mod tests { use crate::layouts::flat::writer::FlatLayoutWriter; use crate::layouts::struct_::writer::StructLayoutWriter; - use crate::scan::ScanExecutor; use crate::segments::test::TestSegments; + use crate::segments::AsyncSegmentReader; use crate::writer::LayoutWriterExt; use crate::{Layout, RowMask}; + #[fixture] /// Create a chunked layout with three chunks of primitive arrays. - fn struct_layout() -> (Arc, Layout) { + fn struct_layout() -> (Arc, Layout) { let mut segments = TestSegments::default(); let layout = StructLayoutWriter::new( @@ -128,13 +127,13 @@ mod tests { .map(IntoArray::into_array)], ) .unwrap(); - (ScanExecutor::inline(Arc::new(segments)), layout) + (Arc::new(segments), layout) } - #[test] - fn test_struct_layout() { - let (segments, layout) = struct_layout(); - + #[rstest] + fn test_struct_layout( + #[from(struct_layout)] (segments, layout): (Arc, Layout), + ) { let reader = layout.reader(segments, Default::default()).unwrap(); let expr = gt(get_item("a", ident()), get_item("b", ident())); let result = @@ -150,10 +149,10 @@ mod tests { ); } - #[test] - fn test_struct_layout_row_mask() { - let (segments, layout) = struct_layout(); - + #[rstest] + fn test_struct_layout_row_mask( + #[from(struct_layout)] (segments, layout): (Arc, Layout), + ) { let reader = layout.reader(segments, Default::default()).unwrap(); let expr = gt(get_item("a", ident()), get_item("b", ident())); let result = block_on(reader.evaluate_expr( diff --git a/vortex-layout/src/layouts/struct_/mod.rs b/vortex-layout/src/layouts/struct_/mod.rs index aea4a595eb..4d8cbd27c4 100644 --- a/vortex-layout/src/layouts/struct_/mod.rs +++ b/vortex-layout/src/layouts/struct_/mod.rs @@ -12,7 +12,7 @@ use vortex_error::{vortex_bail, VortexResult}; use crate::data::Layout; use crate::reader::{LayoutReader, LayoutReaderExt}; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::vtable::LayoutVTable; use crate::{LayoutId, COLUMNAR_LAYOUT_ID}; @@ -28,9 +28,9 @@ impl LayoutVTable for StructLayout { &self, layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult> { - Ok(StructReader::try_new(layout, ctx, executor)?.into_arc()) + Ok(StructReader::try_new(layout, ctx, segment_reader)?.into_arc()) } fn register_splits( diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 9c1e30f247..a2597d2b1a 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -9,14 +9,14 @@ use vortex_expr::transform::partition::{partition, PartitionedExpr}; use vortex_expr::ExprRef; use crate::layouts::struct_::StructLayout; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::{Layout, LayoutReader, LayoutReaderExt, LayoutVTable}; #[derive(Clone)] pub struct StructReader { layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, field_readers: Arc<[OnceLock>]>, field_lookup: Option>, @@ -27,7 +27,7 @@ impl StructReader { pub(super) fn try_new( layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult { if layout.encoding().id() != StructLayout.id() { vortex_panic!("Mismatched layout ID") @@ -55,17 +55,13 @@ impl StructReader { Ok(Self { layout, ctx, - executor, + segment_reader, field_readers, field_lookup, partitioned_expr_cache: Arc::new(Default::default()), }) } - pub(crate) fn executor(&self) -> &ScanExecutor { - self.executor.as_ref() - } - /// Return the [`StructDType`] of this layout. pub(crate) fn struct_dtype(&self) -> &StructDType { self.dtype() @@ -87,7 +83,7 @@ impl StructReader { let child_layout = self.layout .child(idx, self.struct_dtype().field_by_index(idx)?, name)?; - child_layout.reader(self.executor.clone(), self.ctx.clone()) + child_layout.reader(self.segment_reader.clone(), self.ctx.clone()) }) } diff --git a/vortex-layout/src/scan/executor/mod.rs b/vortex-layout/src/scan/executor/mod.rs new file mode 100644 index 0000000000..800002cad3 --- /dev/null +++ b/vortex-layout/src/scan/executor/mod.rs @@ -0,0 +1,44 @@ +#[cfg(feature = "tokio")] +mod tokio; + +mod threads; + +use std::future::Future; + +use futures::future::BoxFuture; +pub use threads::*; +#[cfg(feature = "tokio")] +pub use tokio::*; +use vortex_error::VortexResult; + +pub trait Executor { + /// Spawns a future to run on a different runtime. + /// The runtime will make progress on the future without being directly polled, returning its output. + fn spawn(&self, f: F) -> BoxFuture<'static, VortexResult> + where + F: Future + Send + 'static, + ::Output: Send + 'static; +} + +/// Generic wrapper around different async runtimes. Used to spawn async tasks to run in the background, concurrently with other tasks. +#[derive(Clone)] +pub enum TaskExecutor { + Threads(ThreadsExecutor), + #[cfg(feature = "tokio")] + Tokio(TokioExecutor), +} + +#[async_trait::async_trait] +impl Executor for TaskExecutor { + fn spawn(&self, f: F) -> BoxFuture<'static, VortexResult> + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + match self { + TaskExecutor::Threads(threads_executor) => threads_executor.spawn(f), + #[cfg(feature = "tokio")] + TaskExecutor::Tokio(tokio_executor) => tokio_executor.spawn(f), + } + } +} diff --git a/vortex-layout/src/scan/executor/threads.rs b/vortex-layout/src/scan/executor/threads.rs new file mode 100644 index 0000000000..c7064277c5 --- /dev/null +++ b/vortex-layout/src/scan/executor/threads.rs @@ -0,0 +1,116 @@ +use std::future::Future; +use std::num::NonZeroUsize; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use futures::channel::oneshot; +use futures::future::BoxFuture; +use futures::{FutureExt as _, TryFutureExt as _}; +use vortex_error::{vortex_err, VortexResult, VortexUnwrap}; + +use super::Executor; + +trait Task { + fn run(self: Box); +} + +struct ExecutorTask { + task: F, + result: oneshot::Sender, +} + +impl Task for ExecutorTask +where + F: Future + Send, + R: Send, +{ + fn run(self: Box) { + let Self { task, result } = *self; + futures::executor::block_on(async move { + let output = task.await; + _ = result.send(output); + }) + } +} + +/// Multithreaded task executor, runs tasks on a dedicated thread pool. +#[derive(Clone, Default)] +pub struct ThreadsExecutor { + inner: Arc, +} + +impl ThreadsExecutor { + pub fn new(num_threads: NonZeroUsize) -> Self { + Self { + inner: Arc::new(Inner::new(num_threads)), + } + } +} + +struct Inner { + submitter: flume::Sender>, + /// True as long as the runtime should be running + is_running: Arc, +} + +impl Default for Inner { + fn default() -> Self { + // Safety: + // 1 isn't 0 + Self::new(unsafe { NonZeroUsize::new_unchecked(1) }) + } +} + +impl Inner { + fn new(num_threads: NonZeroUsize) -> Self { + let (tx, rx) = flume::unbounded::>(); + let shutdown_signal = Arc::new(AtomicBool::new(true)); + (0..num_threads.get()).for_each(|_| { + let rx = rx.clone(); + let shutdown_signal = shutdown_signal.clone(); + std::thread::spawn(move || { + // The channel errors if all senders are dropped, which means we probably don't care about the task anymore, + // and we can break and let the thread end. + while shutdown_signal.load(Ordering::Relaxed) { + if let Ok(task) = rx.recv() { + task.run() + } else { + break; + } + } + }); + }); + + Self { + submitter: tx, + is_running: shutdown_signal, + } + } +} + +impl Executor for ThreadsExecutor { + fn spawn(&self, f: F) -> BoxFuture<'static, VortexResult> + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let task = Box::new(ExecutorTask { + task: f, + result: tx, + }); + self.inner + .submitter + .send(task) + .map_err(|e| vortex_err!("Failed to submit work to executor: {e}")) + .vortex_unwrap(); + + rx.map_err(|e| vortex_err!("Future canceled: {e}")).boxed() + } +} + +impl Drop for Inner { + fn drop(&mut self) { + self.is_running.store(false, Ordering::SeqCst); + } +} diff --git a/vortex-layout/src/scan/executor/tokio.rs b/vortex-layout/src/scan/executor/tokio.rs new file mode 100644 index 0000000000..97ab29729f --- /dev/null +++ b/vortex-layout/src/scan/executor/tokio.rs @@ -0,0 +1,29 @@ +use std::future::Future; + +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use tokio::runtime::Handle; +use vortex_error::{VortexError, VortexResult}; + +use super::Executor; + +/// Tokio-based async task executor, runs task on the provided runtime. +#[derive(Clone)] +pub struct TokioExecutor(Handle); + +impl TokioExecutor { + pub fn new(handle: Handle) -> Self { + Self(handle) + } +} + +#[async_trait::async_trait] +impl Executor for TokioExecutor { + fn spawn(&self, f: F) -> BoxFuture<'static, VortexResult> + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + self.0.spawn(f).map_err(VortexError::from).boxed() + } +} diff --git a/vortex-layout/src/scan/mod.rs b/vortex-layout/src/scan/mod.rs index 9c716f7cf1..abc9172130 100644 --- a/vortex-layout/src/scan/mod.rs +++ b/vortex-layout/src/scan/mod.rs @@ -1,31 +1,30 @@ use std::sync::Arc; -use futures::stream::BoxStream; -use futures::{stream, Stream}; +use executor::{Executor as _, TaskExecutor, ThreadsExecutor}; +use futures::{stream, Stream, StreamExt}; use itertools::Itertools; -use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt}; -use vortex_buffer::{Buffer, ByteBuffer}; -use vortex_expr::{ExprRef, Identity}; -pub(crate) mod filter; -mod split_by; -mod task; -pub mod unified; - -use futures::StreamExt; pub use split_by::*; -pub use task::*; +use vortex_array::arrow::{FromArrowArray as _, IntoArrowArray as _}; +use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt}; use vortex_array::{Array, ContextRef}; +use vortex_buffer::Buffer; use vortex_dtype::{DType, Field, FieldMask, FieldPath}; -use vortex_error::{vortex_err, VortexExpect, VortexResult}; +use vortex_error::{vortex_err, ResultExt, VortexExpect, VortexResult}; use vortex_expr::transform::immediate_access::immediate_scope_access; use vortex_expr::transform::simplify_typed::simplify_typed; +use vortex_expr::{ExprRef, Identity}; use vortex_mask::Mask; use crate::scan::filter::FilterExpr; use crate::scan::unified::UnifiedDriverStream; -use crate::segments::{AsyncSegmentReader, SegmentId}; +use crate::segments::AsyncSegmentReader; use crate::{ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask}; +pub mod executor; +pub(crate) mod filter; +mod split_by; +pub mod unified; + pub trait ScanDriver: 'static + Sized { fn segment_reader(&self) -> Arc; @@ -44,7 +43,7 @@ pub trait ScanDriver: 'static + Sized { /// A struct for building a scan operation. pub struct ScanBuilder { driver: D, - task_executor: Option>, + task_executor: Option, layout: Layout, ctx: ContextRef, // TODO(ngates): store this on larger context on Layout projection: ExprRef, @@ -108,13 +107,14 @@ impl ScanBuilder { self } - /// The number of row splits to make progress on concurrently. + /// The number of row splits to make progress on concurrently, must be greater than 0. pub fn with_concurrency(mut self, concurrency: usize) -> Self { + assert!(concurrency > 0); self.concurrency = concurrency; self } - pub fn with_task_executor(mut self, task_executor: Arc) -> Self { + pub fn with_task_executor(mut self, task_executor: TaskExecutor) -> Self { self.task_executor = Some(task_executor); self } @@ -181,7 +181,7 @@ impl ScanBuilder { driver: self.driver, task_executor: self .task_executor - .unwrap_or_else(|| Arc::new(InlineTaskExecutor)), + .unwrap_or(TaskExecutor::Threads(ThreadsExecutor::default())), layout: self.layout, ctx: self.ctx, projection, @@ -202,32 +202,9 @@ impl ScanBuilder { } } -pub struct ScanExecutor { - segment_reader: Arc, - task_executor: Arc, -} - -impl ScanExecutor { - /// Mostly used for testing, this creates a ScanExecutor with an inline task executor. - pub fn inline(segments: Arc) -> Arc { - Arc::new(Self { - segment_reader: segments, - task_executor: Arc::new(InlineTaskExecutor), - }) - } - - pub async fn get_segment(&self, id: SegmentId) -> VortexResult { - self.segment_reader.get(id).await - } - - pub async fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { - self.task_executor.execute(array, tasks).await - } -} - pub struct Scan { driver: D, - task_executor: Arc, + task_executor: TaskExecutor, layout: Layout, ctx: ContextRef, // Guaranteed to be simplified @@ -236,6 +213,7 @@ pub struct Scan { filter: Option, row_masks: Vec, canonicalize: bool, + //TODO(adam): bake this into the executors? concurrency: usize, } @@ -246,21 +224,16 @@ impl Scan { /// frequent polling to make progress. pub fn into_array_stream(self) -> VortexResult { // Create a single LayoutReader that is reused for the entire scan. - let executor = Arc::new(ScanExecutor { - segment_reader: self.driver.segment_reader(), - task_executor: self.task_executor.clone(), - }); - let reader: Arc = - self.layout.reader(executor.clone(), self.ctx.clone())?; - - // We start with a stream of row masks - let row_masks = stream::iter(self.row_masks); - - // If we have a filter expression, we set up a filter stream - let row_masks: BoxStream<'static, VortexResult> = - if let Some(filter) = self.filter.clone() { - let reader = reader.clone(); + let segment_reader = self.driver.segment_reader(); + let task_executor = self.task_executor.clone(); + let reader: Arc = self + .layout + .reader(segment_reader.clone(), self.ctx.clone())?; + let pruning = self + .filter + .as_ref() + .map(|filter| { let pruning = Arc::new(FilterExpr::try_new( reader .dtype() @@ -272,57 +245,60 @@ impl Scan { filter.clone(), )?); - row_masks - .map(move |row_mask| { - let reader = reader.clone(); - let filter = filter.clone(); - let pruning = pruning.clone(); - - log::debug!( - "Evaluating filter {} for row mask {}..{} {}", - &filter, - row_mask.begin(), - row_mask.end(), - row_mask.filter_mask().density() - ); - async move { pruning.new_evaluation(&row_mask).evaluate(reader).await } - }) - // Instead of buffering, we should be smarter where we poll the stream until - // the I/O queue has ~256MB of requests in it. Our working set size. - // We then return Pending and the I/O thread is free to spawn the requests. - .buffered(self.concurrency) - .filter_map(|r| async move { - r.map(|r| (!r.filter_mask().all_false()).then_some(r)) - .transpose() - }) - .boxed() - } else { - row_masks.map(Ok).boxed() - }; - - // Setup a projection stream - let reader = reader.clone(); + VortexResult::Ok(pruning) + }) + .transpose()?; + + // We start with a stream of row masks + let row_masks = stream::iter(self.row_masks); let projection = self.projection.clone(); - let array_stream = row_masks + + let exec_stream = row_masks .map(move |row_mask| { let reader = reader.clone(); let projection = projection.clone(); - let executor = executor.clone(); + let pruning = pruning.clone(); + let reader = reader.clone(); + + // This future is the processing task async move { - let row_mask = row_mask?; - let mut array = reader.evaluate_expr(row_mask, projection).await?; - if self.canonicalize { - array = executor.evaluate(&array, &[ScanTask::Canonicalize]).await?; + let row_mask = match pruning { + None => row_mask, + Some(pruning_filter) => { + pruning_filter + .new_evaluation(&row_mask) + .evaluate(reader.clone()) + .await? + } + }; + + // Filter out all-false masks + if row_mask.filter_mask().all_false() { + Ok(None) + } else { + let mut array = reader.evaluate_expr(row_mask, projection).await?; + if self.canonicalize { + // TODO(ngates): replace this with into_canonical. We want a fully recursive + // canonicalize here, so we pretend by converting via Arrow. + let is_nullable = array.dtype().is_nullable(); + array = Array::from_arrow( + array.clone().into_arrow_preferred()?, + is_nullable, + ); + } + + VortexResult::Ok(Some(array)) } - Ok(array) } }) - .buffered(self.concurrency); + .map(move |processing_task| task_executor.spawn(processing_task)) + .buffered(self.concurrency) + .filter_map(|v| async move { v.unnest().transpose() }); let io_stream = self.driver.io_stream(); let unified = UnifiedDriverStream { - exec_stream: array_stream, + exec_stream, io_stream, }; diff --git a/vortex-layout/src/scan/task.rs b/vortex-layout/src/scan/task.rs deleted file mode 100644 index 8217522086..0000000000 --- a/vortex-layout/src/scan/task.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::ops::Range; - -use async_trait::async_trait; -use vortex_array::arrow::{FromArrowArray, IntoArrowArray}; -use vortex_array::compute::{filter, slice}; -use vortex_array::Array; -use vortex_error::VortexResult; -use vortex_expr::ExprRef; -use vortex_mask::Mask; - -/// A blocking task that can be spawned by a [`crate::LayoutReader`]. -#[derive(Debug, Clone)] -pub enum ScanTask { - Filter(Mask), - Expr(ExprRef), - Slice(Range), - Canonicalize, -} - -impl ScanTask { - pub fn execute(&self, array: &Array) -> VortexResult { - match self { - ScanTask::Filter(mask) => filter(array, mask), - ScanTask::Expr(expr) => expr.evaluate(array), - ScanTask::Slice(range) => slice(array, range.start, range.end), - ScanTask::Canonicalize => { - // TODO(ngates): replace this with into_canonical. We want a fully recursive - // canonicalize here, so we pretend by converting via Arrow. - let is_nullable = array.dtype().is_nullable(); - Ok(Array::from_arrow( - array.clone().into_arrow_preferred()?, - is_nullable, - )) - } - } - } -} - -/// A trait used to spawn and execute blocking tasks. -#[async_trait] -pub trait TaskExecutor: 'static + Send + Sync { - async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult; -} - -pub struct InlineTaskExecutor; - -#[async_trait] -impl TaskExecutor for InlineTaskExecutor { - async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { - let mut array = array.clone(); - for task in tasks { - array = task.execute(&array)?; - } - Ok(array) - } -} diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index 66a128ffd4..f6449749bd 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -7,7 +7,7 @@ use vortex_array::ContextRef; use vortex_dtype::FieldMask; use vortex_error::VortexResult; -use crate::scan::ScanExecutor; +use crate::segments::AsyncSegmentReader; use crate::{Layout, LayoutId, LayoutReader}; /// A reference to a layout VTable, either static or arc'd. @@ -52,7 +52,7 @@ pub trait LayoutVTable: Debug + Send + Sync { &self, layout: Layout, ctx: ContextRef, - executor: Arc, + segment_reader: Arc, ) -> VortexResult>; /// Register the row splits for this layout, these represent natural boundaries at which