Skip to content

Commit

Permalink
Merge 29e1da2 into 978ae15
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS authored Feb 13, 2025
2 parents 978ae15 + 29e1da2 commit 206ae5b
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true, features = ["datafusion"] }
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
vortex-layout = { workspace = true }
vortex-layout = { workspace = true, features = ["tokio"] }
[features]
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]

Expand Down
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener
use datafusion_common::Result as DFResult;
use futures::{FutureExt as _, StreamExt};
use object_store::{ObjectStore, ObjectStoreScheme};
use tokio::runtime::Handle;
use vortex_array::{ContextRef, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::{ExprRef, VortexExpr};
use vortex_file::executor::{Executor, TokioExecutor};
use vortex_file::{SplitBy, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

Expand Down Expand Up @@ -65,6 +67,7 @@ impl FileOpener for VortexFileOpener {
let object_store = self.object_store.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone();
let batch_size = self.batch_size;
let executor = Executor::Tokio(TokioExecutor::new(Handle::current()));

Ok(async move {
let vxf = VortexOpenOptions::file(read_at)
Expand All @@ -86,6 +89,7 @@ impl FileOpener for VortexFileOpener {
// but at the moment our scanner has too much overhead to process small
// batches efficiently.
.with_split_by(SplitBy::RowCount(8 * batch_size))
.with_executor(executor)
.into_array_stream()?
.map(move |array| {
let st = array?.into_struct()?;
Expand Down
1 change: 1 addition & 0 deletions vortex-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rancor = { workspace = true, optional = true }
thiserror = { workspace = true }
url = { workspace = true }
worker = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["rt"] }

[lints]
workspace = true
8 changes: 8 additions & 0 deletions vortex-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ pub enum VortexError {
#[backtrace]
TryFromIntError,
),
/// A wrapper for Tokio join error.
#[cfg(feature = "tokio")]
#[error(transparent)]
JoinError(
#[from]
#[backtrace]
tokio::task::JoinError,
),
}

impl VortexError {
Expand Down
5 changes: 3 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ bit-vec = { workspace = true }
bytes = { workspace = true }
exponential-decay-histogram = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
futures = { workspace = true, features = ["alloc", "executor"] }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["tokio"] }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
tokio = { workspace = true, optional = true }

[dev-dependencies]
futures = { workspace = true, features = ["executor"] }
Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ExprEvaluator for FlatReader {
tasks.push(ScanTask::Expr(expr));
}

self.executor().evaluate(&array, &tasks).await
self.executor().evaluate(&array, &tasks)
}

async fn prune_mask(&self, row_mask: RowMask, _expr: ExprRef) -> VortexResult<RowMask> {
Expand Down
1 change: 0 additions & 1 deletion vortex-layout/src/layouts/struct_/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl ExprEvaluator for StructReader {

self.executor()
.evaluate(&root_scope, &[ScanTask::Expr(partitioned.root.clone())])
.await
}

async fn prune_mask(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<RowMask> {
Expand Down
77 changes: 77 additions & 0 deletions vortex-layout/src/scan/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#[cfg(feature = "tokio")]
mod tokio;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::oneshot;
use futures::FutureExt as _;
#[cfg(feature = "tokio")]
pub use tokio::*;
use vortex_error::{vortex_err, VortexResult};

/// Handle to a task submitted through [`Spawn::spawn`].
///
/// The handle is a future: polling it polls the receiving half of a oneshot channel on which the
/// task's output is delivered.
pub struct JoinHandle<T> {
/// Receiving half of the oneshot channel that carries the task's output.
inner: oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
    type Output = VortexResult<T>;

    /// Polls the oneshot receiver for the task's output.
    ///
    /// Resolves to `Ok(output)` once a value has been received. If the receiver reports an error
    /// instead (the sending half went away without delivering a value), resolves to a
    /// "Task was canceled" error.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner
            .poll_unpin(cx)
            .map(|received| received.map_err(|_| vortex_err!("Task was canceled")))
    }
}

/// Something that can run a `Send + 'static` future and hand back a [`JoinHandle`] for its output.
pub trait Spawn {
    /// Submits `f` for execution and returns a handle that resolves to `f`'s output.
    ///
    /// How and where `f` is driven is up to the implementation.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;
}

/// Executor that needs no runtime: its [`Spawn`] implementation drives the submitted future to
/// completion on the calling thread before returning.
#[derive(Default, Clone)]
pub struct InlineExecutor;

impl Spawn for InlineExecutor {
    /// Runs `f` to completion on the calling thread and returns a handle that already holds the
    /// output.
    ///
    /// This call blocks until `f` finishes, so nothing else makes progress on this thread in the
    /// meantime.
    ///
    /// NOTE(review): `futures::executor::block_on` is documented to panic when invoked from
    /// inside another `futures` executor, and blocking inside any async runtime stalls that
    /// runtime's worker thread. Confirm that callers relying on this default are aware of it.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();

        // Known compromise: driving the future synchronously is the only way to offer a default
        // that does not depend on a runtime being present.
        let output = futures::executor::block_on(f);

        // `rx` is still alive in this scope, so the send is not expected to fail; the result is
        // ignored either way because there is nobody else to report it to.
        let _ = tx.send(output);

        JoinHandle { inner: rx }
    }
}

/// Closed set of [`Spawn`] implementations, selectable at runtime.
///
/// The enum implements [`Spawn`] itself by forwarding to the executor held in the active variant.
#[derive(Clone)]
pub enum Executor {
/// Drive submitted futures with an [`InlineExecutor`].
Inline(InlineExecutor),
/// Drive submitted futures with a `TokioExecutor`. Only available with the `tokio` feature.
#[cfg(feature = "tokio")]
Tokio(TokioExecutor),
}

impl Spawn for Executor {
    /// Forwards `f` to the executor held by the active variant and returns its handle unchanged.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        match self {
            Self::Inline(executor) => executor.spawn(f),
            #[cfg(feature = "tokio")]
            Self::Tokio(executor) => executor.spawn(f),
        }
    }
}
33 changes: 33 additions & 0 deletions vortex-layout/src/scan/executor/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::future::Future;

use futures::channel::oneshot;
use tokio::runtime::Handle;

use super::{JoinHandle, Spawn};

/// [`Spawn`] implementation that submits futures to a Tokio runtime through the wrapped
/// runtime [`Handle`].
#[derive(Clone)]
pub struct TokioExecutor(Handle);

impl TokioExecutor {
pub fn new(handle: Handle) -> Self {
Self(handle)
}
}

impl Spawn for TokioExecutor {
    /// Spawns `f` through the wrapped runtime handle and returns a handle to its output.
    ///
    /// The output travels back over a oneshot channel rather than through Tokio's own join
    /// handle, which is dropped here. If the task ends without sending a value, the returned
    /// [`JoinHandle`] resolves to an error instead.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();

        // Tokio's join handle is intentionally discarded; the task keeps running detached.
        self.0.spawn(async move {
            // Sending fails only when the receiving `JoinHandle` has already been dropped, in
            // which case nobody is waiting for the output and it can be discarded.
            let _ = tx.send(f.await);
        });

        JoinHandle { inner: rx }
    }
}
60 changes: 47 additions & 13 deletions vortex-layout/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use executor::{Executor, InlineExecutor, Spawn};
use futures::stream::BoxStream;
use futures::{stream, Stream};
use itertools::Itertools;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt};
use vortex_buffer::{Buffer, ByteBuffer};
use vortex_expr::{ExprRef, Identity};
pub mod executor;
pub(crate) mod filter;
mod split_by;
mod task;
Expand Down Expand Up @@ -45,6 +47,7 @@ pub trait ScanDriver: 'static + Sized {
pub struct ScanBuilder<D: ScanDriver> {
driver: D,
task_executor: Option<Arc<dyn TaskExecutor>>,
executor: Option<Executor>,
layout: Layout,
ctx: ContextRef, // TODO(ngates): store this on larger context on Layout
projection: ExprRef,
Expand All @@ -61,6 +64,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
Self {
driver,
task_executor: None,
executor: None,
layout,
ctx,
projection: Identity::new_expr(),
Expand Down Expand Up @@ -119,6 +123,11 @@ impl<D: ScanDriver> ScanBuilder<D> {
self
}

pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

pub fn build(self) -> VortexResult<Scan<D>> {
let projection = simplify_typed(self.projection, self.layout.dtype())?;
let filter = self
Expand Down Expand Up @@ -182,6 +191,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
task_executor: self
.task_executor
.unwrap_or_else(|| Arc::new(InlineTaskExecutor)),
executor: self.executor.unwrap_or(Executor::Inline(InlineExecutor)),
layout: self.layout,
ctx: self.ctx,
projection,
Expand Down Expand Up @@ -220,14 +230,15 @@ impl ScanExecutor {
self.segment_reader.get(id).await
}

pub async fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
self.task_executor.execute(array, tasks).await
pub fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
self.task_executor.execute(array, tasks)
}
}

pub struct Scan<D> {
driver: D,
task_executor: Arc<dyn TaskExecutor>,
executor: Executor,
layout: Layout,
ctx: ContextRef,
// Guaranteed to be simplified
Expand All @@ -246,12 +257,14 @@ impl<D: ScanDriver> Scan<D> {
/// frequent polling to make progress.
pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
// Create a single LayoutReader that is reused for the entire scan.
let executor = Arc::new(ScanExecutor {
let scan_executor = Arc::new(ScanExecutor {
segment_reader: self.driver.segment_reader(),
task_executor: self.task_executor.clone(),
});
let reader: Arc<dyn LayoutReader> =
self.layout.reader(executor.clone(), self.ctx.clone())?;
let executor = self.executor.clone();
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(scan_executor.clone(), self.ctx.clone())?;

// We start with a stream of row masks
let row_masks = stream::iter(self.row_masks);
Expand All @@ -260,6 +273,7 @@ impl<D: ScanDriver> Scan<D> {
let row_masks: BoxStream<'static, VortexResult<RowMask>> =
if let Some(filter) = self.filter.clone() {
let reader = reader.clone();
let executor = executor.clone();

let pruning = Arc::new(FilterExpr::try_new(
reader
Expand All @@ -277,6 +291,7 @@ impl<D: ScanDriver> Scan<D> {
let reader = reader.clone();
let filter = filter.clone();
let pruning = pruning.clone();
let executor = executor.clone();

log::debug!(
"Evaluating filter {} for row mask {}..{} {}",
Expand All @@ -285,15 +300,23 @@ impl<D: ScanDriver> Scan<D> {
row_mask.end(),
row_mask.filter_mask().density()
);
async move { pruning.new_evaluation(&row_mask).evaluate(reader).await }
async move {
executor
.spawn(async move {
pruning.new_evaluation(&row_mask).evaluate(reader).await
})
.await
}
})
// Instead of buffering, we should be smarter where we poll the stream until
// the I/O queue has ~256MB of requests in it. Our working set size.
// We then return Pending and the I/O thread is free to spawn the requests.
.buffered(self.concurrency)
.filter_map(|r| async move {
r.map(|r| (!r.filter_mask().all_false()).then_some(r))
.transpose()
match r {
Ok(Ok(r)) => (!r.filter_mask().all_false()).then_some(Ok(r)),
Ok(Err(e)) | Err(e) => Some(Err(e)),
}
})
.boxed()
} else {
Expand All @@ -307,14 +330,25 @@ impl<D: ScanDriver> Scan<D> {
.map(move |row_mask| {
let reader = reader.clone();
let projection = projection.clone();
let scan_executor = scan_executor.clone();
let executor = executor.clone();
async move {
let row_mask = row_mask?;
let mut array = reader.evaluate_expr(row_mask, projection).await?;
if self.canonicalize {
array = executor.evaluate(&array, &[ScanTask::Canonicalize]).await?;
let r = executor
.spawn(async move {
let row_mask = row_mask?;
let mut array = reader.evaluate_expr(row_mask, projection).await?;
if self.canonicalize {
array =
scan_executor.evaluate(&array, &[ScanTask::Canonicalize])?;
}
Ok(array)
})
.await;

match r {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) | Err(e) => Err(e),
}
Ok(array)
}
})
.buffered(self.concurrency);
Expand Down
Loading

0 comments on commit 206ae5b

Please sign in to comment.