From f2a6b2001e15410a7751a31c147eefba2acf117f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 19 Jan 2026 14:37:00 +0000 Subject: [PATCH 1/3] Allow configuring different readers for DF integration Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/cache.rs | 6 +-- vortex-datafusion/src/persistent/mod.rs | 3 ++ vortex-datafusion/src/persistent/opener.rs | 26 +++++++----- vortex-datafusion/src/persistent/reader.rs | 48 ++++++++++++++++++++++ vortex-datafusion/src/persistent/source.rs | 20 ++++++++- 5 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 vortex-datafusion/src/persistent/reader.rs diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index c910f728da7..7b798b09c83 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -9,7 +9,6 @@ use chrono::Utc; use datafusion_common::ScalarValue; use moka::future::Cache; use object_store::ObjectMeta; -use object_store::ObjectStore; use object_store::path::Path; use vortex::buffer::ByteBuffer; use vortex::dtype::DType; @@ -22,6 +21,7 @@ use vortex::file::Footer; use vortex::file::OpenOptionsSessionExt; use vortex::file::SegmentSpec; use vortex::file::VortexFile; +use vortex::io::VortexReadAt; use vortex::layout::segments::SegmentCache; use vortex::layout::segments::SegmentId; use vortex::metrics::MetricsSessionExt; @@ -100,7 +100,7 @@ impl VortexFileCache { pub async fn try_get( &self, object: &ObjectMeta, - object_store: Arc, + reader: Arc, ) -> VortexResult { let file_key = FileKey::from(object); self.file_cache @@ -119,7 +119,7 @@ impl VortexFileCache { file_key, segment_cache: self.segment_cache.clone(), })) - .open_object_store(&object_store, object.location.as_ref()), + .open(reader), ) .await .map_err(|e: Arc| { diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 21033dbbccb..0d53684a3b2 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -7,6 +7,7 @@ mod cache; mod format; pub mod metrics; mod opener; +mod reader; mod sink; mod source; mod stream; @@ -15,6 +16,8 @@ pub use access_plan::VortexAccessPlan; pub use format::VortexFormat; pub use format::VortexFormatFactory; pub use format::VortexOptions; +pub use reader::DefaultVortexReaderFactory; +pub use reader::VortexReaderFactory; pub use source::VortexSource; #[cfg(test)] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index ae6d3578a33..225996de7eb 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -28,7 +28,6 @@ use futures::FutureExt; use futures::StreamExt; use futures::TryStreamExt; use futures::stream; -use object_store::ObjectStore; use object_store::path::Path; use tracing::Instrument; use vortex::array::ArrayRef; @@ -44,6 +43,7 @@ use vortex_utils::aliases::dash_map::Entry; use super::cache::VortexFileCache; use crate::VortexAccessPlan; +use crate::VortexReaderFactory; use crate::convert::exprs::ExpressionConvertor; use crate::convert::exprs::ProcessedProjection; use crate::convert::exprs::make_vortex_predicate; @@ -53,7 +53,7 @@ use crate::persistent::stream::PrunableStream; #[derive(Clone)] pub(crate) struct VortexOpener { pub session: VortexSession, - pub object_store: Arc, + pub vortex_reader_factory: Arc, /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is /// all fields in the final scan result not including the partition columns. pub projection: ProjectionExprs, @@ -90,11 +90,14 @@ pub(crate) struct VortexOpener { impl FileOpener for VortexOpener { fn open(&self, file: PartitionedFile) -> DFResult { let session = self.session.clone(); - let object_store = self.object_store.clone(); let mut projection = self.projection.clone(); let mut filter = self.filter.clone(); + let reader = self + .vortex_reader_factory + .create_reader(file.path().as_ref(), &session)?; + let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = self.expr_adapter_factory.clone(); @@ -158,7 +161,7 @@ impl FileOpener for VortexOpener { } let vxf = file_cache - .try_get(&file.object_meta, object_store) + .try_get(&file.object_meta, reader) .await .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?; @@ -382,6 +385,7 @@ fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u #[cfg(test)] mod tests { + use std::sync::Arc; use std::sync::LazyLock; use arrow_schema::Field; @@ -407,6 +411,7 @@ mod tests { use datafusion_physical_expr::projection::ProjectionExpr; use insta::assert_snapshot; use itertools::Itertools; + use object_store::ObjectStore; use object_store::memory::InMemory; use rstest::rstest; use vortex::VortexSessionDefault; @@ -419,6 +424,7 @@ mod tests { use vortex::session::VortexSession; use super::*; + use crate::DefaultVortexReaderFactory; use crate::VortexAccessPlan; use crate::convert::exprs::DefaultExpressionConvertor; @@ -491,7 +497,7 @@ mod tests { ) -> VortexOpener { VortexOpener { session: SESSION.clone(), - object_store, + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), filter, file_pruning_predicate: None, @@ -583,7 +589,7 @@ mod tests { let make_opener = |filter| VortexOpener { session: SESSION.clone(), - object_store: object_store.clone(), + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), filter: Some(filter), file_pruning_predicate: None, @@ -666,7 +672,7 @@ mod tests { let opener = VortexOpener { session: SESSION.clone(), - object_store: object_store.clone(), + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema), filter: None, file_pruning_predicate: None, @@ -815,7 +821,7 @@ mod tests { let opener = VortexOpener { session: SESSION.clone(), - object_store: object_store.clone(), + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection: ProjectionExprs::from_indices( projection.as_ref(), table_schema.file_schema(), @@ -874,7 +880,7 @@ mod tests { ) -> VortexOpener { VortexOpener { session: SESSION.clone(), - object_store, + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store)), projection, filter: None, file_pruning_predicate: None, @@ -1072,7 +1078,7 @@ mod tests { let opener = VortexOpener { session: SESSION.clone(), - object_store: object_store.clone(), + vortex_reader_factory: Arc::new(DefaultVortexReaderFactory::new(object_store.clone())), projection, filter: None, file_pruning_predicate: None, diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs new file mode 100644 index 00000000000..695e307e8c2 --- /dev/null +++ b/vortex-datafusion/src/persistent/reader.rs @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Debug; +use std::sync::Arc; + +use datafusion_common::Result as DFResult; +use object_store::ObjectStore; +use vortex::io::VortexReadAt; +use vortex::io::file::object_store::ObjectStoreSource; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; + +/// Factory to create [`VortexReadAt`] instances to read the target file. + +pub trait VortexReaderFactory: Debug + Send + Sync + 'static { + /// Create a reader for a target object. + fn create_reader(&self, path: &str, session: &VortexSession) + -> DFResult>; +} + +/// Default factory, creates [`ObjectStore`] backed readers for files, +/// works with multiple cloud providers. +#[derive(Debug)] +pub struct DefaultVortexReaderFactory { + object_store: Arc, +} + +impl DefaultVortexReaderFactory { + /// Creates new instance + pub fn new(object_store: Arc) -> Self { + Self { object_store } + } +} + +impl VortexReaderFactory for DefaultVortexReaderFactory { + fn create_reader( + &self, + path: &str, + session: &VortexSession, + ) -> DFResult> { + Ok(Arc::new(ObjectStoreSource::new( + self.object_store.clone(), + path.into(), + session.handle(), + )) as _) + } +} diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index cab3ea275d9..4e83338b160 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -35,6 +35,8 @@ use vortex_utils::aliases::dash_map::DashMap; use super::cache::VortexFileCache; use super::metrics::PARTITION_LABEL; use super::opener::VortexOpener; +use crate::DefaultVortexReaderFactory; +use crate::VortexReaderFactory; use crate::convert::exprs::DefaultExpressionConvertor; use crate::convert::exprs::ExpressionConvertor; @@ -60,6 +62,7 @@ pub struct VortexSource { /// Sharing the readers allows us to only read every layout once from the file, even across partitions. layout_readers: Arc>>, expression_convertor: Arc, + pub(crate) vortex_reader_factory: Option>, } impl VortexSource { @@ -83,6 +86,7 @@ impl VortexSource { _unused_df_metrics: Default::default(), layout_readers: Arc::new(DashMap::default()), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + vortex_reader_factory: None, } } @@ -94,6 +98,15 @@ impl VortexSource { self.expression_convertor = expr_convertor; self } + + /// Set a user-defined factory to create the underlying [`VortexReadAt`] + pub fn with_vortex_reader_factory( + mut self, + vortex_reader_factory: Arc, + ) -> Self { + self.vortex_reader_factory = Some(vortex_reader_factory); + self + } } impl FileSource for VortexSource { @@ -117,9 +130,14 @@ impl FileSource for VortexSource { .clone() .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory)); + let vortex_reader_factory = self + .vortex_reader_factory + .clone() + .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store))); + let opener = VortexOpener { session: self.session.clone(), - object_store, + vortex_reader_factory, projection: self.projection.clone(), filter: self.vortex_predicate.clone(), file_pruning_predicate: self.full_predicate.clone(), From d03e19354ae3e5d38565c633b0538e808d1a8d1f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 19 Jan 2026 14:46:25 +0000 Subject: [PATCH 2/3] fix stuff Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/format.rs | 18 +++++++++++++++--- vortex-datafusion/src/persistent/reader.rs | 1 - 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 2feaa2dcac7..44c489527f8 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -55,6 +55,8 @@ use vortex::expr::stats::Stat; use vortex::file::EOF_SIZE; use vortex::file::MAX_POSTSCRIPT_SIZE; use vortex::file::VORTEX_FILE_EXTENSION; +use vortex::io::file::object_store::ObjectStoreSource; +use vortex::io::session::RuntimeSessionExt; use vortex::scalar::Scalar; use vortex::session::VortexSession; @@ -243,10 +245,14 @@ impl FileFormat for VortexFormat { ) -> DFResult { let mut file_schemas = stream::iter(objects.iter().cloned()) .map(|o| { - let store = store.clone(); + let reader = Arc::new(ObjectStoreSource::new( + store.clone(), + o.location.clone(), + self.session.handle(), + )); let cache = self.file_cache.clone(); SpawnedTask::spawn(async move { - let vxf = cache.try_get(&o, store).await?; + let vxf = cache.try_get(&o, reader).await?; let inferred_schema = vxf.dtype().to_arrow_schema()?; VortexResult::Ok((o.location, inferred_schema)) }) @@ -275,9 +281,15 @@ impl FileFormat for VortexFormat { let object = object.clone(); let store = store.clone(); let cache = self.file_cache.clone(); + let handle = self.session.handle(); SpawnedTask::spawn(async move { - let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| { + let reader = Arc::new(ObjectStoreSource::new( + store.clone(), + object.location.clone(), + handle, + )); + let vxf = cache.try_get(&object, reader).await.map_err(|e| { DataFusionError::Execution(format!( "Failed to open Vortex file {}: {e}", object.location diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs index 695e307e8c2..bdcb5740249 100644 --- a/vortex-datafusion/src/persistent/reader.rs +++ b/vortex-datafusion/src/persistent/reader.rs @@ -12,7 +12,6 @@ use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; /// Factory to create [`VortexReadAt`] instances to read the target file. - pub trait VortexReaderFactory: Debug + Send + Sync + 'static { /// Create a reader for a target object. fn create_reader(&self, path: &str, session: &VortexSession) From c53838fc2c5c155b4877646f9ce3e82b3e31b0ae Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 21 Jan 2026 14:31:24 +0000 Subject: [PATCH 3/3] Update docs Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 4e83338b160..2157efdb57f 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -100,6 +100,8 @@ impl VortexSource { } /// Set a user-defined factory to create the underlying [`VortexReadAt`] + /// + /// [`VortexReadAt`]: vortex::io::VortexReadAt pub fn with_vortex_reader_factory( mut self, vortex_reader_factory: Arc,