Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Spawn layout evaluation #2348

Merged
merged 39 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
308d804
.
AdamGS Feb 13, 2025
6978c28
.
AdamGS Feb 13, 2025
bea1dbc
.
AdamGS Feb 13, 2025
29e1da2
enable feature
AdamGS Feb 13, 2025
c78ed3a
.
AdamGS Feb 13, 2025
ef2bf80
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
322c72a
nicer version
AdamGS Feb 14, 2025
b4a5b08
.
AdamGS Feb 14, 2025
a90fd9d
.
AdamGS Feb 14, 2025
96e119d
.
AdamGS Feb 14, 2025
aead48a
.
AdamGS Feb 14, 2025
be57a8f
some work, need to unify to one spawn
AdamGS Feb 14, 2025
c744f90
.
AdamGS Feb 14, 2025
bd4c6ff
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
6dbab42
.
AdamGS Feb 14, 2025
defc115
VortexResult::flatten -> unnest (#2361)
AdamGS Feb 14, 2025
45b39a1
.
AdamGS Feb 14, 2025
fadcc6d
.
AdamGS Feb 14, 2025
8a127ec
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
bf14120
.
AdamGS Feb 14, 2025
539bacb
.
AdamGS Feb 14, 2025
6032cbe
go back to higher default concurrency, not sure how to pick that number
AdamGS Feb 14, 2025
625789d
.
AdamGS Feb 14, 2025
3e91ef8
.
AdamGS Feb 14, 2025
8c568e9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 14, 2025
d6381e4
.
AdamGS Feb 17, 2025
cab3012
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
df579e9
.
AdamGS Feb 17, 2025
78f4eb0
.
AdamGS Feb 17, 2025
41aa3d1
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 17, 2025
a643a5d
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
b1133f1
cleanup ScanExecutor, move some tests to rstest
AdamGS Feb 18, 2025
169bea9
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
f210bbc
Merge branch 'develop' into adamg/spawn-evaluate
AdamGS Feb 18, 2025
5e171f2
Merge branch 'develop' into adamg/spawn-evaluate
gatesn Feb 18, 2025
f3ecbb7
typo
AdamGS Feb 18, 2025
32067e1
CR comments
AdamGS Feb 18, 2025
ea6103c
.
AdamGS Feb 18, 2025
a775f89
remove unused code
AdamGS Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rancor = { workspace = true, optional = true }
thiserror = { workspace = true }
url = { workspace = true }
worker = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["rt"] }

[lints]
workspace = true
8 changes: 8 additions & 0 deletions vortex-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ pub enum VortexError {
#[backtrace]
TryFromIntError,
),
/// A wrapper for Tokio join error.
#[cfg(feature = "tokio")]
#[error(transparent)]
JoinError(
#[from]
#[backtrace]
tokio::task::JoinError,
),
}

impl VortexError {
Expand Down
5 changes: 3 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ bit-vec = { workspace = true }
bytes = { workspace = true }
exponential-decay-histogram = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
futures = { workspace = true, features = ["alloc", "executor"] }
itertools = { workspace = true }
log = { workspace = true }
pin-project-lite = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["tokio"] }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
tokio = { workspace = true, optional = true }

[dev-dependencies]
futures = { workspace = true, features = ["executor"] }
Expand Down
2 changes: 1 addition & 1 deletion vortex-layout/src/layouts/flat/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ExprEvaluator for FlatReader {
tasks.push(ScanTask::Expr(expr));
}

self.executor().evaluate(&array, &tasks).await
self.executor().evaluate(&array, &tasks)
}

