Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions vortex-datafusion/src/persistent/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use chrono::Utc;
use datafusion_common::ScalarValue;
use moka::future::Cache;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::path::Path;
use vortex::buffer::ByteBuffer;
use vortex::dtype::DType;
Expand All @@ -22,6 +21,7 @@ use vortex::file::Footer;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::SegmentSpec;
use vortex::file::VortexFile;
use vortex::io::VortexReadAt;
use vortex::layout::segments::SegmentCache;
use vortex::layout::segments::SegmentId;
use vortex::metrics::MetricsSessionExt;
Expand Down Expand Up @@ -100,7 +100,7 @@ impl VortexFileCache {
pub async fn try_get(
&self,
object: &ObjectMeta,
object_store: Arc<dyn ObjectStore>,
reader: Arc<dyn VortexReadAt>,
) -> VortexResult<VortexFile> {
let file_key = FileKey::from(object);
self.file_cache
Expand All @@ -119,7 +119,7 @@ impl VortexFileCache {
file_key,
segment_cache: self.segment_cache.clone(),
}))
.open_object_store(&object_store, object.location.as_ref()),
.open(reader),
)
.await
.map_err(|e: Arc<VortexError>| {
Expand Down
18 changes: 15 additions & 3 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use vortex::expr::stats::Stat;
use vortex::file::EOF_SIZE;
use vortex::file::MAX_POSTSCRIPT_SIZE;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::io::file::object_store::ObjectStoreSource;
use vortex::io::session::RuntimeSessionExt;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;

Expand Down Expand Up @@ -243,10 +245,14 @@ impl FileFormat for VortexFormat {
) -> DFResult<SchemaRef> {
let mut file_schemas = stream::iter(objects.iter().cloned())
.map(|o| {
let store = store.clone();
let reader = Arc::new(ObjectStoreSource::new(
store.clone(),
o.location.clone(),
self.session.handle(),
));
let cache = self.file_cache.clone();
SpawnedTask::spawn(async move {
let vxf = cache.try_get(&o, store).await?;
let vxf = cache.try_get(&o, reader).await?;
let inferred_schema = vxf.dtype().to_arrow_schema()?;
VortexResult::Ok((o.location, inferred_schema))
})
Expand Down Expand Up @@ -275,9 +281,15 @@ impl FileFormat for VortexFormat {
let object = object.clone();
let store = store.clone();
let cache = self.file_cache.clone();
let handle = self.session.handle();

SpawnedTask::spawn(async move {
let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
let reader = Arc::new(ObjectStoreSource::new(
store.clone(),
object.location.clone(),
handle,
));
let vxf = cache.try_get(&object, reader).await.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to open Vortex file {}: {e}",
object.location
Expand Down
3 changes: 3 additions & 0 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod cache;
mod format;
pub mod metrics;
mod opener;
mod reader;
mod sink;
mod source;
mod stream;
Expand All @@ -15,6 +16,8 @@ pub use access_plan::VortexAccessPlan;
pub use format::VortexFormat;
pub use format::VortexFormatFactory;
pub use format::VortexOptions;
pub use reader::DefaultVortexReaderFactory;
pub use reader::VortexReaderFactory;
pub use source::VortexSource;

#[cfg(test)]
Expand Down
26 changes: 16 additions & 10 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use object_store::ObjectStore;
use object_store::path::Path;
use tracing::Instrument;
use vortex::array::ArrayRef;
Expand All @@ -44,6 +43,7 @@ use vortex_utils::aliases::dash_map::Entry;

use super::cache::VortexFileCache;
use crate::VortexAccessPlan;
use crate::VortexReaderFactory;
use crate::convert::exprs::ExpressionConvertor;
use crate::convert::exprs::ProcessedProjection;
use crate::convert::exprs::make_vortex_predicate;
Expand All @@ -53,7 +53,7 @@ use crate::persistent::stream::PrunableStream;
#[derive(Clone)]
pub(crate) struct VortexOpener {
pub session: VortexSession,
pub object_store: Arc<dyn ObjectStore>,
pub vortex_reader_factory: Arc<dyn VortexReaderFactory>,
/// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is
/// all fields in the final scan result not including the partition columns.
pub projection: ProjectionExprs,
Expand Down Expand Up @@ -90,11 +90,14 @@ pub(crate) struct VortexOpener {
impl FileOpener for VortexOpener {
fn open(&self, file: PartitionedFile) -> DFResult<FileOpenFuture> {
let session = self.session.clone();
let object_store = self.object_store.clone();

let mut projection = self.projection.clone();
let mut filter = self.filter.clone();

let reader = self
.vortex_reader_factory
.create_reader(file.path().as_ref(), &session)?;

let file_pruning_predicate = self.file_pruning_predicate.clone();
let expr_adapter_factory = self.expr_adapter_factory.clone();

Expand Down Expand Up @@ -158,7 +161,7 @@ impl FileOpener for VortexOpener {
}

let vxf = file_cache
.try_get(&file.object_meta, object_store)
.try_get(&file.object_meta, reader)
.await
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;

Expand Down Expand Up @@ -382,6 +385,7 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::LazyLock;

use arrow_schema::Field;
Expand All @@ -407,6 +411,7 @@ mod tests {
use datafusion_physical_expr::projection::ProjectionExpr;
use insta::assert_snapshot;
use itertools::Itertools;
use object_store::ObjectStore;
use object_store::memory::InMemory;
use rstest::rstest;
use vortex::VortexSessionDefault;
Expand All @@ -419,6 +424,7 @@ mod tests {
use vortex::session::VortexSession;

use super::*;
use crate::DefaultVortexReaderFactory;
use crate::VortexAccessPlan;
use crate::convert::exprs::DefaultExpressionConvertor;

Expand Down Expand Up @@ -491,7 +497,7 @@ mod tests {
) -> VortexOpener {
VortexOpener {
session: SESSION.clone(),
object_store,
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()),
filter,
file_pruning_predicate: None,
Expand Down Expand Up @@ -583,7 +589,7 @@ mod tests {

let make_opener = |filter| VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()),
filter: Some(filter),
file_pruning_predicate: None,
Expand Down Expand Up @@ -666,7 +672,7 @@ mod tests {

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema),
filter: None,
file_pruning_predicate: None,
Expand Down Expand Up @@ -815,7 +821,7 @@ mod tests {

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
projection: ProjectionExprs::from_indices(
projection.as_ref(),
table_schema.file_schema(),
Expand Down Expand Up @@ -874,7 +880,7 @@ mod tests {
) -> VortexOpener {
VortexOpener {
session: SESSION.clone(),
object_store,
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)),
projection,
filter: None,
file_pruning_predicate: None,
Expand Down Expand Up @@ -1072,7 +1078,7 @@ mod tests {

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())),
projection,
filter: None,
file_pruning_predicate: None,
Expand Down
47 changes: 47 additions & 0 deletions vortex-datafusion/src/persistent/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;
use std::sync::Arc;

use datafusion_common::Result as DFResult;
use object_store::ObjectStore;
use vortex::io::VortexReadAt;
use vortex::io::file::object_store::ObjectStoreSource;
use vortex::io::session::RuntimeSessionExt;
use vortex::session::VortexSession;

/// Factory to create [`VortexReadAt`] instances to read the target file.
///
/// This is the extension point that decides where a file's bytes come from. Implementors
/// must be `Debug + Send + Sync + 'static` so the factory can be shared behind an `Arc`
/// as a trait object.
pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
/// Create a reader for a target object.
///
/// `path` identifies the file to read, as a string, and `session` is the Vortex session
/// the reader is created for.
///
/// # Errors
///
/// Implementations return an error when a reader for `path` cannot be constructed.
fn create_reader(&self, path: &str, session: &VortexSession)
-> DFResult<Arc<dyn VortexReadAt>>;
}

/// Default [`VortexReaderFactory`]: creates [`ObjectStore`]-backed readers for files,
/// so it works with multiple cloud providers.
///
/// All readers produced by one factory share the same underlying store.
#[derive(Debug)]
pub struct DefaultVortexReaderFactory {
// Store handle that is cloned into every reader this factory creates.
object_store: Arc<dyn ObjectStore>,
}

impl DefaultVortexReaderFactory {
/// Creates new instance
pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
Self { object_store }
}
}

impl VortexReaderFactory for DefaultVortexReaderFactory {
    /// Creates an [`ObjectStoreSource`] for `path` on this factory's object store and
    /// returns it as a [`VortexReadAt`] trait object.
    ///
    /// This implementation never returns an error itself; it always yields `Ok`.
    fn create_reader(
        &self,
        path: &str,
        session: &VortexSession,
    ) -> DFResult<Arc<dyn VortexReadAt>> {
        // Each reader gets its own handle to the shared store.
        let store = Arc::clone(&self.object_store);

        // NOTE(review): `path.into()` presumably goes through `object_store::path::Path`'s
        // `From<&str>`, which percent-encodes its input. Confirm that callers handing over
        // an already-encoded location string are not double-encoded here.
        let source = ObjectStoreSource::new(store, path.into(), session.handle());

        let reader: Arc<dyn VortexReadAt> = Arc::new(source);
        Ok(reader)
    }
}
22 changes: 21 additions & 1 deletion vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use vortex_utils::aliases::dash_map::DashMap;
use super::cache::VortexFileCache;
use super::metrics::PARTITION_LABEL;
use super::opener::VortexOpener;
use crate::DefaultVortexReaderFactory;
use crate::VortexReaderFactory;
use crate::convert::exprs::DefaultExpressionConvertor;
use crate::convert::exprs::ExpressionConvertor;

Expand All @@ -60,6 +62,7 @@ pub struct VortexSource {
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
}

impl VortexSource {
Expand All @@ -83,6 +86,7 @@ impl VortexSource {
_unused_df_metrics: Default::default(),
layout_readers: Arc::new(DashMap::default()),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
vortex_reader_factory: None,
}
}

Expand All @@ -94,6 +98,17 @@ impl VortexSource {
self.expression_convertor = expr_convertor;
self
}

/// Overrides how this source obtains the [`VortexReadAt`] used to read each file.
///
/// Any previously configured factory is replaced. When no factory is configured, a
/// [`DefaultVortexReaderFactory`] built from the scan's object store is used instead.
///
/// [`VortexReadAt`]: vortex::io::VortexReadAt
pub fn with_vortex_reader_factory(
    mut self,
    vortex_reader_factory: Arc<dyn VortexReaderFactory>,
) -> Self {
    // Install the new factory and release whichever one was set before.
    drop(self.vortex_reader_factory.replace(vortex_reader_factory));
    self
}
}

impl FileSource for VortexSource {
Expand All @@ -117,9 +132,14 @@ impl FileSource for VortexSource {
.clone()
.unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));

let vortex_reader_factory = self
.vortex_reader_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));

let opener = VortexOpener {
session: self.session.clone(),
object_store,
vortex_reader_factory,
projection: self.projection.clone(),
filter: self.vortex_predicate.clone(),
file_pruning_predicate: self.full_predicate.clone(),
Expand Down
Loading