From 308d80412f92ff41f7341ff73a998a9bf54bdc1f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 13 Feb 2025 15:14:31 +0000 Subject: [PATCH 1/4] . --- Cargo.lock | 4 +++- vortex-error/Cargo.toml | 1 + vortex-error/src/lib.rs | 8 ++++++++ vortex-layout/Cargo.toml | 3 ++- vortex-layout/src/scan/mod.rs | 18 ++++++++++++------ 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53750c867e..e9396d92cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -5371,6 +5371,7 @@ dependencies = [ "pyo3", "rancor", "thiserror 2.0.11", + "tokio", "url", "worker", ] @@ -5542,6 +5543,7 @@ dependencies = [ "log", "pin-project-lite", "rstest", + "tokio", "vortex-array", "vortex-buffer", "vortex-dtype", diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml index 9f2fbfbe87..68809e088e 100644 --- a/vortex-error/Cargo.toml +++ b/vortex-error/Cargo.toml @@ -35,6 +35,7 @@ rancor = { workspace = true, optional = true } thiserror = { workspace = true } url = { workspace = true } worker = { workspace = true, optional = true } +tokio = { workspace = true, optional = true, features = ["rt"] } [lints] workspace = true diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs index 5e67aa7cef..9e0e36a923 100644 --- a/vortex-error/src/lib.rs +++ b/vortex-error/src/lib.rs @@ -212,6 +212,14 @@ pub enum VortexError { #[backtrace] TryFromIntError, ), + /// A wrapper for Tokio join error. + #[cfg(feature = "tokio")] + #[error(transparent)] + JoinError( + #[from] + #[backtrace] + tokio::task::JoinError, + ), } impl VortexError { diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 3fa2b55dd7..7e702eb80a 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -28,11 +28,12 @@ pin-project-lite = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-dtype = { workspace = true } -vortex-error = { workspace = true } +vortex-error = { workspace = true, features = ["tokio"] } vortex-expr = { workspace = true } vortex-flatbuffers = { workspace = true, features = ["layout"] } vortex-mask = { workspace = true } vortex-scalar = { workspace = true } +tokio = { workspace = true } [dev-dependencies] futures = { workspace = true, features = ["executor"] } diff --git a/vortex-layout/src/scan/mod.rs b/vortex-layout/src/scan/mod.rs index 9c716f7cf1..eed7546232 100644 --- a/vortex-layout/src/scan/mod.rs +++ b/vortex-layout/src/scan/mod.rs @@ -285,15 +285,20 @@ impl Scan { row_mask.end(), row_mask.filter_mask().density() ); - async move { pruning.new_evaluation(&row_mask).evaluate(reader).await } + tokio::task::spawn(async move { + pruning.new_evaluation(&row_mask).evaluate(reader).await + }) }) // Instead of buffering, we should be smarter where we poll the stream until // the I/O queue has ~256MB of requests in it. Our working set size. // We then return Pending and the I/O thread is free to spawn the requests. 
.buffered(self.concurrency) .filter_map(|r| async move { - r.map(|r| (!r.filter_mask().all_false()).then_some(r)) - .transpose() + match r { + Ok(Ok(r)) => (!r.filter_mask().all_false()).then_some(Ok(r)), + Ok(Err(e)) => Some(Err(e)), + Err(e) => Some(Err(e.into())), + } }) .boxed() } else { @@ -308,16 +313,17 @@ impl Scan { let reader = reader.clone(); let projection = projection.clone(); let executor = executor.clone(); - async move { + tokio::task::spawn(async move { let row_mask = row_mask?; let mut array = reader.evaluate_expr(row_mask, projection).await?; if self.canonicalize { array = executor.evaluate(&array, &[ScanTask::Canonicalize]).await?; } Ok(array) - } + }) }) - .buffered(self.concurrency); + .buffered(self.concurrency) + .map(|v| v?); let io_stream = self.driver.io_stream(); From 6978c28ba91709afcaa4f4ade2eccdec4430b470 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 13 Feb 2025 16:53:50 +0000 Subject: [PATCH 2/4] . --- vortex-layout/Cargo.toml | 4 +- vortex-layout/src/layouts/flat/eval_expr.rs | 2 +- .../src/layouts/struct_/eval_expr.rs | 1 - vortex-layout/src/scan/executor/mod.rs | 77 +++++++++++++++++++ vortex-layout/src/scan/executor/tokio.rs | 33 ++++++++ vortex-layout/src/scan/mod.rs | 65 +++++++++++----- vortex-layout/src/scan/task.rs | 8 +- 7 files changed, 162 insertions(+), 28 deletions(-) create mode 100644 vortex-layout/src/scan/executor/mod.rs create mode 100644 vortex-layout/src/scan/executor/tokio.rs diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 7e702eb80a..fe5daf0057 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -21,7 +21,7 @@ bit-vec = { workspace = true } bytes = { workspace = true } exponential-decay-histogram = { workspace = true } flatbuffers = { workspace = true } -futures = { workspace = true, features = ["alloc"] } +futures = { workspace = true, features = ["alloc", "executor"] } itertools = { workspace = true } log = { workspace = true } pin-project-lite = { workspace = true } @@ -33,7 +33,7 @@ vortex-expr = { workspace = true } vortex-flatbuffers = { workspace = true, features = ["layout"] } vortex-mask = { workspace = true } vortex-scalar = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, optional = true } [dev-dependencies] futures = { workspace = true, features = ["executor"] } diff --git a/vortex-layout/src/layouts/flat/eval_expr.rs b/vortex-layout/src/layouts/flat/eval_expr.rs index e1c00eaa8a..66803718f4 100644 --- a/vortex-layout/src/layouts/flat/eval_expr.rs +++ b/vortex-layout/src/layouts/flat/eval_expr.rs @@ -35,7 +35,7 @@ impl ExprEvaluator for FlatReader { tasks.push(ScanTask::Expr(expr)); } - self.executor().evaluate(&array, &tasks).await + self.executor().evaluate(&array, &tasks) } async fn prune_mask(&self, row_mask: RowMask, _expr: ExprRef) -> VortexResult { diff --git a/vortex-layout/src/layouts/struct_/eval_expr.rs b/vortex-layout/src/layouts/struct_/eval_expr.rs index 605a336de3..32636f7f6e 100644 --- a/vortex-layout/src/layouts/struct_/eval_expr.rs +++ b/vortex-layout/src/layouts/struct_/eval_expr.rs @@ -54,7 +54,6 @@ impl ExprEvaluator for StructReader { self.executor() .evaluate(&root_scope, &[ScanTask::Expr(partitioned.root.clone())]) - .await } async fn prune_mask(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult { diff --git a/vortex-layout/src/scan/executor/mod.rs b/vortex-layout/src/scan/executor/mod.rs new file mode 100644 index 0000000000..32740f6032 --- /dev/null +++ b/vortex-layout/src/scan/executor/mod.rs @@ -0,0 +1,77 @@ 
+#[cfg(feature = "tokio")] +mod tokio; + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::channel::oneshot; +use futures::FutureExt as _; +#[cfg(feature = "tokio")] +pub use tokio::*; +use vortex_error::{vortex_err, VortexResult}; + +pub struct JoinHandle { + inner: oneshot::Receiver, +} + +impl Future for JoinHandle { + type Output = VortexResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.inner.poll_unpin(cx) { + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Ready(Err(_)) => Poll::Ready(Err(vortex_err!("Task was canceled"))), + Poll::Pending => Poll::Pending, + } + } +} + +pub trait Spawn { + fn spawn(&self, f: F) -> JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static; +} + +#[derive(Default, Clone)] +pub struct InlineExecutor; + +#[async_trait::async_trait] +impl Spawn for InlineExecutor { + fn spawn(&self, f: F) -> JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + // This is very hacky and probably not a great idea, but I don't have a much better idea on how to have a sane default here. + futures::executor::block_on(async move { + _ = tx.send(f.await); + }); + + JoinHandle { inner: rx } + } +} + +#[derive(Clone)] +pub enum Executor { + Inline(InlineExecutor), + #[cfg(feature = "tokio")] + Tokio(TokioExecutor), +} + +#[async_trait::async_trait] +impl Spawn for Executor { + fn spawn(&self, f: F) -> JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + match self { + Executor::Inline(inline_executor) => inline_executor.spawn(f), + #[cfg(feature = "tokio")] + Executor::Tokio(tokio_executor) => tokio_executor.spawn(f), + } + } +} diff --git a/vortex-layout/src/scan/executor/tokio.rs b/vortex-layout/src/scan/executor/tokio.rs new file mode 100644 index 0000000000..4f268ff0fa --- /dev/null +++ b/vortex-layout/src/scan/executor/tokio.rs @@ -0,0 +1,33 @@ +use std::future::Future; + +use futures::channel::oneshot; +use tokio::runtime::Handle; + +use super::{JoinHandle, Spawn}; + +#[derive(Clone)] +pub struct TokioExecutor(Handle); + +impl TokioExecutor { + pub fn new(handle: Handle) -> Self { + Self(handle) + } +} + +#[async_trait::async_trait] +impl Spawn for TokioExecutor { + fn spawn(&self, f: F) -> JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.0.spawn(async move { + let v = f.await; + _ = tx.send(v); + }); + + JoinHandle { inner: rx } + } +} diff --git a/vortex-layout/src/scan/mod.rs b/vortex-layout/src/scan/mod.rs index eed7546232..0a96b0f715 100644 --- a/vortex-layout/src/scan/mod.rs +++ b/vortex-layout/src/scan/mod.rs @@ -1,11 +1,13 @@ use std::sync::Arc; +use executor::{Executor, InlineExecutor, Spawn}; use futures::stream::BoxStream; use futures::{stream, Stream}; use itertools::Itertools; use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt}; use vortex_buffer::{Buffer, ByteBuffer}; use vortex_expr::{ExprRef, Identity}; +pub mod executor; pub(crate) mod filter; mod split_by; mod task; @@ -45,6 +47,7 @@ pub trait ScanDriver: 'static + Sized { pub struct ScanBuilder { driver: D, task_executor: Option>, + executor: Option, layout: Layout, ctx: ContextRef, // TODO(ngates): store this on larger context on Layout projection: ExprRef, @@ -61,6 +64,7 @@ impl ScanBuilder { Self { driver, task_executor: None, + executor: None, layout, ctx, projection: Identity::new_expr(), @@ -119,6 
+123,11 @@ impl ScanBuilder { self } + pub fn with_executor(mut self, executor: Executor) -> Self { + self.executor = Some(executor); + self + } + pub fn build(self) -> VortexResult> { let projection = simplify_typed(self.projection, self.layout.dtype())?; let filter = self @@ -182,6 +191,7 @@ impl ScanBuilder { task_executor: self .task_executor .unwrap_or_else(|| Arc::new(InlineTaskExecutor)), + executor: self.executor.unwrap_or(Executor::Inline(InlineExecutor)), layout: self.layout, ctx: self.ctx, projection, @@ -220,14 +230,15 @@ impl ScanExecutor { self.segment_reader.get(id).await } - pub async fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { - self.task_executor.execute(array, tasks).await + pub fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { + self.task_executor.execute(array, tasks) } } pub struct Scan { driver: D, task_executor: Arc, + executor: Executor, layout: Layout, ctx: ContextRef, // Guaranteed to be simplified @@ -246,12 +257,14 @@ impl Scan { /// frequent polling to make progress. pub fn into_array_stream(self) -> VortexResult { // Create a single LayoutReader that is reused for the entire scan. - let executor = Arc::new(ScanExecutor { + let scan_executor = Arc::new(ScanExecutor { segment_reader: self.driver.segment_reader(), task_executor: self.task_executor.clone(), }); - let reader: Arc = - self.layout.reader(executor.clone(), self.ctx.clone())?; + let executor = self.executor.clone(); + let reader: Arc = self + .layout + .reader(scan_executor.clone(), self.ctx.clone())?; // We start with a stream of row masks let row_masks = stream::iter(self.row_masks); @@ -260,6 +273,7 @@ impl Scan { let row_masks: BoxStream<'static, VortexResult> = if let Some(filter) = self.filter.clone() { let reader = reader.clone(); + let executor = executor.clone(); let pruning = Arc::new(FilterExpr::try_new( reader @@ -277,6 +291,7 @@ impl Scan { let reader = reader.clone(); let filter = filter.clone(); let pruning = pruning.clone(); + let executor = executor.clone(); log::debug!( "Evaluating filter {} for row mask {}..{} {}", @@ -285,9 +300,13 @@ impl Scan { row_mask.end(), row_mask.filter_mask().density() ); - tokio::task::spawn(async move { - pruning.new_evaluation(&row_mask).evaluate(reader).await - }) + async move { + executor + .spawn(async move { + pruning.new_evaluation(&row_mask).evaluate(reader).await + }) + .await + } }) // Instead of buffering, we should be smarter where we poll the stream until // the I/O queue has ~256MB of requests in it. Our working set size. 
@@ -296,8 +315,7 @@ impl Scan { .filter_map(|r| async move { match r { Ok(Ok(r)) => (!r.filter_mask().all_false()).then_some(Ok(r)), - Ok(Err(e)) => Some(Err(e)), - Err(e) => Some(Err(e.into())), + Ok(Err(e)) | Err(e) => Some(Err(e)), } }) .boxed() @@ -312,18 +330,29 @@ impl Scan { .map(move |row_mask| { let reader = reader.clone(); let projection = projection.clone(); + let scan_executor = scan_executor.clone(); let executor = executor.clone(); - tokio::task::spawn(async move { - let row_mask = row_mask?; - let mut array = reader.evaluate_expr(row_mask, projection).await?; - if self.canonicalize { - array = executor.evaluate(&array, &[ScanTask::Canonicalize]).await?; + async move { + let r = executor + .spawn(async move { + let row_mask = row_mask?; + let mut array = reader.evaluate_expr(row_mask, projection).await?; + if self.canonicalize { + array = + scan_executor.evaluate(&array, &[ScanTask::Canonicalize])?; + } + Ok(array) + }) + .await; + + match r { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) | Err(e) => Err(e), } - Ok(array) - }) + } }) .buffered(self.concurrency) - .map(|v| v?); + .map(|v| v); let io_stream = self.driver.io_stream(); diff --git a/vortex-layout/src/scan/task.rs b/vortex-layout/src/scan/task.rs index 8217522086..8554b7e6a7 100644 --- a/vortex-layout/src/scan/task.rs +++ b/vortex-layout/src/scan/task.rs @@ -1,6 +1,5 @@ use std::ops::Range; -use async_trait::async_trait; use vortex_array::arrow::{FromArrowArray, IntoArrowArray}; use vortex_array::compute::{filter, slice}; use vortex_array::Array; @@ -36,17 +35,14 @@ impl ScanTask { } } -/// A trait used to spawn and execute blocking tasks. -#[async_trait] pub trait TaskExecutor: 'static + Send + Sync { - async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult; + fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult; } pub struct InlineTaskExecutor; -#[async_trait] impl TaskExecutor for InlineTaskExecutor { - async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { + fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult { let mut array = array.clone(); for task in tasks { array = task.execute(&array)?; From bea1dbc0e7bf886f38276ce017373d974def6b17 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 13 Feb 2025 16:54:59 +0000 Subject: [PATCH 3/4] . 
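
Drop the `.map(|v| v)` identity left behind by the executor refactor: the
projection branch now unwraps the spawned task's result inside the async
block, so the stream items are already `VortexResult<Array>`.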
--- vortex-layout/src/scan/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vortex-layout/src/scan/mod.rs b/vortex-layout/src/scan/mod.rs index 0a96b0f715..94aa9ef82a 100644 --- a/vortex-layout/src/scan/mod.rs +++ b/vortex-layout/src/scan/mod.rs @@ -351,8 +351,7 @@ impl Scan { } } }) - .buffered(self.concurrency) - .map(|v| v); + .buffered(self.concurrency); let io_stream = self.driver.io_stream(); From 29e1da23d6d00e092d5d35ac565c181ccb1014f2 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 13 Feb 2025 16:57:09 +0000 Subject: [PATCH 4/4] enable feature --- vortex-datafusion/Cargo.toml | 2 +- vortex-datafusion/src/persistent/opener.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 15cf066482..a7dcac4328 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] } vortex-expr = { workspace = true, features = ["datafusion"] } vortex-file = { workspace = true, features = ["object_store", "tokio"] } vortex-io = { workspace = true, features = ["object_store", "tokio"] } -vortex-layout = { workspace = true } +vortex-layout = { workspace = true, features = ["tokio"] } [features] tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 38c45d0f3b..da299deba3 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener use datafusion_common::Result as DFResult; use futures::{FutureExt as _, StreamExt}; use object_store::{ObjectStore, ObjectStoreScheme}; +use tokio::runtime::Handle; use vortex_array::{ContextRef, IntoArrayVariant}; use vortex_error::VortexResult; use vortex_expr::{ExprRef, VortexExpr}; +use vortex_file::executor::{Executor, TokioExecutor}; use vortex_file::{SplitBy, VortexOpenOptions}; use vortex_io::ObjectStoreReadAt; @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener { let object_store = self.object_store.clone(); let projected_arrow_schema = self.projected_arrow_schema.clone(); let batch_size = self.batch_size; + let executor = Executor::Tokio(TokioExecutor::new(Handle::current())); Ok(async move { let vxf = VortexOpenOptions::file(read_at) @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener { // but at the moment our scanner has too much overhead to process small // batches efficiently. .with_split_by(SplitBy::RowCount(8 * batch_size)) + .with_executor(executor) .into_array_stream()? .map(move |array| { let st = array?.into_struct()?;
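
Taken together, the series leaves the choice of executor to the caller: the DataFusion opener above passes Executor::Tokio(TokioExecutor::new(Handle::current())), while ScanBuilder::build falls back to the blocking InlineExecutor. A minimal sketch of the same wiring for a generic caller follows; it assumes the vortex-layout "tokio" feature is enabled and that the executor types are importable under the paths shown (the helper name and import paths are illustrative, not taken from the patches):

use tokio::runtime::Handle;
use vortex_layout::scan::executor::{Executor, InlineExecutor, TokioExecutor};
use vortex_layout::scan::{ScanBuilder, ScanDriver};

/// Prefer a tokio-backed executor when called from inside a runtime, and fall
/// back to the same inline (block_on) default that ScanBuilder::build installs.
fn with_runtime_executor<D: ScanDriver>(builder: ScanBuilder<D>) -> ScanBuilder<D> {
    let executor = match Handle::try_current() {
        Ok(handle) => Executor::Tokio(TokioExecutor::new(handle)),
        Err(_) => Executor::Inline(InlineExecutor),
    };
    builder.with_executor(executor)
}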