async fn prune_mask(&self, row_mask: RowMask, _expr: ExprRef) -> VortexResult<RowMask> {
Expand Down
1 change: 0 additions & 1 deletion vortex-layout/src/layouts/struct_/eval_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl ExprEvaluator for StructReader {

self.executor()
.evaluate(&root_scope, &[ScanTask::Expr(partitioned.root.clone())])
.await
}

async fn prune_mask(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<RowMask> {
Expand Down
77 changes: 77 additions & 0 deletions vortex-layout/src/scan/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#[cfg(feature = "tokio")]
mod tokio;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::oneshot;
use futures::FutureExt as _;
#[cfg(feature = "tokio")]
pub use tokio::*;
use vortex_error::{vortex_err, VortexResult};

/// A handle to a spawned task that resolves to the task's output.
///
/// Wraps a oneshot receiver: the spawned task sends its result through the
/// channel when it completes. If the sending side is dropped first (the task
/// was canceled or panicked), polling the handle yields an error.
pub struct JoinHandle<T> {
    // Receiving end of the channel the spawned task reports its result on.
    inner: oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
    type Output = VortexResult<T>;

    /// Polls the underlying oneshot receiver, mapping a dropped sender
    /// (i.e. a canceled task) into a Vortex error.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner
            .poll_unpin(cx)
            .map_err(|_| vortex_err!("Task was canceled"))
    }
}

/// An abstraction over task-spawning executors.
///
/// Implementations decide *where* the future runs (inline on the calling
/// thread, on a Tokio runtime, ...); callers await the returned
/// [`JoinHandle`] for the result.
pub trait Spawn {
    /// Spawn `f`, returning a handle that resolves to its output.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static;
}

/// An executor that drives each spawned future to completion on the calling
/// thread before returning its handle. Used as the default when no runtime
/// is configured.
#[derive(Default, Clone)]
pub struct InlineExecutor;

// NOTE(review): the `#[async_trait::async_trait]` attribute was removed — `Spawn`
// is not declared with `#[async_trait]` and has no `async fn`, so the macro was
// a dead no-op and inconsistent with the trait declaration.
impl Spawn for InlineExecutor {
    /// Runs the future to completion immediately on the calling thread.
    ///
    /// This is very hacky and probably not a great idea, but there is no
    /// obviously better sane default: it blocks the caller via
    /// `futures::executor::block_on`, so the returned handle is always ready
    /// by the time it is first polled.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        futures::executor::block_on(async move {
            // Ignore a send failure: it can only happen if the receiver was
            // dropped, in which case nobody cares about the result.
            _ = tx.send(f.await);
        });

        JoinHandle { inner: rx }
    }
}

/// The set of supported task executors a scan can run on.
#[derive(Clone)]
pub enum Executor {
    /// Runs each task inline on the calling thread.
    Inline(InlineExecutor),
    /// Spawns tasks onto a Tokio runtime.
    #[cfg(feature = "tokio")]
    Tokio(TokioExecutor),
}

// NOTE(review): the `#[async_trait::async_trait]` attribute was removed — `Spawn`
// is not declared with `#[async_trait]` and has no `async fn`, so the macro was
// a dead no-op here.
impl Spawn for Executor {
    /// Delegates to the spawn implementation of the selected executor variant.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        match self {
            Executor::Inline(inline_executor) => inline_executor.spawn(f),
            #[cfg(feature = "tokio")]
            Executor::Tokio(tokio_executor) => tokio_executor.spawn(f),
        }
    }
}
33 changes: 33 additions & 0 deletions vortex-layout/src/scan/executor/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::future::Future;

use futures::channel::oneshot;
use tokio::runtime::Handle;

use super::{JoinHandle, Spawn};

/// An executor that spawns tasks onto a Tokio runtime through its [`Handle`].
#[derive(Clone)]
pub struct TokioExecutor(Handle);

impl TokioExecutor {
pub fn new(handle: Handle) -> Self {
Self(handle)
}
}

// NOTE(review): the `#[async_trait::async_trait]` attribute was removed — `Spawn`
// is not declared with `#[async_trait]` and has no `async fn`, so the macro was
// a dead no-op here.
impl Spawn for TokioExecutor {
    /// Spawns `f` onto the Tokio runtime.
    ///
    /// The Tokio task runs detached; its output is forwarded through a oneshot
    /// channel to the returned [`JoinHandle`]. If the task panics, the sender
    /// is dropped and the handle resolves to a cancellation error.
    fn spawn<F>(&self, f: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        <F as Future>::Output: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();

        self.0.spawn(async move {
            // Ignore a send failure: it only means the JoinHandle was dropped,
            // so nobody is waiting for this result.
            _ = tx.send(f.await);
        });

        JoinHandle { inner: rx }
    }
}
63 changes: 49 additions & 14 deletions vortex-layout/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use executor::{Executor, InlineExecutor, Spawn};
use futures::stream::BoxStream;
use futures::{stream, Stream};
use itertools::Itertools;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt};
use vortex_buffer::{Buffer, ByteBuffer};
use vortex_expr::{ExprRef, Identity};
pub mod executor;
pub(crate) mod filter;
mod split_by;
mod task;
Expand Down Expand Up @@ -45,6 +47,7 @@ pub trait ScanDriver: 'static + Sized {
pub struct ScanBuilder<D: ScanDriver> {
driver: D,
task_executor: Option<Arc<dyn TaskExecutor>>,
executor: Option<Executor>,
layout: Layout,
ctx: ContextRef, // TODO(ngates): store this on larger context on Layout
projection: ExprRef,
Expand All @@ -61,6 +64,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
Self {
driver,
task_executor: None,
executor: None,
layout,
ctx,
projection: Identity::new_expr(),
Expand Down Expand Up @@ -119,6 +123,11 @@ impl<D: ScanDriver> ScanBuilder<D> {
self
}

pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

pub fn build(self) -> VortexResult<Scan<D>> {
let projection = simplify_typed(self.projection, self.layout.dtype())?;
let filter = self
Expand Down Expand Up @@ -182,6 +191,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
task_executor: self
.task_executor
.unwrap_or_else(|| Arc::new(InlineTaskExecutor)),
executor: self.executor.unwrap_or(Executor::Inline(InlineExecutor)),
layout: self.layout,
ctx: self.ctx,
projection,
Expand Down Expand Up @@ -220,14 +230,15 @@ impl ScanExecutor {
self.segment_reader.get(id).await
}

pub async fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
self.task_executor.execute(array, tasks).await
pub fn evaluate(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
self.task_executor.execute(array, tasks)
}
}

pub struct Scan<D> {
driver: D,
task_executor: Arc<dyn TaskExecutor>,
executor: Executor,
layout: Layout,
ctx: ContextRef,
// Guaranteed to be simplified
Expand All @@ -246,12 +257,14 @@ impl<D: ScanDriver> Scan<D> {
/// frequent polling to make progress.
pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
// Create a single LayoutReader that is reused for the entire scan.
let executor = Arc::new(ScanExecutor {
let scan_executor = Arc::new(ScanExecutor {
segment_reader: self.driver.segment_reader(),
task_executor: self.task_executor.clone(),
});
let reader: Arc<dyn LayoutReader> =
self.layout.reader(executor.clone(), self.ctx.clone())?;
let executor = self.executor.clone();
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(scan_executor.clone(), self.ctx.clone())?;

// We start with a stream of row masks
let row_masks = stream::iter(self.row_masks);
Expand All @@ -260,6 +273,7 @@ impl<D: ScanDriver> Scan<D> {
let row_masks: BoxStream<'static, VortexResult<RowMask>> =
if let Some(filter) = self.filter.clone() {
let reader = reader.clone();
let executor = executor.clone();

let pruning = Arc::new(FilterExpr::try_new(
reader
Expand All @@ -277,6 +291,7 @@ impl<D: ScanDriver> Scan<D> {
let reader = reader.clone();
let filter = filter.clone();
let pruning = pruning.clone();
let executor = executor.clone();

log::debug!(
"Evaluating filter {} for row mask {}..{} {}",
Expand All @@ -285,15 +300,23 @@ impl<D: ScanDriver> Scan<D> {
row_mask.end(),
row_mask.filter_mask().density()
);
async move { pruning.new_evaluation(&row_mask).evaluate(reader).await }
async move {
executor
.spawn(async move {
pruning.new_evaluation(&row_mask).evaluate(reader).await
})
.await
}
})
// Instead of buffering, we should be smarter where we poll the stream until
// the I/O queue has ~256MB of requests in it. Our working set size.
// We then return Pending and the I/O thread is free to spawn the requests.
.buffered(self.concurrency)
.filter_map(|r| async move {
r.map(|r| (!r.filter_mask().all_false()).then_some(r))
.transpose()
match r {
Ok(Ok(r)) => (!r.filter_mask().all_false()).then_some(Ok(r)),
Ok(Err(e)) | Err(e) => Some(Err(e)),
}
})
.boxed()
} else {
Expand All @@ -307,17 +330,29 @@ impl<D: ScanDriver> Scan<D> {
.map(move |row_mask| {
let reader = reader.clone();
let projection = projection.clone();
let scan_executor = scan_executor.clone();
let executor = executor.clone();
async move {
let row_mask = row_mask?;
let mut array = reader.evaluate_expr(row_mask, projection).await?;
if self.canonicalize {
array = executor.evaluate(&array, &[ScanTask::Canonicalize]).await?;
let r = executor
.spawn(async move {
let row_mask = row_mask?;
let mut array = reader.evaluate_expr(row_mask, projection).await?;
if self.canonicalize {
array =
scan_executor.evaluate(&array, &[ScanTask::Canonicalize])?;
}
Ok(array)
})
.await;

match r {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) | Err(e) => Err(e),
}
Ok(array)
}
})
.buffered(self.concurrency);
.buffered(self.concurrency)
.map(|v| v);

let io_stream = self.driver.io_stream();

Expand Down
8 changes: 2 additions & 6 deletions vortex-layout/src/scan/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::ops::Range;

use async_trait::async_trait;
use vortex_array::arrow::{FromArrowArray, IntoArrowArray};
use vortex_array::compute::{filter, slice};
use vortex_array::Array;
Expand Down Expand Up @@ -36,17 +35,14 @@ impl ScanTask {
}
}

/// A trait used to spawn and execute blocking tasks.
#[async_trait]
pub trait TaskExecutor: 'static + Send + Sync {
async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array>;
fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array>;
}

pub struct InlineTaskExecutor;

#[async_trait]
impl TaskExecutor for InlineTaskExecutor {
async fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
fn execute(&self, array: &Array, tasks: &[ScanTask]) -> VortexResult<Array> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a flyby, but it seems like this can be blocking now? It's potentially not even needed, but I'm not sure what your plans are for it.

let mut array = array.clone();
for task in tasks {
array = task.execute(&array)?;
Expand Down
Loading